Compare commits

...

102 Commits

Author SHA1 Message Date
Aleix Conchillo Flaqué
53f675f5cf Merge pull request #727 from pipecat-ai/aleix/pipecat-0.0.49
update CHANGELOG for 0.0.49
2024-11-18 06:27:12 +08:00
Aleix Conchillo Flaqué
8173e4ce55 update CHANGELOG for 0.0.49 2024-11-17 23:26:09 +01:00
Aleix Conchillo Flaqué
5445bb0363 rtvi: add on_bot_started event 2024-11-17 22:40:00 +01:00
Mark Backman
a2a94724e5 Merge pull request #725 from pipecat-ai/mb/fix-simple-chatbot
Fix simple-chatbot example
2024-11-16 12:10:05 -05:00
Aleix Conchillo Flaqué
a8f9b0635a Merge pull request #722 from pipecat-ai/aleix/more-dailin-events
transports(daily): add more dial-in events
2024-11-17 01:09:01 +08:00
Mark Backman
4273a31fd5 Fix simple-chatbot example 2024-11-16 07:48:42 -05:00
Aleix Conchillo Flaqué
67f975a2c8 transports(daily): add more dial-in events 2024-11-16 01:22:50 +01:00
Mark Backman
d0bca67666 Merge pull request #716 from pipecat-ai/mb/mute-stt-service
Add STTMuteFilter to un/mute the STT
2024-11-14 19:55:00 -05:00
Mark Backman
966974bfc6 Change STTMuteProcessor to STTMuteFilter 2024-11-14 19:47:37 -05:00
Mark Backman
f807f233bd Suppress UserStartedSpeakingFrame and UserStoppedSpeakingFrame when muted 2024-11-14 17:11:51 -05:00
Mark Backman
33108f5798 Code review feedback 2024-11-14 17:05:08 -05:00
Mark Backman
52de825af8 Update CHANGELOG 2024-11-14 13:47:08 -05:00
Mark Backman
5fe679039c Add STTMuteProcessor to un/mute the STT 2024-11-14 13:35:02 -05:00
Kwindla Hultman Kramer
534f710f5d Merge pull request #688 from pipecat-ai/khk/natural-conversation
More work on llm-as-judge phrase endpointing
2024-11-14 09:15:16 -08:00
Mark Backman
53a11744a8 Merge pull request #712 from pipecat-ai/aleix/some-languages-tweaks
some languages tweaks
2024-11-14 09:33:26 -05:00
Mark Backman
72412cc0c4 Code review feedback 2024-11-14 09:31:04 -05:00
Mark Backman
b77ac07bc6 Merge pull request #715 from pipecat-ai/mb/update-readme-2
Add visual divider below Pipecat README image
2024-11-14 08:54:25 -05:00
Mark Backman
eb6926e0ce Add visual divider below Pipecat README image 2024-11-14 08:51:07 -05:00
Mark Backman
3b2c9de944 Merge pull request #713 from pipecat-ai/mb/update-readme
Update README
2024-11-14 08:45:28 -05:00
Mark Backman
27ff868e5a Move CONTRIBUTING to top directory 2024-11-14 08:43:03 -05:00
Mark Backman
57ef525a8e Update README 2024-11-14 08:43:03 -05:00
Aleix Conchillo Flaqué
d1db54d5fe examples(playht): use a 2.0 engine 2024-11-13 17:19:23 +01:00
Aleix Conchillo Flaqué
4f88fc0eb8 services(tts): initialize language to the proper language code 2024-11-13 17:19:23 +01:00
Aleix Conchillo Flaqué
37d1f4c4e1 services(tts): some language to service language cleanup 2024-11-13 17:19:23 +01:00
Aleix Conchillo Flaqué
ef9e86d997 services(playht): make sure we only skip wav header no matter the size 2024-11-13 17:19:23 +01:00
Aleix Conchillo Flaqué
2d2ef5a417 services(playht): voice engine is Play3.0-mini 2024-11-13 17:19:23 +01:00
Aleix Conchillo Flaqué
c1fff00586 services(playht): fix language codes 2024-11-13 17:19:23 +01:00
Mark Backman
0af2196f50 Merge pull request #708 from pipecat-ai/mb/add-rime-ai
Add RimeTTSService
2024-11-12 18:29:53 -05:00
Mark Backman
cd42320788 Update changelog 2024-11-12 18:28:04 -05:00
Mark Backman
70fce52499 Merge pull request #710 from pipecat-ai/mb/update-readme-krisp
Update Krisp README instructions
2024-11-12 11:15:25 -05:00
Mark Backman
70b60c0593 Update Krisp README instructions 2024-11-12 10:26:12 -05:00
Jon Taylor
2d8aa03f31 Merge pull request #706 from pipecat-ai/jpt/modal-example
barebones modal.com deployment example
2024-11-12 11:41:00 +00:00
Kwindla Hultman Kramer
581ff26704 Merge pull request #707 from pipecat-ai/khk/clean-up
tiny PR to remove old comment lines
2024-11-11 21:14:16 -08:00
Kwindla Hultman Kramer
335178ff06 some gemini audio input examples 2024-11-11 21:04:50 -08:00
Kwindla Hultman Kramer
ee53535f41 gemini audio-in with no transcription 2024-11-11 21:04:50 -08:00
Kwindla Hultman Kramer
91ac40307e small fix and more prompt examples 2024-11-11 21:04:50 -08:00
Kwindla Hultman Kramer
b6c2c1f730 anthropic natural conversation example using claude haiku 2024-11-11 21:04:50 -08:00
Kwindla Hultman Kramer
b56c789ae4 fixes for proposed judge pipeline 2024-11-11 21:04:50 -08:00
Kwindla Hultman Kramer
bd435d9e62 missing commit 2024-11-11 21:04:50 -08:00
Kwindla Hultman Kramer
55a81df84f contributing to llm-as-judge phrase endpointing work 2024-11-11 21:04:50 -08:00
Kwindla Hultman Kramer
87434460f5 temp hacking 2024-11-11 21:04:50 -08:00
Mark Backman
958ec42e8d Add Rime.ai TTS service 2024-11-11 21:58:09 -05:00
Jon Taylor
d1fff60d1d barebones modal.com deployment example 2024-11-11 22:30:07 +00:00
Kwindla Hultman Kramer
1438e5654a remove old comment 2024-11-10 16:08:10 -08:00
Aleix Conchillo Flaqué
1d4be0139a Merge pull request #705 from pipecat-ai/aleix/prepare-0.0.48
update CHANGELOG for 0.0.48
2024-11-10 14:08:33 -08:00
Aleix Conchillo Flaqué
f58c3ee322 update CHANGELOG for 0.0.48 2024-11-10 23:01:03 +01:00
Aleix Conchillo Flaqué
379750df91 Merge pull request #704 from pipecat-ai/aleix/cartesia-tts-stopped-frame
services(cartesia): generated TTSStoppedFrame after no more audio
2024-11-10 05:17:36 -08:00
Aleix Conchillo Flaqué
d125a38737 services(cartesia): generated TTSStoppedFrame after no more audio
The TTSStoppedFrame should be generated when the TTS services stoped generating
audio not when the bot stops speaking.
2024-11-10 09:55:45 +01:00
Mark Backman
446bb0aeaf Merge pull request #702 from pipecat-ai/mb/azure-websocket
Add an Azure TTS websocket service
2024-11-09 17:41:53 -05:00
Aleix Conchillo Flaqué
d839080834 Merge pull request #642 from pipecat-ai/aleix/input-queues-block-frames
introduce frame processor input queues block frames
2024-11-09 14:30:17 -08:00
Mark Backman
9b85d0642b Add a changelog entry 2024-11-09 12:37:29 -05:00
Mark Backman
230b51a117 Add an Azure TTS websocket service 2024-11-09 12:37:29 -05:00
Mark Backman
3a965ca396 Merge pull request #701 from pipecat-ai/khk/anthropic-function-calling-fix
fixes for anthropic function calling
2024-11-09 06:39:34 -05:00
Kwindla Hultman Kramer
33fc5bf990 improved 20c-persistent-context-anthropic.py 2024-11-08 16:42:30 -08:00
Kwindla Hultman Kramer
a54ca08405 fixes for anthropic function calling 2024-11-08 16:33:02 -08:00
Filipi da Silva Fuchter
4379db43ed Merge pull request #689 from pipecat-ai/filipi/krisp
Making pipecat work with Krisp
2024-11-08 16:22:52 -03:00
Filipi Fuchter
e915c676aa Added support for Krisp audio filter 2024-11-08 16:18:10 -03:00
Mark Backman
e0a003afa1 Merge pull request #695 from pipecat-ai/mb/initialize-azure-lang
Initialize the speech_recognition_language for Azure TTS
2024-11-08 06:40:40 -05:00
James Hush
d5666727ce feat: toggle looping with soundfile mixer (#693)
* feat: toggle looping with soundfile mixer

* Implement PR changes
2024-11-07 21:08:37 -08:00
Mark Backman
f6d7402530 Update changelog 2024-11-07 15:16:03 -05:00
Mark Backman
aefe190c9f Initialize the speech_recognition_language for Azure TTS 2024-11-07 15:14:05 -05:00
Vanessa Pyne
29925a8f21 Merge pull request #551 from Allenmylath/patch-3
Frame types and short descriptionCreate Frames.md
2024-11-07 10:05:32 -06:00
Aleix Conchillo Flaqué
beb3271168 services(tts): make sure word timestamp is reset properly 2024-11-06 18:54:12 -08:00
Aleix Conchillo Flaqué
b959ac6e1e Merge pull request #694 from pipecat-ai/aleix/daily-add-on-transcription-message
transports(daily): call on_transcription_message event handler
2024-11-06 15:21:17 -08:00
Aleix Conchillo Flaqué
17f4286942 transports(daily): call on_transcription_message event handler 2024-11-06 15:10:58 -08:00
Aleix Conchillo Flaqué
ce89bbb16e tts(elevenlabs): support pausing and resuming frames while speaking 2024-11-06 14:38:33 -08:00
Aleix Conchillo Flaqué
865768039b processors: remove block_on_frames and add pause_processing_frames() instead 2024-11-06 14:20:25 -08:00
Aleix Conchillo Flaqué
7071482583 try to use queue_frame() instead of process_frame() 2024-11-06 14:18:21 -08:00
Aleix Conchillo Flaqué
5353d13151 update CHANGELOG 2024-11-06 13:16:58 -08:00
Aleix Conchillo Flaqué
a9e565f355 processors: fix input queue interruptions 2024-11-06 13:12:24 -08:00
Aleix Conchillo Flaqué
b6f0c16591 examples: restore EndFrame() on 01 and 02 foundational 2024-11-06 13:05:03 -08:00
Aleix Conchillo Flaqué
49005d02f5 services(tts): use TTSSpeakFrame in say() method 2024-11-06 13:05:03 -08:00
Aleix Conchillo Flaqué
6d8b885071 transports(base_output): push bot started/stopped frames downstream 2024-11-06 13:04:37 -08:00
Aleix Conchillo Flaqué
2eccb33e73 processors: allow passing a callback when queued frame is processed 2024-11-06 13:04:37 -08:00
Aleix Conchillo Flaqué
22ca4c5a02 processors: cancel input task and empty queue with interruptions 2024-11-06 13:04:37 -08:00
Aleix Conchillo Flaqué
84f26ac1ca processors: introduce input queues
Frame processors can now decide if they should continue processing frames or
not, and if so also decide when to continue processing frames. For example,
asynchronous TTS services will stop processing frames until they have generated
all the audio for an LLM response.
2024-11-06 12:13:49 -08:00
Aleix Conchillo Flaqué
74937411e6 Merge pull request #691 from pipecat-ai/aleix/rtvi-manual-bot-ready
rtvi: bot-ready message needs to be sent manual
2024-11-06 10:53:25 -08:00
Aleix Conchillo Flaqué
8aab068ffd rtvi: bot-ready message needs to be sent manual 2024-11-05 10:52:54 -08:00
Aleix Conchillo Flaqué
bd50201ce4 transports(daily): just make it clear we subscribe to camera 2024-11-04 17:32:46 -08:00
Aleix Conchillo Flaqué
6082da284e Merge pull request #611 from pipecat-ai/aleix/audio-filters
introduce audio filters
2024-11-04 16:34:47 -08:00
Aleix Conchillo Flaqué
358c458265 transports(base_input): handle filter contorl frames 2024-11-04 16:19:52 -08:00
Aleix Conchillo Flaqué
807dbbe326 audio(noisereduce): allow enabling/disabling filter 2024-11-04 16:13:29 -08:00
Aleix Conchillo Flaqué
3c116b291d audio(mixers): some cosmetics 2024-11-04 15:37:08 -08:00
Aleix Conchillo Flaqué
0dd413ee90 audio(filters): add noisereduce filter 2024-11-04 15:37:08 -08:00
Aleix Conchillo Flaqué
abc8ede3d7 introduce audio filters 2024-11-04 15:37:08 -08:00
Aleix Conchillo Flaqué
126324ca1b Merge pull request #687 from pipecat-ai/aleix/transport-audio-mixers
introduce transport audio mixers
2024-11-04 13:14:36 -08:00
Aleix Conchillo Flaqué
602915ae18 examples(websocket-server): allow interruptions 2024-11-04 13:05:02 -08:00
Aleix Conchillo Flaqué
0ac9e2dd3f transports(network): synchronize with time before sending data 2024-11-04 13:04:18 -08:00
Aleix Conchillo Flaqué
a9ef5ca95d examples: add bot background sound example 2024-11-03 11:13:02 -08:00
Aleix Conchillo Flaqué
81c476dd4c introduce output transport audio mixers 2024-11-03 11:13:02 -08:00
Kwindla Hultman Kramer
151242d3a0 Merge pull request #666 from pipecat-ai/khk/realtime-pipecat-vad
Support using Pipecat turn detection instead of OpenAI Realtime API turn detection
2024-11-02 08:36:31 -07:00
Kwindla Hultman Kramer
93c6e5098c added comment explaining config of TurnDetection 2024-11-02 08:24:54 -07:00
Aleix Conchillo Flaqué
4455b2a428 rtvi: create queues before tasks 2024-11-01 23:06:50 -07:00
Aleix Conchillo Flaqué
94062592ef base_output: generate smaller audio frames of the same incoming type 2024-11-01 23:06:50 -07:00
Aleix Conchillo Flaqué
d2401a76c8 base_output: only generate bot speaking with TTS audio frames 2024-11-01 23:06:50 -07:00
Aleix Conchillo Flaqué
e2b1b56e86 examples: don't require room token if using an STT 2024-11-01 23:06:50 -07:00
Mark Backman
84bd767312 Merge pull request #685 from pipecat-ai/mb/add-recording-events
Add recording events and callbacks
2024-11-01 12:02:46 -04:00
Mark Backman
802c29e9e1 Add recording events and callbacks 2024-11-01 10:20:00 -04:00
Kwindla Hultman Kramer
e1a7edfb58 make it possible to use Pipecat turn detection instead of OpenAI turn detection 2024-10-25 15:59:48 -05:00
allenmylath
0e69625a01 Rename frames.md to frame.md
edited again to frame.md
2024-10-14 10:07:47 +05:30
allenmylath
4e0823fced Rename Frames.md to frames.md
file name changed as requested
2024-10-14 10:05:26 +05:30
Allenmylath
40af3571f0 Create Frames.md
Made asmall explanation for diffrent types of frames in pipcat
2024-10-05 22:04:03 +05:30
69 changed files with 4080 additions and 859 deletions

View File

@@ -5,10 +5,52 @@ All notable changes to **Pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
## [0.0.49] - 2024-11-17
### Added
- Added RTVI `on_bot_started` event which is useful in a single turn
interaction.
- Added `DailyTransport` events `dialin-connected`, `dialin-stopped`,
`dialin-error` and `dialin-warning`. Needs daily-python >= 0.13.0.
- Added `RimeHttpTTSService` and the `07q-interruptible-rime.py` foundational
example.
- Added `STTMuteFilter`, a general-purpose processor that combines STT
muting and interruption control. When active, it prevents both transcription
and interruptions during bot speech. The processor supports multiple
strategies: `FIRST_SPEECH` (mute only during bot's first
speech), `ALWAYS` (mute during all bot speech), or `CUSTOM` (using provided
callback).
- Added `STTMuteFrame`, a control frame that enables/disables speech
transcription in STT services.
## [0.0.48] - 2024-11-10 "Antonio release"
### Added
- There's now an input queue in each frame processor. When you call
`FrameProcessor.push_frame()` this will internally call
`FrameProcessor.queue_frame()` on the next processor (upstream or downstream)
and the frame will be internally queued (except system frames). Then, the
queued frames will get processed. With this input queue it is also possible
for FrameProcessors to block processing more frames by calling
`FrameProcessor.pause_processing_frames()`. The way to resume processing
frames is by calling `FrameProcessor.resume_processing_frames()`.
- Added audio filter `NoisereduceFilter`.
- Introduce input transport audio filters (`BaseAudioFilter`). Audio filters can
be used to remove background noises before audio is sent to VAD.
- Introduce output transport audio mixers (`BaseAudioMixer`). Output transport
audio mixers can be used, for example, to add background sounds or any other
audio mixing functionality before the output audio is actually written to the
transport.
- Added `GatedOpenAILLMContextAggregator`. This aggregator keeps the last
received OpenAI LLM context frame and it doesn't let it through until the
notifier is notified.
@@ -31,6 +73,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
grained control of what media subscriptions you want for each participant in a
room.
- Added audio filter `KrispFilter`.
### Changed
- The following `DailyTransport` functions are now `async` which means they need
@@ -42,8 +86,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
output to 24000 and also the default output transport sample rate. This
improves audio quality at the cost of some extra bandwidth.
- `AzureTTSService` now uses Azure websockets instead of HTTP requests.
- The previous `AzureTTSService` HTTP implementation is now
`AzureHttpTTSService`.
### Fixed
- Websocket transports (FastAPI and Websocket) now synchronize with time before
sending data. This allows for interruptions to just work out of the box.
- Improved bot speaking detection for all TTS services by using actual bot
audio.
@@ -55,9 +107,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Fixed an issue with PlayHTTTSService, where the TTFB metrics were reporting
very small time values.
- Fixed an issue where AzureTTSService wasn't initializing the specified
language.
### Other
- Added a new foundational example 22-natural-conversation.py. This examples
- Add `23-bot-background-sound.py` foundational example.
- Added a new foundational example `22-natural-conversation.py`. This example
shows how to achieve a more natural conversation detecting when the user ends
statement.

View File

@@ -1,14 +1,20 @@
<div align="center">
<h1><div align="center">
 <img alt="pipecat" width="300px" height="auto" src="https://raw.githubusercontent.com/pipecat-ai/pipecat/main/pipecat.png">
</div>
# Pipecat
</div></h1>
[![PyPI](https://img.shields.io/pypi/v/pipecat-ai)](https://pypi.org/project/pipecat-ai) [![Discord](https://img.shields.io/discord/1239284677165056021)](https://discord.gg/pipecat) <a href="https://app.commanddash.io/agent/github_pipecat-ai_pipecat"><img src="https://img.shields.io/badge/AI-Code%20Agent-EB9FDA"></a>
`pipecat` is a framework for building voice (and multimodal) conversational agents. Things like personal coaches, meeting assistants, [story-telling toys for kids](https://storytelling-chatbot.fly.dev/), customer support bots, [intake flows](https://www.youtube.com/watch?v=lDevgsp9vn0), and snarky social companions.
Pipecat is an open source Python framework for building voice and multimodal conversational agents. It handles the complex orchestration of AI services, network transport, audio processing, and multimodal interactions, letting you focus on creating engaging experiences.
Take a look at some example apps:
## What you can build
- **Voice Assistants**: [Natural, real-time conversations with AI](https://demo.dailybots.ai/)
- **Interactive Agents**: Personal coaches and meeting assistants
- **Multimodal Apps**: Combine voice, video, images, and text
- **Creative Tools**: [Story-telling experiences](https://storytelling-chatbot.fly.dev/) and social companions
- **Business Solutions**: [Customer intake flows](https://www.youtube.com/watch?v=lDevgsp9vn0) and support bots
## See it in action
<p float="left">
<a href="https://github.com/pipecat-ai/pipecat/tree/main/examples/simple-chatbot"><img src="https://raw.githubusercontent.com/pipecat-ai/pipecat/main/examples/simple-chatbot/image.png" width="280" /></a>&nbsp;
@@ -18,33 +24,52 @@ Take a look at some example apps:
<a href="https://github.com/pipecat-ai/pipecat/tree/main/examples/moondream-chatbot"><img src="https://raw.githubusercontent.com/pipecat-ai/pipecat/main/examples/moondream-chatbot/image.png" width="280" /></a>
</p>
## Getting started with voice agents
## Key features
- **Voice-first Design**: Built-in speech recognition, TTS, and conversation handling
- **Flexible Integration**: Works with popular AI services (OpenAI, ElevenLabs, etc.)
- **Pipeline Architecture**: Build complex apps from simple, reusable components
- **Real-time Processing**: Frame-based pipeline architecture for fluid interactions
- **Production Ready**: Enterprise-grade WebRTC and Websocket support
## Getting started
You can get started with Pipecat running on your local machine, then move your agent processes to the cloud when youre ready. You can also add a 📞 telephone number, 🖼️ image output, 📺 video input, use different LLMs, and more.
```shell
# install the module
# Install the module
pip install pipecat-ai
# set up an .env file with API keys
# Set up your environment
cp dot-env.template .env
```
By default, in order to minimize dependencies, only the basic framework functionality is available. Some third-party AI services require additional dependencies that you can install with:
To keep things lightweight, only the core framework is included by default. If you need support for third-party AI services, you can add the necessary dependencies with:
```shell
pip install "pipecat-ai[option,...]"
```
Your project may or may not need these, so they're made available as optional requirements. Here is a list:
Available options include:
- **AI services**: `anthropic`, `assemblyai`, `aws`, `azure`, `deepgram`, `gladia`, `google`, `fal`, `lmnt`, `moondream`, `openai`, `openpipe`, `playht`, `silero`, `whisper`, `xtts`
- **Transports**: `local`, `websocket`, `daily`
| Category | Services | Install Command Example |
| ------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------- |
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/api-reference/services/stt/assemblyai), [Azure](https://docs.pipecat.ai/api-reference/services/stt/azure), [Deepgram](https://docs.pipecat.ai/api-reference/services/stt/deepgram), [Gladia](https://docs.pipecat.ai/api-reference/services/stt/gladia), [Whisper](https://docs.pipecat.ai/api-reference/services/stt/whisper) | `pip install "pipecat-ai[deepgram]"` |
| LLMs | [Anthropic](https://docs.pipecat.ai/api-reference/services/llm/anthropic), [Azure](https://docs.pipecat.ai/api-reference/services/llm/azure), [Fireworks AI](https://docs.pipecat.ai/api-reference/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/api-reference/services/llm/gemini), [Ollama](https://docs.pipecat.ai/api-reference/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/api-reference/services/llm/openai), [Together AI](https://docs.pipecat.ai/api-reference/services/llm/together) | `pip install "pipecat-ai[openai]"` |
| Text-to-Speech | [AWS](https://docs.pipecat.ai/api-reference/services/tts/aws), [Azure](https://docs.pipecat.ai/api-reference/services/tts/azure), [Cartesia](https://docs.pipecat.ai/api-reference/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/api-reference/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/api-reference/services/tts/elevenlabs), [Google](https://docs.pipecat.ai/api-reference/services/tts/google), [LMNT](https://docs.pipecat.ai/api-reference/services/tts/lmnt), [OpenAI](https://docs.pipecat.ai/api-reference/services/tts/openai), [PlayHT](https://docs.pipecat.ai/api-reference/services/tts/playht), [Rime](https://docs.pipecat.ai/api-reference/services/tts/rime), [XTTS](https://docs.pipecat.ai/api-reference/services/tts/xtts) | `pip install "pipecat-ai[cartesia]"` |
| Speech-to-Speech | [OpenAI Realtime](https://docs.pipecat.ai/api-reference/services/s2s/openai) | `pip install "pipecat-ai[openai]"` |
| Transport | [Daily (WebRTC)](https://docs.pipecat.ai/api-reference/services/transport/daily), WebSocket, Local | `pip install "pipecat-ai[daily]"` |
| Video | [Tavus](https://docs.pipecat.ai/api-reference/services/video/tavus) | `pip install "pipecat-ai[tavus]"` |
| Vision & Image | [Moondream](https://docs.pipecat.ai/api-reference/services/vision/moondream), [fal](https://docs.pipecat.ai/api-reference/services/image-generation/fal) | `pip install "pipecat-ai[moondream]"` |
| Audio Processing | [Silero VAD](https://docs.pipecat.ai/api-reference/utilities/audio/silero-vad-analyzer), [Krisp](https://docs.pipecat.ai/api-reference/utilities/audio/krisp-filter), [Noisereduce](https://docs.pipecat.ai/api-reference/utilities/audio/noisereduce-filter) | `pip install "pipecat-ai[silero]"` |
| Analytics & Metrics | [Canonical AI](https://docs.pipecat.ai/api-reference/services/analytics/canonical), [Sentry](https://docs.pipecat.ai/api-reference/services/analytics/sentry) | `pip install "pipecat-ai[canonical]"` |
📚 [View full services documentation →](https://docs.pipecat.ai/api-reference/services/supported-services)
## Code examples
- [foundational](https://github.com/pipecat-ai/pipecat/tree/main/examples/foundational) — small snippets that build on each other, introducing one or two concepts at a time
- [example apps](https://github.com/pipecat-ai/pipecat/tree/main/examples/) — complete applications that you can use as starting points for development
- [Foundational](https://github.com/pipecat-ai/pipecat/tree/main/examples/foundational) — small snippets that build on each other, introducing one or two concepts at a time
- [Example apps](https://github.com/pipecat-ai/pipecat/tree/main/examples/) — complete applications that you can use as starting points for development
## A simple voice agent running locally
@@ -109,7 +134,7 @@ Run it with:
python app.py
```
Daily provides a prebuilt WebRTC user interface. Whilst the app is running, you can visit at `https://<yourdomain>.daily.co/<room_url>` and listen to the bot say hello!
Daily provides a prebuilt WebRTC user interface. While the app is running, you can visit at `https://<yourdomain>.daily.co/<room_url>` and listen to the bot say hello!
## WebRTC for production use
@@ -119,16 +144,6 @@ One way to get up and running quickly with WebRTC is to sign up for a Daily deve
Sign up [here](https://dashboard.daily.co/u/signup) and [create a room](https://docs.daily.co/reference/rest-api/rooms) in the developer Dashboard.
## What is VAD?
Voice Activity Detection &mdash; very important for knowing when a user has finished speaking to your bot. If you are not using press-to-talk, and want Pipecat to detect when the user has finished talking, VAD is an essential component for a natural feeling conversation.
Pipecat makes use of WebRTC VAD by default when using a WebRTC transport layer. Optionally, you can use Silero VAD for improved accuracy at the cost of higher CPU usage.
```shell
pip install pipecat-ai[silero]
```
## Hacking on the framework itself
_Note that you may need to set up a virtual environment before following the instructions below. For instance, you might need to run the following from the root of the repo:_
@@ -206,8 +221,23 @@ Install the
}
```
## Contributing
We welcome contributions from the community! Whether you're fixing bugs, improving documentation, or adding new features, here's how you can help:
- **Found a bug?** Open an [issue](https://github.com/pipecat-ai/pipecat/issues)
- **Have a feature idea?** Start a [discussion](https://discord.gg/pipecat)
- **Want to contribute code?** Check our [CONTRIBUTING.md](CONTRIBUTING.md) guide
- **Documentation improvements?** [Docs](https://github.com/pipecat-ai/docs) PRs are always welcome
Before submitting a pull request, please check existing issues and PRs to avoid duplicates.
We aim to review all contributions promptly and provide constructive feedback to help get your changes merged.
## Getting help
➡️ [Join our Discord](https://discord.gg/pipecat)
➡️ [Read the docs](https://docs.pipecat.ai)
➡️ [Reach us on X](https://x.com/pipecat_ai)

113
docs/frame.md Normal file
View File

@@ -0,0 +1,113 @@
# Understanding Different Frame Types in the Pipecat System
In the Pipecat system, frames are used to represent different types of data and control signals that flow through the pipeline. Understanding these frame types is crucial for working with the system effectively. This tutorial will cover the main categories of frames and their specific uses.
## 1. Base Frame Classes
### Frame
The `Frame` class is the base class for all frames. It includes:
- `id`: A unique identifier
- `name`: A descriptive name
- `pts`: Presentation timestamp (optional)
### DataFrame
`DataFrame` is a subclass of `Frame` and serves as a base for most data-carrying frames.
## 2. Audio Frames
### AudioRawFrame
Represents a chunk of audio with properties:
- `audio`: Raw audio data
- `sample_rate`: Audio sample rate
- `num_channels`: Number of audio channels
Subclasses include:
- `InputAudioRawFrame`: For audio from input sources
- `OutputAudioRawFrame`: For audio to be played by output devices
- `TTSAudioRawFrame`: For audio generated by Text-to-Speech services
## 3. Image Frames
### ImageRawFrame
Represents an image with properties:
- `image`: Raw image data
- `size`: Image dimensions
- `format`: Image format (e.g., JPEG, PNG)
Subclasses include:
- `InputImageRawFrame`: For images from input sources
- `OutputImageRawFrame`: For images to be displayed
- `UserImageRawFrame`: For images associated with a specific user
- `VisionImageRawFrame`: For images with associated text for description
- `URLImageRawFrame`: For images with an associated URL
### SpriteFrame
Represents an animated sprite, containing a list of `ImageRawFrame` objects.
## 4. Text and Transcription Frames
### TextFrame
Represents a chunk of text, used for various purposes in the pipeline.
### TranscriptionFrame
A specialized `TextFrame` for speech transcriptions, including:
- `user_id`: ID of the speaking user
- `timestamp`: When the transcription was generated
- `language`: Detected language of the speech
### InterimTranscriptionFrame
Similar to `TranscriptionFrame`, but for interim (not final) transcriptions.
## 5. LLM (Language Model) Frames
### LLMMessagesFrame
Contains a list of messages for an LLM service to process.
### LLMMessagesAppendFrame and LLMMessagesUpdateFrame
Used to modify the current context of LLM messages.
### LLMSetToolsFrame
Specifies tools (functions) available for the LLM to use.
### LLMEnablePromptCachingFrame
Controls prompt caching in certain LLMs.
## 6. System and Control Frames
### SystemFrame
Base class for system-level frames.
Important system frames include:
- `StartFrame`: Initiates a pipeline
- `CancelFrame`: Stops a pipeline immediately
- `ErrorFrame`: Notifies of errors (with `FatalErrorFrame` for unrecoverable errors)
- `EndTaskFrame` and `CancelTaskFrame`: Control pipeline tasks
- `StartInterruptionFrame` and `StopInterruptionFrame`: Indicate user speech for interruptions
### ControlFrame
Base class for control-flow frames.
Notable control frames:
- `EndFrame`: Signals the end of a pipeline
- `LLMFullResponseStartFrame` and `LLMFullResponseEndFrame`: Bracket LLM responses
- `UserStartedSpeakingFrame` and `UserStoppedSpeakingFrame`: Indicate user speech activity
- `BotStartedSpeakingFrame` and `BotStoppedSpeakingFrame`: Indicate bot speech activity
- `TTSStartedFrame` and `TTSStoppedFrame`: Bracket Text-to-Speech responses
## 7. Special Purpose Frames
### AppFrame
Base class for application-specific custom frames.
### MetricsFrame
Contains performance metrics data.
### FunctionCallInProgressFrame and FunctionCallResultFrame
Used for handling LLM function (tool) calls.
### ServiceUpdateSettingsFrame
Base class for updating service settings, with specific subclasses for LLM, TTS, and STT services.
## Conclusion
Understanding these frame types is essential for working with the Pipecat system. Each frame type serves a specific purpose in the pipeline, whether it's carrying data (like audio or images), controlling the flow of the pipeline, or managing system-level operations. By using the appropriate frame types, you can effectively process and transmit various kinds of information through your pipeline.

View File

@@ -52,4 +52,7 @@ OPENPIPE_API_KEY=...
# Tavus
TAVUS_API_KEY=...
TAVUS_REPLICA_ID=...
TAVUS_PERSONA_ID=...
TAVUS_PERSONA_ID=...
#Krisp
KRISP_MODEL_PATH=...

View File

@@ -0,0 +1,91 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
dist/
*.egg-info/
*.egg
.installed.cfg
.eggs/
downloads/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
MANIFEST
# Virtual Environments
venv/
env/
.env
.venv/
ENV/
env.bak/
venv.bak/
# IDE
.idea/
.vscode/
.spyderproject
.spyproject
.ropeproject
# Testing and Coverage
.coverage
.coverage.*
htmlcov/
.pytest_cache/
.tox/
.nox/
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
cover/
# Logs and Databases
*.log
*.db
db.sqlite3
db.sqlite3-journal
pip-log.txt
# System Files
.DS_Store
Thumbs.db
desktop.ini
*.swp
*.swo
*.bak
*.tmp
*~
# Build and Documentation
docs/_build/
.pybuilder/
target/
instance/
.webassets-cache
.pdm.toml
.pdm-python
.pdm-build/
__pypackages__/
# Other
*.mo
*.pot
*.sage.py
.mypy_cache/
.dmypy.json
dmypy.json
.pyre/
.pytype/
cython_debug/
.ipynb_checkpoints

View File

@@ -0,0 +1,37 @@
# Deploying Pipecat to Modal.com
Barebones deployment example for [modal.com](https://www.modal.com)
1. Install dependencies
```bash
python -m venv venv
source venv/bin/active # or OS equivalent
pip install -r requirements.txt
```
2. Setup .env
```bash
cp env.example .env
```
Alternatively, you can configure your Modal app to use [secrets](https://modal.com/docs/guide/secrets)
3. Test the app locally
```bash
modal serve app.py
```
4. Deploy to production
```bash
modal deploy app.py
```
## Configuration options
This app sets some sensible defaults for reducing cold starts, such as `minkeep_warm=1`, which will keep at least 1 warm instance ready for your bot function.
It has been configured to only allow a concurrency of 1 (`max_inputs=1`) as each user will require their own running function.

View File

@@ -0,0 +1,75 @@
import os
import aiohttp
import modal
from fastapi import HTTPException
from fastapi.responses import JSONResponse
from loguru import logger
from bot import _voice_bot_process
MAX_SESSION_TIME = 15 * 60 # 15 minutes
app = modal.App("pipecat-modal")
image = modal.Image.debian_slim(python_version="3.12").pip_install_from_requirements(
"requirements.txt"
)
@app.function(
image=image,
cpu=1.0,
secrets=[modal.Secret.from_dotenv()],
keep_warm=1,
enable_memory_snapshot=True,
max_inputs=1, # Do not reuse instances across requests
retries=0,
)
def launch_bot_process(room_url: str, token: str):
_voice_bot_process(room_url, token)
@app.function(
image=image,
secrets=[modal.Secret.from_dotenv()],
)
@modal.web_endpoint(method="POST")
async def start():
from pipecat.transports.services.helpers.daily_rest import (
DailyRESTHelper,
DailyRoomParams,
)
logger.info("Request received")
async with aiohttp.ClientSession() as session:
daily_rest_helper = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=session,
)
# Create new Daily room
room = await daily_rest_helper.create_room(DailyRoomParams())
if not room.url:
raise HTTPException(
status_code=500,
detail="Unable to create room",
)
logger.info(f"Created room: {room.url}")
# Create bot token for room
token = await daily_rest_helper.get_token(room.url, MAX_SESSION_TIME)
if not token:
raise HTTPException(status_code=500, detail=f"Failed to get token for room: {room.url}")
logger.info(f"Bot token created: {token}")
# Spawn a new bot process
launch_bot_process.spawn(room_url=room.url, token=token)
# Return room URL to the user to join
# Note: in production, you would want to return a token to the user
return JSONResponse(content={"room_url": room.url, token: token})

View File

@@ -0,0 +1,90 @@
import asyncio
import os
import sys
from dotenv import load_dotenv
from loguru import logger
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main(room_url: str, token: str):
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame, LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
transport = DailyTransport(
room_url,
token,
"bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY", ""), voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22"
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(),
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
runner = PipelineRunner()
await runner.run(task)
def _voice_bot_process(room_url: str, token: str):
asyncio.run(main(room_url, token))

View File

@@ -0,0 +1,3 @@
DAILY_API_KEY=
OPENAI_API_KEY=
CARTESIA_API_KEY=

View File

@@ -0,0 +1,5 @@
python-dotenv==1.0.1
modal==0.65.48
pipecat-ai[daily,silero,cartesia,openai]==0.0.48
fastapi==0.115.4
aiohttp==3.10.10

View File

@@ -9,11 +9,11 @@ import aiohttp
import os
import sys
from pipecat.frames.frames import EndFrame, TextFrame
from pipecat.frames.frames import EndFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.runner import PipelineRunner
from pipecat.services.cartesia import CartesiaHttpTTSService
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from runner import configure
@@ -36,7 +36,7 @@ async def main():
room_url, None, "Say One Thing", DailyParams(audio_out_enabled=True)
)
tts = CartesiaHttpTTSService(
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
@@ -50,12 +50,9 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
participant_name = participant.get("info", {}).get("userName", "")
await task.queue_frame(TextFrame(f"Hello there, {participant_name}!"))
# Register an event handler to exit the application when the user leaves.
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
await task.queue_frames(
[TTSSpeakFrame(f"Hello there, {participant_name}!"), EndFrame()]
)
await runner.run(task)

View File

@@ -9,7 +9,7 @@ import aiohttp
import os
import sys
from pipecat.frames.frames import TextFrame
from pipecat.frames.frames import EndFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
@@ -28,25 +28,24 @@ logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
transport = LocalAudioTransport(TransportParams(audio_out_enabled=True))
transport = LocalAudioTransport(TransportParams(audio_out_enabled=True))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
pipeline = Pipeline([tts, transport.output()])
pipeline = Pipeline([tts, transport.output()])
task = PipelineTask(pipeline)
task = PipelineTask(pipeline)
async def say_something():
await asyncio.sleep(1)
await task.queue_frame(TextFrame("Hello there!"))
async def say_something():
await asyncio.sleep(1)
await task.queue_frames([TTSSpeakFrame("Hello there, how is it going!"), EndFrame()])
runner = PipelineRunner()
runner = PipelineRunner()
await asyncio.gather(runner.run(task), say_something())
await asyncio.gather(runner.run(task), say_something())
if __name__ == "__main__":

View File

@@ -13,7 +13,7 @@ from pipecat.frames.frames import EndFrame, LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.services.cartesia import CartesiaHttpTTSService
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -37,7 +37,7 @@ async def main():
room_url, None, "Say One Thing From an LLM", DailyParams(audio_out_enabled=True)
)
tts = CartesiaHttpTTSService(
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
@@ -57,11 +57,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await task.queue_frame(LLMMessagesFrame(messages))
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
await task.queue_frames([LLMMessagesFrame(messages), EndFrame()])
await runner.run(task)

View File

@@ -31,11 +31,11 @@ logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url,
token,
None,
"Respond bot",
DailyParams(
audio_out_enabled=True,

View File

@@ -49,7 +49,7 @@ async def main():
tts = PlayHTTTSService(
user_id=os.getenv("PLAYHT_USER_ID"),
api_key=os.getenv("PLAYHT_API_KEY"),
voice_url="s3://voice-cloning-zero-shot/801a663f-efd0-4254-98d0-5c175514c3e8/jennifer/manifest.json",
voice_url="s3://voice-cloning-zero-shot/d9ff78ba-d016-47f6-b0ef-dd630f59414e/female-cs/manifest.json",
params=PlayHTTTSService.InputParams(language=Language.EN),
)

View File

@@ -32,11 +32,11 @@ logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url,
token,
None,
"Respond bot",
DailyParams(
audio_out_enabled=True,

View File

@@ -32,11 +32,11 @@ logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url,
token,
None,
"Respond bot",
DailyParams(
audio_out_enabled=True,

View File

@@ -0,0 +1,274 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import asyncio
import os
import sys
import google.ai.generativelanguage as glm
from dataclasses import dataclass
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.google import GoogleLLMService
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.frames.frames import (
LLMFullResponseStartFrame,
LLMFullResponseEndFrame,
InputAudioRawFrame,
Frame,
StartInterruptionFrame,
TextFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
marker = "|----|"
system_message = f"""
You are a helpful LLM in a WebRTC call. Your goals are to be helpful and brief in your responses.
You are expert at transcribing audio to text. You will receive a mixture of audio and text input. When
asked to transcribe what the user said, output an exact, word-for-word transcription.
Your output will be converted to audio so don't include special characters in your answers.
Each time you answer, you should respond in three parts.
1. Transcribe exactly what the user said.
2. Output the separator field '{marker}'.
3. Respond to the user's input in a helpful, creative way using only simple text and punctuation.
Example:
User: How many ounces are in a pound?
You: How many ounces are in a pound?
{marker}
There are 16 ounces in a pound.
"""
@dataclass
class MagicDemoTranscriptionFrame(Frame):
text: str
class UserAudioCollector(FrameProcessor):
def __init__(self, context, user_context_aggregator):
super().__init__()
self._context = context
self._user_context_aggregator = user_context_aggregator
self._audio_frames = []
self._start_secs = 0.2 # this should match VAD start_secs (hardcoding for now)
self._user_speaking = False
async def process_frame(self, frame, direction):
await super().process_frame(frame, direction)
if isinstance(frame, TranscriptionFrame):
# We could gracefully handle both audio input and text/transcription input ...
# but let's leave that as an exercise to the reader. :-)
return
if isinstance(frame, UserStartedSpeakingFrame):
self._user_speaking = True
elif isinstance(frame, UserStoppedSpeakingFrame):
self._user_speaking = False
self._context.add_audio_frames_message(audio_frames=self._audio_frames)
await self._user_context_aggregator.push_frame(
self._user_context_aggregator.get_context_frame()
)
elif isinstance(frame, InputAudioRawFrame):
if self._user_speaking:
self._audio_frames.append(frame)
else:
# Append the audio frame to our buffer. Treat the buffer as a ring buffer, dropping the oldest
# frames as necessary. Assume all audio frames have the same duration.
self._audio_frames.append(frame)
frame_duration = len(frame.audio) / 16 * frame.num_channels / frame.sample_rate
buffer_duration = frame_duration * len(self._audio_frames)
while buffer_duration > self._start_secs:
self._audio_frames.pop(0)
buffer_duration -= frame_duration
await self.push_frame(frame, direction)
class TranscriptExtractor(FrameProcessor):
def __init__(self, context):
super().__init__()
self._context = context
self._accumulator = ""
self._processing_llm_response = False
self._accumulating_transcript = False
def reset(self):
self._accumulator = ""
self._processing_llm_response = False
self._accumulating_transcript = False
async def process_frame(self, frame, direction):
await super().process_frame(frame, direction)
if isinstance(frame, LLMFullResponseStartFrame):
self._processing_llm_response = True
self._accumulating_transcript = True
elif isinstance(frame, TextFrame) and self._processing_llm_response:
if self._accumulating_transcript:
text = frame.text
split_index = text.find(marker)
if split_index < 0:
self._accumulator += frame.text
# do not push this frame
return
else:
self._accumulating_transcript = False
self._accumulator += text[:split_index]
frame.text = text[split_index + len(marker) :]
await self.push_frame(frame)
return
elif isinstance(frame, LLMFullResponseEndFrame):
await self.push_frame(MagicDemoTranscriptionFrame(text=self._accumulator.strip()))
self.reset()
await self.push_frame(frame, direction)
class TanscriptionContextFixup(FrameProcessor):
def __init__(self, context):
super().__init__()
self._context = context
self._transcript = "THIS IS A TRANSCRIPT"
def swap_user_audio(self):
if not self._transcript:
return
message = self._context.messages[-2]
last_part = message.parts[-1]
if (
message.role == "user"
and last_part.inline_data
and last_part.inline_data.mime_type == "audio/wav"
):
self._context.messages[-2] = glm.Content(
role="user", parts=[glm.Part(text=self._transcript)]
)
def add_transcript_back_to_inference_output(self):
if not self._transcript:
return
message = self._context.messages[-1]
last_part = message.parts[-1]
if message.role == "model" and last_part.text:
self._context.messages[-1].parts[-1].text += f"\n\n{marker}\n{self._transcript}\n"
async def process_frame(self, frame, direction):
await super().process_frame(frame, direction)
if isinstance(frame, MagicDemoTranscriptionFrame):
self._transcript = frame.text
elif isinstance(frame, LLMFullResponseEndFrame) or isinstance(
frame, StartInterruptionFrame
):
self.swap_user_audio()
self.add_transcript_back_to_inference_output()
self._transcript = ""
await self.push_frame(frame, direction)
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
# No transcription at all. just audio input to Gemini!
# transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = GoogleLLMService(model="gemini-1.5-flash-latest", api_key=os.getenv("GOOGLE_API_KEY"))
messages = [
{
"role": "system",
"content": system_message,
},
{
"role": "user",
"content": "Start by saying hello.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
audio_collector = UserAudioCollector(context, context_aggregator.user())
pull_transcript_out_of_llm_output = TranscriptExtractor(context)
fixup_context_messages = TanscriptionContextFixup(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
audio_collector,
context_aggregator.user(), # User responses
llm, # LLM
pull_transcript_out_of_llm_output,
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
fixup_context_messages,
]
)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,95 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.deepgram import DeepgramSTTService, DeepgramTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from pipecat.audio.filters.krisp_filter import KrispFilter
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
audio_in_filter=KrispFilter(),
),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-helios-en")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,100 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai import OpenAILLMService
from pipecat.services.rime import RimeHttpTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = RimeHttpTTSService(
api_key=os.getenv("RIME_API_KEY", ""),
voice_id="rex",
params=RimeHttpTTSService.InputParams(reduce_latency=True),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -67,7 +67,8 @@ async def main():
llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"),
model="claude-3-5-sonnet-20240620",
# model="claude-3-5-sonnet-20240620",
model="claude-3-5-sonnet-latest",
enable_prompt_caching_beta=True,
)
llm.register_function("get_weather", get_weather)

View File

@@ -98,12 +98,13 @@ async def load_conversation(function_name, tool_call_id, args, llm, context, res
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a succinct, creative and helpful way. Prefer responses that are one sentence long unless you are asked for a longer or more detailed response.",
},
{"role": "user", "content": ""},
{"role": "assistant", "content": []},
{"role": "user", "content": "Tell me"},
{"role": "user", "content": "a joke"},
{"role": "user", "content": "Start the call by saying the word 'hello'. Say only that word."},
# {"role": "user", "content": ""},
# {"role": "assistant", "content": []},
# {"role": "user", "content": "Tell me"},
# {"role": "user", "content": "a joke"},
]
tools = [
{
@@ -183,7 +184,7 @@ async def main():
)
llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-5-sonnet-20240620"
api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-5-sonnet-latest"
)
# you can either register a single function for all function calls, or specific functions

View File

@@ -0,0 +1,339 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import asyncio
import os
import sys
import time
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame, TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
)
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.openai import OpenAILLMService
from pipecat.sync.event_notifier import EventNotifier
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.processors.frame_processor import FrameProcessor, FrameDirection
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
StartFrame,
StartInterruptionFrame,
StopInterruptionFrame,
SystemFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
from pipecat.sync.base_notifier import BaseNotifier
from pipecat.processors.filters.function_filter import FunctionFilter
from pipecat.processors.user_idle_processor import UserIdleProcessor
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
classifier_statement = "Determine if the user's statement ends with a complete thought and you should respond. The user text is transcribed speech. It may contain multiple fragments concatentated together. You are trying to determine only the completeness of the last user statement. The previous assistant statement is provided only for context. Categorize the text as either complete with the user now expecting a response, or incomplete. Return 'YES' if text is likely complete and the user is expecting a response. Return 'NO' if the text seems to be a partial expression or unfinished thought."
class StatementJudgeContextFilter(FrameProcessor):
def __init__(self, notifier: BaseNotifier, **kwargs):
super().__init__(**kwargs)
self._notifier = notifier
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# We must not block system frames.
if isinstance(frame, SystemFrame):
await self.push_frame(frame, direction)
return
# Just treat an LLMMessagesFrame as complete, no matter what.
if isinstance(frame, LLMMessagesFrame):
await self._notifier.notify()
return
# Otherwise, we only want to handle OpenAILLMContextFrames, and only want to push a simple
# messages frame that contains a system prompt and the most recent user messages,
# concatenated.
if isinstance(frame, OpenAILLMContextFrame):
logger.debug(f"Context Frame: {frame}")
# Take text content from the most recent user messages.
messages = frame.context.messages
user_text_messages = []
last_assistant_message = None
for message in reversed(messages):
if message["role"] != "user":
if message["role"] == "assistant":
last_assistant_message = message
break
if isinstance(message["content"], str):
user_text_messages.append(message["content"])
elif isinstance(message["content"], list):
for content in message["content"]:
if content["type"] == "text":
user_text_messages.insert(0, content["text"])
# If we have any user text content, push an LLMMessagesFrame
if user_text_messages:
logger.debug(f"User text messages: {user_text_messages}")
user_message = " ".join(reversed(user_text_messages))
logger.debug(f"User message: {user_message}")
messages = [
{
"role": "system",
"content": classifier_statement,
}
]
if last_assistant_message:
messages.append(last_assistant_message)
messages.append({"role": "user", "content": user_message})
await self.push_frame(LLMMessagesFrame(messages))
class CompletenessCheck(FrameProcessor):
def __init__(self, notifier: BaseNotifier):
super().__init__()
self._notifier = notifier
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TextFrame) and frame.text == "YES":
logger.debug("Completeness check YES")
await self.push_frame(UserStoppedSpeakingFrame())
await self._notifier.notify()
elif isinstance(frame, TextFrame) and frame.text == "NO":
logger.debug("Completeness check NO")
class OutputGate(FrameProcessor):
def __init__(self, notifier: BaseNotifier, **kwargs):
super().__init__(**kwargs)
self._gate_open = False
self._frames_buffer = []
self._notifier = notifier
def close_gate(self):
self._gate_open = False
def open_gate(self):
self._gate_open = True
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# We must not block system frames.
if isinstance(frame, SystemFrame):
if isinstance(frame, StartFrame):
await self._start()
if isinstance(frame, (EndFrame, CancelFrame)):
await self._stop()
if isinstance(frame, StartInterruptionFrame):
self._frames_buffer = []
self.close_gate()
await self.push_frame(frame, direction)
return
# Ignore frames that are not following the direction of this gate.
if direction != FrameDirection.DOWNSTREAM:
await self.push_frame(frame, direction)
return
if self._gate_open:
await self.push_frame(frame, direction)
return
self._frames_buffer.append((frame, direction))
async def _start(self):
self._frames_buffer = []
self._gate_task = self.get_event_loop().create_task(self._gate_task_handler())
async def _stop(self):
self._gate_task.cancel()
await self._gate_task
async def _gate_task_handler(self):
while True:
try:
await self._notifier.wait()
self.open_gate()
for frame, direction in self._frames_buffer:
await self.push_frame(frame, direction)
self._frames_buffer = []
except asyncio.CancelledError:
break
async def main():
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url,
None,
"Respond bot",
DailyParams(
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
# This is the LLM that will be used to detect if the user has finished a
# statement. This doesn't really need to be an LLM, we could use NLP
# libraries for that, but we have the machinery to use an LLM, so we might as well!
statement_llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
# This is the regular LLM.
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
# We have instructed the LLM to return 'YES' if it thinks the user
# completed a sentence. So, if it's 'YES' we will return true in this
# predicate which will wake up the notifier.
async def wake_check_filter(frame):
logger.debug(f"Completeness check frame: {frame}")
return frame.text == "YES"
# This is a notifier that we use to synchronize the two LLMs.
notifier = EventNotifier()
# This turns the LLM context into an inference request to classify the user's speech
# as complete or incomplete.
statement_judge_context_filter = StatementJudgeContextFilter(notifier=notifier)
# This sends a UserStoppedSpeakingFrame and triggers the notifier event
completeness_check = CompletenessCheck(notifier=notifier)
# # Notify if the user hasn't said anything.
async def user_idle_notifier(frame):
await notifier.notify()
# Sometimes the LLM will fail detecting if a user has completed a
# sentence, this will wake up the notifier if that happens.
user_idle = UserIdleProcessor(callback=user_idle_notifier, timeout=5.0)
bot_output_gate = OutputGate(notifier=notifier)
async def block_user_stopped_speaking(frame):
return not isinstance(frame, UserStoppedSpeakingFrame)
async def pass_only_llm_trigger_frames(frame):
return (
isinstance(frame, OpenAILLMContextFrame)
or isinstance(frame, LLMMessagesFrame)
or isinstance(frame, StartInterruptionFrame)
or isinstance(frame, StopInterruptionFrame)
)
pipeline = Pipeline(
[
transport.input(),
stt,
context_aggregator.user(),
ParallelPipeline(
[
# Pass everything except UserStoppedSpeaking to the elements after
# this ParallelPipeline
FunctionFilter(filter=block_user_stopped_speaking),
],
[
# Ignore everything except an OpenAILLMContextFrame. Pass a specially constructed
# LLMMessagesFrame to the statement classifier LLM. The only frame this
# sub-pipeline will output is a UserStoppedSpeakingFrame.
statement_judge_context_filter,
statement_llm,
completeness_check,
],
[
# Block everything except OpenAILLMContextFrame and LLMMessagesFrame
FunctionFilter(filter=pass_only_llm_trigger_frames),
llm,
bot_output_gate, # Buffer all llm/tts output until notified.
],
),
tts,
user_idle,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_app_message")
async def on_app_message(transport, message, sender):
logger.debug(f"Received app message: {message} - {sender}")
if "message" not in message:
return
await task.queue_frames(
[
UserStartedSpeakingFrame(),
TranscriptionFrame(
user_id=sender, timestamp=time.time(), text=message["message"]
),
UserStoppedSpeakingFrame(),
]
)
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,433 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import asyncio
import os
import sys
import time
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame, TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
)
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.anthropic import AnthropicLLMService
from pipecat.sync.event_notifier import EventNotifier
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.processors.frame_processor import FrameProcessor, FrameDirection
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
StartFrame,
StartInterruptionFrame,
StopInterruptionFrame,
SystemFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
from pipecat.sync.base_notifier import BaseNotifier
from pipecat.processors.filters.function_filter import FunctionFilter
from pipecat.processors.user_idle_processor import UserIdleProcessor
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
classifier_statement = """Determine if the user's statement ends with a complete thought and you should respond.
The user text is transcribed speech. You are trying to determine if:
1. the user has finished talking and expects a response from you, or
2. this statement is incomplete and the user will continue talking
A previous assistant response is provided for additional context. But you are only evaluating the user text.
The user text may contain multiple fragments concatentated together. There may be repeated words or mistakes in the transcription. There may be grammatical errors. There may be extra punctuation. Ignore all of that. Interpret the transcribed text as text that would have been spoken. Then consider only whether the user has finished speaking and is expecting a response.
Categorize the last user statement as either complete with the user now expecting a response, or incomplete.
Return 'YES' if text is likely complete and the user is expecting a response. Return 'NO' if the text seems to be a partial expression or unfinished thought.
If you are not sure, respond with your best guess. If the user is expecting a response, respond with YES. If the user is not expecting a response, respond with NO. Always output either YES or NO and no other text.
Respond only YES or NO
Examples:
User: What's the capital of
Assistant: NO
User: What's the captial of France?
Assistant: YES
User: Tell me a story about
Assistant: NO
User: Tell me a story about a dragon
Assistant YES
User: Is there a
Assistant: NO
User: Is there a large
Assistant: NO
User: Is there a large lake near Chicago?
Assistant: YES
User: When is the longest day of the year?
Assistant: YES
User: When when is the longest day of the year
Assistant: YES
User: When when is the
ASSISTANT: NO
User: What is the um I u
Assistant: NO
User: What is the um i u largest city in the world
Assistant: YES
User: How much does a how much does an adult elephant weigh?
Assistant: YES
User: How much does a how much does
Assistant: NO
User: What can you tell me All the
Assistant: NO
User: What can you tell me All the prime numbers less than 100
Assistant: YES
User: What's the what's the length of the Amazon River?
Assistant: YES
User: What's what's the length of the Amazon River?
Assistant: YES
User: What's what's the length of the Amazon River
Assistant: YES
User: What's what's the best way to get a coffee stain out of a white shirt
Assistant: YES
"""
conversational_system_message = """You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.
Please be very concise in your responses. Unless you are explicitly asked to do otherwise, give me the shortest complete answer possible without unnecessary elaboration. Generally you should answer with a single sentence.
"""
class StatementJudgeContextFilter(FrameProcessor):
def __init__(self, notifier: BaseNotifier, **kwargs):
super().__init__(**kwargs)
self._notifier = notifier
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# We must not block system frames.
if isinstance(frame, SystemFrame):
await self.push_frame(frame, direction)
return
# Just treat an LLMMessagesFrame as complete, no matter what.
if isinstance(frame, LLMMessagesFrame):
await self._notifier.notify()
return
# Otherwise, we only want to handle OpenAILLMContextFrames, and only want to push a simple
# messages frame that contains a system prompt and the most recent user messages,
# concatenated.
if isinstance(frame, OpenAILLMContextFrame):
# Take text content from the most recent user messages.
messages = frame.context.messages
user_text_messages = []
last_assistant_message = None
for message in reversed(messages):
if message["role"] != "user":
if message["role"] == "assistant":
last_assistant_message = message
break
if isinstance(message["content"], str):
user_text_messages.append(message["content"])
elif isinstance(message["content"], list):
for content in message["content"]:
if content["type"] == "text":
user_text_messages.insert(0, content["text"])
# If we have any user text content, push an LLMMessagesFrame
if user_text_messages:
user_message = " ".join(reversed(user_text_messages))
logger.debug(f"!!! {user_message}")
messages = [
{
"role": "system",
"content": classifier_statement,
}
]
if last_assistant_message:
messages.append(last_assistant_message)
messages.append({"role": "user", "content": user_message})
await self.push_frame(LLMMessagesFrame(messages))
class CompletenessCheck(FrameProcessor):
def __init__(self, notifier: BaseNotifier):
super().__init__()
self._notifier = notifier
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TextFrame) and frame.text == "YES":
logger.debug("!!! Completeness check YES")
await self.push_frame(UserStoppedSpeakingFrame())
await self._notifier.notify()
elif isinstance(frame, TextFrame) and frame.text == "NO":
logger.debug("!!! Completeness check NO")
class OutputGate(FrameProcessor):
def __init__(self, notifier: BaseNotifier, **kwargs):
super().__init__(**kwargs)
self._gate_open = False
self._frames_buffer = []
self._notifier = notifier
def close_gate(self):
self._gate_open = False
def open_gate(self):
self._gate_open = True
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# We must not block system frames.
if isinstance(frame, SystemFrame):
if isinstance(frame, StartFrame):
await self._start()
if isinstance(frame, (EndFrame, CancelFrame)):
await self._stop()
if isinstance(frame, StartInterruptionFrame):
self._frames_buffer = []
self.close_gate()
await self.push_frame(frame, direction)
return
# Ignore frames that are not following the direction of this gate.
if direction != FrameDirection.DOWNSTREAM:
await self.push_frame(frame, direction)
return
if self._gate_open:
await self.push_frame(frame, direction)
return
self._frames_buffer.append((frame, direction))
async def _start(self):
self._frames_buffer = []
self._gate_task = self.get_event_loop().create_task(self._gate_task_handler())
async def _stop(self):
self._gate_task.cancel()
await self._gate_task
async def _gate_task_handler(self):
while True:
try:
await self._notifier.wait()
self.open_gate()
for frame, direction in self._frames_buffer:
await self.push_frame(frame, direction)
self._frames_buffer = []
except asyncio.CancelledError:
break
async def main():
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url,
None,
"Respond bot",
DailyParams(
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
# This is the LLM that will be used to detect if the user has finished a
# statement. This doesn't really need to be an LLM, we could use NLP
# libraries for that, but we have the machinery to use an LLM, so we might as well!
statement_llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-5-haiku-20241022", name="Haiku"
)
# This is the regular LLM.
llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"),
model="claude-3-5-sonnet-20241022",
name="Sonnet",
params=AnthropicLLMService.InputParams(enable_prompt_caching_beta=True),
)
messages = [
{
"role": "system",
"content": conversational_system_message,
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
# We have instructed the LLM to return 'YES' if it thinks the user
# completed a sentence. So, if it's 'YES' we will return true in this
# predicate which will wake up the notifier.
async def wake_check_filter(frame):
return frame.text == "YES"
# This is a notifier that we use to synchronize the two LLMs.
notifier = EventNotifier()
# This turns the LLM context into an inference request to classify the user's speech
# as complete or incomplete.
statement_judge_context_filter = StatementJudgeContextFilter(notifier=notifier)
# This sends a UserStoppedSpeakingFrame and triggers the notifier event
completeness_check = CompletenessCheck(notifier=notifier)
# # Notify if the user hasn't said anything.
async def user_idle_notifier(frame):
await notifier.notify()
# Sometimes the LLM will fail detecting if a user has completed a
# sentence, this will wake up the notifier if that happens.
user_idle = UserIdleProcessor(callback=user_idle_notifier, timeout=5.0)
bot_output_gate = OutputGate(notifier=notifier)
async def block_user_stopped_speaking(frame):
return not isinstance(frame, UserStoppedSpeakingFrame)
async def pass_only_llm_trigger_frames(frame):
return (
isinstance(frame, OpenAILLMContextFrame)
or isinstance(frame, LLMMessagesFrame)
or isinstance(frame, StartInterruptionFrame)
or isinstance(frame, StopInterruptionFrame)
)
pipeline = Pipeline(
[
transport.input(),
stt,
context_aggregator.user(),
ParallelPipeline(
[
# Pass everything except UserStoppedSpeaking to the elements after
# this ParallelPipeline
FunctionFilter(filter=block_user_stopped_speaking),
],
[
# Ignore everything except an OpenAILLMContextFrame. Pass a specially constructed
# LLMMessagesFrame to the statement classifier LLM. The only frame this
# sub-pipeline will output is a UserStoppedSpeakingFrame.
statement_judge_context_filter,
statement_llm,
completeness_check,
],
[
# Block everything except OpenAILLMContextFrame and LLMMessagesFrame
FunctionFilter(filter=pass_only_llm_trigger_frames),
llm,
bot_output_gate, # Buffer all llm/tts output until notified.
],
),
tts,
user_idle,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{
"role": "user",
"content": "Start by just saying \"Hello I'm ready.\" Don't say anything else.",
}
)
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_app_message")
async def on_app_message(transport, message, sender):
logger.debug(f"Received app message: {message} - {sender}")
if "message" not in message:
return
await task.queue_frames(
[
UserStartedSpeakingFrame(),
TranscriptionFrame(
user_id=sender, timestamp=time.time(), text=message["message"]
),
UserStoppedSpeakingFrame(),
]
)
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,355 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import asyncio
import os
import sys
import time
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame, TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
)
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.google import GoogleLLMService, GoogleLLMContext
from pipecat.sync.event_notifier import EventNotifier
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.processors.frame_processor import FrameProcessor, FrameDirection
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
InputAudioRawFrame,
StartFrame,
StartInterruptionFrame,
StopInterruptionFrame,
SystemFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
from pipecat.sync.base_notifier import BaseNotifier
from pipecat.processors.filters.function_filter import FunctionFilter
from pipecat.processors.user_idle_processor import UserIdleProcessor
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
classifier_statement = """You are an audio language classifier model. You are receiving audio from a user in a WebRTC call. Your job is to decide whether the user has finished speaking or not.
Categorize the input you receive as either:
1. a complete thought, statement, or question, or
2. an incomplete thought, statement, or question
Output 'YES' if the input is likely to be a completed thought, statement, or question.
Output 'NO' if the input indicates that the user is still speaking and does not yet expect a response yet.
If you are unsure, output 'YES'.
"""
conversational_system_message = """You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.
Please be very concise in your responses. Unless you are explicitly asked to do otherwise, give me the shortest complete answer possible without unnecessary elaboration. Generally you should answer with a single sentence.
"""
class StatementJudgeAudioContextAccumulator(FrameProcessor):
def __init__(self, *, notifier: BaseNotifier, **kwargs):
super().__init__(**kwargs)
self._notifier = notifier
self._audio_frames = []
self._audio_frames = []
self._start_secs = 0.2 # this should match VAD start_secs (hardcoding for now)
self._user_speaking = False
async def reset(self):
self._audio_frames = []
self._user_speaking = False
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# ignore context frame
if isinstance(frame, OpenAILLMContextFrame):
return
if isinstance(frame, TranscriptionFrame):
# We could gracefully handle both audio input and text/transcription input ...
# but let's leave that as an exercise to the reader. :-)
return
if isinstance(frame, UserStartedSpeakingFrame):
self._user_speaking = True
elif isinstance(frame, UserStoppedSpeakingFrame):
self._user_speaking = False
context = GoogleLLMContext()
context.set_messages([{"role": "system", "content": classifier_statement}])
context.add_audio_frames_message(audio_frames=self._audio_frames)
await self.push_frame(OpenAILLMContextFrame(context=context))
elif isinstance(frame, InputAudioRawFrame):
if self._user_speaking:
self._audio_frames.append(frame)
else:
# Append the audio frame to our buffer. Treat the buffer as a ring buffer, dropping the oldest
# frames as necessary. Assume all audio frames have the same duration.
self._audio_frames.append(frame)
frame_duration = len(frame.audio) / 16 * frame.num_channels / frame.sample_rate
buffer_duration = frame_duration * len(self._audio_frames)
while buffer_duration > self._start_secs:
self._audio_frames.pop(0)
buffer_duration -= frame_duration
await self.push_frame(frame, direction)
class CompletenessCheck(FrameProcessor):
def __init__(
self, notifier: BaseNotifier, audio_accumulator: StatementJudgeAudioContextAccumulator
):
super().__init__()
self._notifier = notifier
self._audio_accumulator = audio_accumulator
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TextFrame) and frame.text.startswith("YES"):
logger.debug("Completeness check YES")
await self.push_frame(UserStoppedSpeakingFrame())
await self._audio_accumulator.reset()
await self._notifier.notify()
elif isinstance(frame, TextFrame):
if frame.text.strip():
logger.debug(f"Completeness check NO - '{frame.text}'")
class OutputGate(FrameProcessor):
def __init__(self, notifier: BaseNotifier, **kwargs):
super().__init__(**kwargs)
self._gate_open = False
self._frames_buffer = []
self._notifier = notifier
def close_gate(self):
self._gate_open = False
def open_gate(self):
self._gate_open = True
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# We must not block system frames.
if isinstance(frame, SystemFrame):
if isinstance(frame, StartFrame):
await self._start()
if isinstance(frame, (EndFrame, CancelFrame)):
await self._stop()
if isinstance(frame, StartInterruptionFrame):
self._frames_buffer = []
self.close_gate()
await self.push_frame(frame, direction)
return
# Ignore frames that are not following the direction of this gate.
if direction != FrameDirection.DOWNSTREAM:
await self.push_frame(frame, direction)
return
if self._gate_open:
await self.push_frame(frame, direction)
return
self._frames_buffer.append((frame, direction))
async def _start(self):
self._frames_buffer = []
self._gate_task = self.get_event_loop().create_task(self._gate_task_handler())
async def _stop(self):
self._gate_task.cancel()
await self._gate_task
async def _gate_task_handler(self):
while True:
try:
await self._notifier.wait()
self.open_gate()
for frame, direction in self._frames_buffer:
await self.push_frame(frame, direction)
self._frames_buffer = []
except asyncio.CancelledError:
break
async def main():
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url,
None,
"Respond bot",
DailyParams(
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
# This is the LLM that will be used to detect if the user has finished a
# statement. This doesn't really need to be an LLM, we could use NLP
# libraries for that, but we have the machinery to use an LLM, so we might as well!
statement_llm = GoogleLLMService(
model="gemini-1.5-flash-latest", api_key=os.getenv("GOOGLE_API_KEY")
)
# This is the regular LLM.
llm = GoogleLLMService(model="gemini-1.5-flash-latest", api_key=os.getenv("GOOGLE_API_KEY"))
messages = [
{
"role": "system",
"content": conversational_system_message,
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
# We have instructed the LLM to return 'YES' if it thinks the user
# completed a sentence. So, if it's 'YES' we will return true in this
# predicate which will wake up the notifier.
async def wake_check_filter(frame):
return frame.text == "YES"
# This is a notifier that we use to synchronize the two LLMs.
notifier = EventNotifier()
# This turns the LLM context into an inference request to classify the user's speech
# as complete or incomplete.
statement_judge_context_filter = StatementJudgeAudioContextAccumulator(notifier=notifier)
# This sends a UserStoppedSpeakingFrame and triggers the notifier event
completeness_check = CompletenessCheck(
notifier=notifier, audio_accumulator=statement_judge_context_filter
)
# # Notify if the user hasn't said anything.
async def user_idle_notifier(frame):
await notifier.notify()
# Sometimes the LLM will fail detecting if a user has completed a
# sentence, this will wake up the notifier if that happens.
user_idle = UserIdleProcessor(callback=user_idle_notifier, timeout=5.0)
bot_output_gate = OutputGate(notifier=notifier)
async def block_user_stopped_speaking(frame):
return not isinstance(frame, UserStoppedSpeakingFrame)
async def pass_only_llm_trigger_frames(frame):
return (
isinstance(frame, OpenAILLMContextFrame)
or isinstance(frame, LLMMessagesFrame)
or isinstance(frame, StartInterruptionFrame)
or isinstance(frame, StopInterruptionFrame)
)
pipeline = Pipeline(
[
transport.input(),
ParallelPipeline(
[
# Pass everything except UserStoppedSpeaking to the elements after
# this ParallelPipeline
FunctionFilter(filter=block_user_stopped_speaking),
],
[
statement_judge_context_filter,
statement_llm,
completeness_check,
],
[
stt,
context_aggregator.user(),
# Block everything except OpenAILLMContextFrame and LLMMessagesFrame
FunctionFilter(filter=pass_only_llm_trigger_frames),
llm,
bot_output_gate, # Buffer all llm/tts output until notified.
],
),
tts,
user_idle,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_app_message")
async def on_app_message(transport, message, sender):
logger.debug(f"Received app message: {message} - {sender}")
if "message" not in message:
return
await task.queue_frames(
[
UserStartedSpeakingFrame(),
TranscriptionFrame(
user_id=sender, timestamp=time.time(), text=message["message"]
),
UserStoppedSpeakingFrame(),
]
)
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,121 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import asyncio
import aiohttp
import os
import sys
from pipecat.audio.mixers.soundfile_mixer import SoundfileMixer
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame, MixerUpdateSettingsFrame, MixerEnableFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from runner import configure_with_args
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
parser = argparse.ArgumentParser(description="Bot Background Sound")
parser.add_argument("-i", "--input", type=str, required=True, help="Input audio file")
(room_url, token, args) = await configure_with_args(session, parser)
soundfile_mixer = SoundfileMixer(
sound_files={"office": args.input},
default_sound="office",
volume=2.0,
)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_mixer=soundfile_mixer,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# Show how to use mixer control frames.
await asyncio.sleep(10.0)
await task.queue_frame(MixerUpdateSettingsFrame({"volume": 0.5}))
await asyncio.sleep(5.0)
await task.queue_frame(MixerEnableFrame(False))
await asyncio.sleep(5.0)
await task.queue_frame(MixerEnableFrame(True))
await asyncio.sleep(5.0)
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,98 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
LLMMessagesFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.filters.stt_mute_filter import STTMuteConfig, STTMuteFilter, STTMuteStrategy
from pipecat.services.deepgram import DeepgramSTTService, DeepgramTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url,
None,
"Respond bot",
DailyParams(
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
# Configure the mute processor to mute only during first speech
stt_mute_processor = STTMuteFilter(
stt_service=stt, config=STTMuteConfig(strategy=STTMuteStrategy.FIRST_SPEECH)
)
tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-helios-en")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt_mute_processor, # Add the mute processor before STT
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -182,7 +182,7 @@ class IntakeProcessor:
}
)
print(f"!!! about to await llm process frame in start prescrpitions")
await llm.process_frame(OpenAILLMContextFrame(context), FrameDirection.DOWNSTREAM)
await llm.queue_frame(OpenAILLMContextFrame(context), FrameDirection.DOWNSTREAM)
print(f"!!! past await process frame in start prescriptions")
async def start_allergies(self, function_name, llm, context):
@@ -222,7 +222,7 @@ class IntakeProcessor:
"content": "Now ask the user if they have any medical conditions the doctor should know about. Once they've answered the question, call the list_conditions function.",
}
)
await llm.process_frame(OpenAILLMContextFrame(context), FrameDirection.DOWNSTREAM)
await llm.queue_frame(OpenAILLMContextFrame(context), FrameDirection.DOWNSTREAM)
async def start_conditions(self, function_name, llm, context):
print("!!! doing start conditions")
@@ -261,7 +261,7 @@ class IntakeProcessor:
"content": "Finally, ask the user the reason for their doctor visit today. Once they answer, call the list_visit_reasons function.",
}
)
await llm.process_frame(OpenAILLMContextFrame(context), FrameDirection.DOWNSTREAM)
await llm.queue_frame(OpenAILLMContextFrame(context), FrameDirection.DOWNSTREAM)
async def start_visit_reasons(self, function_name, llm, context):
print("!!! doing start visit reasons")
@@ -270,7 +270,7 @@ class IntakeProcessor:
context.add_message(
{"role": "system", "content": "Now, thank the user and end the conversation."}
)
await llm.process_frame(OpenAILLMContextFrame(context), FrameDirection.DOWNSTREAM)
await llm.queue_frame(OpenAILLMContextFrame(context), FrameDirection.DOWNSTREAM)
async def save_data(self, function_name, tool_call_id, args, llm, context, result_callback):
logger.info(f"!!! Saving data: {args}")

View File

@@ -5,36 +5,33 @@
#
import asyncio
import aiohttp
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from PIL import Image
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
Frame,
LLMMessagesFrame,
OutputImageRawFrame,
SpriteFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.frames.frames import (
OutputImageRawFrame,
SpriteFrame,
Frame,
LLMMessagesFrame,
TTSAudioRawFrame,
TTSStoppedFrame,
)
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -73,15 +70,15 @@ class TalkingAnimation(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TTSAudioRawFrame):
if isinstance(frame, BotStartedSpeakingFrame):
if not self._is_talking:
await self.push_frame(talking_frame)
self._is_talking = True
elif isinstance(frame, TTSStoppedFrame):
elif isinstance(frame, BotStoppedSpeakingFrame):
await self.push_frame(quiet_frame)
self._is_talking = False
await self.push_frame(frame)
await self.push_frame(frame, direction)
async def main():
@@ -162,7 +159,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

View File

@@ -12,7 +12,7 @@ from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.deepgram import DeepgramSTTService
@@ -35,6 +35,7 @@ logger.add(sys.stderr, level="DEBUG")
async def main():
transport = WebsocketServerTransport(
params=WebsocketServerParams(
audio_out_sample_rate=16000,
audio_out_enabled=True,
add_wav_header=True,
vad_enabled=True,
@@ -50,6 +51,7 @@ async def main():
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
sample_rate=16000,
)
messages = [
@@ -74,7 +76,7 @@ async def main():
]
)
task = PipelineTask(pipeline)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):

View File

@@ -42,7 +42,7 @@ aws = [ "boto3~=1.35.27" ]
azure = [ "azure-cognitiveservices-speech~=1.40.0" ]
canonical = [ "aiofiles~=24.1.0" ]
cartesia = [ "cartesia~=1.0.13", "websockets~=13.1" ]
daily = [ "daily-python~=0.12.0" ]
daily = [ "daily-python~=0.13.0" ]
deepgram = [ "deepgram-sdk~=3.7.3" ]
elevenlabs = [ "websockets~=13.1" ]
examples = [ "python-dotenv~=1.0.1", "flask~=3.0.3", "flask_cors~=4.0.1" ]
@@ -51,15 +51,18 @@ gladia = [ "websockets~=13.1" ]
google = [ "google-generativeai~=0.8.3", "google-cloud-texttospeech~=2.17.2" ]
gstreamer = [ "pygobject~=3.48.2" ]
fireworks = [ "openai~=1.37.2" ]
krisp = [ "pipecat-ai-krisp~=0.2.0" ]
langchain = [ "langchain~=0.2.14", "langchain-community~=0.2.12", "langchain-openai~=0.1.20" ]
livekit = [ "livekit~=0.17.5", "livekit-api~=0.7.1", "tenacity~=8.5.0" ]
lmnt = [ "lmnt~=1.1.4" ]
local = [ "pyaudio~=0.2.14" ]
moondream = [ "einops~=0.8.0", "timm~=1.0.8", "transformers~=4.44.0" ]
noisereduce = [ "noisereduce~=3.0.3" ]
openai = [ "openai~=1.50.2", "websockets~=13.1", "python-deepcompare~=1.0.1" ]
openpipe = [ "openpipe~=4.24.0" ]
playht = [ "pyht~=0.1.4", "websockets~=13.1" ]
silero = [ "onnxruntime~=1.19.2" ]
soundfile = [ "soundfile~=0.12.1" ]
together = [ "openai~=1.50.2" ]
websocket = [ "websockets~=13.1", "fastapi~=0.115.0" ]
whisper = [ "faster-whisper~=1.0.3" ]

View File

View File

@@ -0,0 +1,47 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from abc import ABC, abstractmethod
from pipecat.frames.frames import FilterControlFrame
class BaseAudioFilter(ABC):
"""This is a base class for input transport audio filters. If an audio
filter is provided to the input transport it will be used to process audio
before VAD and before pushing it downstream. There are control frames to
update filter settings or to enable or disable the filter at runtime.
"""
@abstractmethod
async def start(self, sample_rate: int):
"""This will be called from the input transport when the transport is
started. It can be used to initialize the filter. The input transport
sample rate is provided so the filter can adjust to that sample rate.
"""
pass
@abstractmethod
async def stop(self):
"""This will be called from the input transport when the transport is
stopping.
"""
pass
@abstractmethod
async def process_frame(self, frame: FilterControlFrame):
"""This will be called when the input transport receives a
FilterControlFrame.
"""
pass
@abstractmethod
async def filter(self, audio: bytes) -> bytes:
pass

View File

@@ -0,0 +1,78 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import numpy as np
import os
from pipecat.audio.filters.base_audio_filter import BaseAudioFilter
from loguru import logger
from pipecat.frames.frames import FilterControlFrame, FilterEnableFrame
try:
from pipecat_ai_krisp.audio.krisp_processor import KrispAudioProcessor
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use the Krisp filter, you need to `pip install pipecat-ai[krisp]`.")
raise Exception(f"Missing module: {e}")
class KrispFilter(BaseAudioFilter):
def __init__(
self, sample_type: str = "PCM_16", channels: int = 1, model_path: str = None
) -> None:
"""
Initializes the KrispAudioProcessor with customizable audio processing settings.
:param sample_type: The type of audio sample, default is 'PCM_16'.
:param channels: Number of audio channels, default is 1.
:param model_path: Path to the Krisp model; defaults to environment variable KRISP_MODEL_PATH if not provided.
"""
super().__init__()
# Set model path, checking environment if not specified
self._model_path = model_path or os.getenv("KRISP_MODEL_PATH")
if not self._model_path:
logger.error(
"Model path for KrispAudioProcessor is not provided and KRISP_MODEL_PATH is not set."
)
raise ValueError("Model path for KrispAudioProcessor must be provided.")
self._sample_type = sample_type
self._channels = channels
self._sample_rate = 0
self._filtering = True
self._krisp_processor = None
async def start(self, sample_rate: int):
self._sample_rate = sample_rate
self._krisp_processor = KrispAudioProcessor(
self._sample_rate, self._sample_type, self._channels, self._model_path
)
async def stop(self):
self._krisp_processor = None
async def process_frame(self, frame: FilterControlFrame):
if isinstance(frame, FilterEnableFrame):
self._filtering = frame.enable
async def filter(self, audio: bytes) -> bytes:
if not self._filtering:
return audio
data = np.frombuffer(audio, dtype=np.int16)
# Add a small epsilon to avoid division by zero.
epsilon = 1e-10
data = data.astype(np.float32) + epsilon
# Process the audio chunk to reduce noise
reduced_noise = self._krisp_processor.process(data)
# Clip and set processed audio back to frame
audio = np.clip(reduced_noise, -32768, 32767).astype(np.int16).tobytes()
return audio

View File

@@ -0,0 +1,54 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import numpy as np
from pipecat.audio.filters.base_audio_filter import BaseAudioFilter
from loguru import logger
from pipecat.frames.frames import FilterControlFrame, FilterEnableFrame
try:
import noisereduce as nr
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use the noisereduce filter, you need to `pip install pipecat-ai[noisereduce]`."
)
raise Exception(f"Missing module: {e}")
class NoisereduceFilter(BaseAudioFilter):
def __init__(self) -> None:
self._filtering = True
self._sample_rate = 0
async def start(self, sample_rate: int):
self._sample_rate = sample_rate
async def stop(self):
pass
async def process_frame(self, frame: FilterControlFrame):
if isinstance(frame, FilterEnableFrame):
self._filtering = frame.enable
async def filter(self, audio: bytes) -> bytes:
if not self._filtering:
return audio
data = np.frombuffer(audio, dtype=np.int16)
# Add a small epsilon to avoid division by zero.
epsilon = 1e-10
data = data.astype(np.float32) + epsilon
# Noise reduction
reduced_noise = nr.reduce_noise(y=data, sr=self._sample_rate)
audio = np.clip(reduced_noise, -32768, 32767).astype(np.int16).tobytes()
return audio

View File

View File

@@ -0,0 +1,53 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from abc import ABC, abstractmethod
from pipecat.frames.frames import MixerControlFrame
class BaseAudioMixer(ABC):
"""This is a base class for output transport audio mixers. If an audio mixer
is provided to the output transport it will be used to mix the audio frames
coming into to the transport with the audio generated from the mixer. There
are control frames to update mixer settings or to enable or disable the
mixer at runtime.
"""
@abstractmethod
async def start(self, sample_rate: int):
"""This will be called from the output transport when the transport is
started. It can be used to initialize the mixer. The output transport
sample rate is provided so the mixer can adjust to that sample rate.
"""
pass
@abstractmethod
async def stop(self):
"""This will be called from the output transport when the transport is
stopping.
"""
pass
@abstractmethod
async def process_frame(self, frame: MixerControlFrame):
"""This will be called when the output transport receives a
MixerControlFrame.
"""
pass
@abstractmethod
async def mix(self, audio: bytes) -> bytes:
"""This is called with the audio that is about to be sent from the
output transport and that should be mixed with the mixer audio if the
mixer is enabled.
"""
pass

View File

@@ -0,0 +1,147 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
from typing import Any, Dict, Mapping
import numpy as np
from loguru import logger
from pipecat.audio.mixers.base_audio_mixer import BaseAudioMixer
from pipecat.audio.utils import resample_audio
from pipecat.frames.frames import MixerControlFrame, MixerEnableFrame, MixerUpdateSettingsFrame
try:
import soundfile as sf
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use the soundfile mixer, you need to `pip install pipecat-ai[soundfile]`."
)
raise Exception(f"Missing module: {e}")
class SoundfileMixer(BaseAudioMixer):
"""This is an audio mixer that mixes incoming audio with audio from a
file. It uses the soundfile library to load files so it supports multiple
formats. The audio files need to only have one channel (mono) but they can
have any sample rate that will be resampled to the output transport sample
rate.
Multiple files can be loaded, each with a different name. The
`MixerUpdateSettingsFrame` has the following settings available: `sound`
(str) and `volume` (float) to be able to update to a different sound file or
to change the volume at runtime.
"""
def __init__(
self,
sound_files: Mapping[str, str],
default_sound: str,
volume: float = 0.4,
loop: bool = True,
**kwargs,
):
super().__init__(**kwargs)
self._sound_files = sound_files
self._volume = volume
self._sample_rate = 0
self._sound_pos = 0
self._sounds: Dict[str, Any] = {}
self._current_sound = default_sound
self._mixing = True
self._loop = loop
async def start(self, sample_rate: int):
self._sample_rate = sample_rate
for sound_name, file_name in self._sound_files.items():
await asyncio.to_thread(self._load_sound_file, sound_name, file_name)
async def stop(self):
pass
async def process_frame(self, frame: MixerControlFrame):
if isinstance(frame, MixerUpdateSettingsFrame):
await self._update_settings(frame)
elif isinstance(frame, MixerEnableFrame):
await self._enable_mixing(frame.enable)
pass
async def mix(self, audio: bytes) -> bytes:
return self._mix_with_sound(audio)
async def _enable_mixing(self, enable: bool):
self._mixing = enable
async def _update_settings(self, frame: MixerUpdateSettingsFrame):
for setting, value in frame.settings.items():
match setting:
case "sound":
await self._change_sound(value)
case "volume":
await self._update_volume(value)
case "loop":
await self._update_loop(value)
async def _change_sound(self, sound: str):
if sound in self._sound_files:
self._current_sound = sound
self._sound_pos = 0
else:
logger.error(f"Sound {sound} is not available")
async def _update_volume(self, volume: float):
self._volume = volume
async def _update_loop(self, loop: bool):
self._loop = loop
def _load_sound_file(self, sound_name: str, file_name: str):
try:
logger.debug(f"Loading background sound from {file_name}")
sound, sample_rate = sf.read(file_name, dtype="int16")
audio = sound.tobytes()
if sample_rate != self._sample_rate:
logger.debug(f"Resampling background sound to {self._sample_rate}")
audio = resample_audio(audio, sample_rate, self._sample_rate)
# Convert from np to bytes again.
self._sounds[sound_name] = np.frombuffer(audio, dtype=np.int16)
except Exception as e:
logger.error(f"Unable to open file {file_name}: {e}")
def _mix_with_sound(self, audio: bytes):
"""Mixes raw audio frames with chunks of the same length from the sound
file.
"""
if not self._mixing:
return audio
audio_np = np.frombuffer(audio, dtype=np.int16)
chunk_size = len(audio_np)
# Sound currently playing.
sound = self._sounds[self._current_sound]
# Go back to the beginning if we don't have enough data.
if self._sound_pos + chunk_size > len(sound):
if not self._loop:
return audio
self._sound_pos = 0
start_pos = self._sound_pos
end_pos = self._sound_pos + chunk_size
self._sound_pos = end_pos
sound_np = sound[start_pos:end_pos]
mixed_audio = np.clip(audio_np + sound_np * self._volume, -32768, 32767).astype(np.int16)
return mixed_audio.astype(np.int16).tobytes()

View File

@@ -5,7 +5,7 @@
#
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, List, Mapping, Optional, Tuple
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.clocks.base_clock import BaseClock
@@ -557,7 +557,7 @@ class TTSStoppedFrame(ControlFrame):
class ServiceUpdateSettingsFrame(ControlFrame):
"""A control frame containing a request to update service settings."""
settings: Dict[str, Any]
settings: Mapping[str, Any]
@dataclass
@@ -570,6 +570,13 @@ class TTSUpdateSettingsFrame(ServiceUpdateSettingsFrame):
pass
@dataclass
class STTMuteFrame(ControlFrame):
"""Control frame to mute/unmute the STT service."""
mute: bool
@dataclass
class STTUpdateSettingsFrame(ServiceUpdateSettingsFrame):
pass
@@ -582,3 +589,45 @@ class VADParamsUpdateFrame(ControlFrame):
"""
params: VADParams
@dataclass
class FilterControlFrame(ControlFrame):
"""Base control frame for other audio filter frames."""
pass
@dataclass
class FilterUpdateSettingsFrame(FilterControlFrame):
"""Control frame to update filter settings."""
settings: Mapping[str, Any]
@dataclass
class FilterEnableFrame(FilterControlFrame):
"""Control frame to enable or disable the filter at runtime."""
enable: bool
@dataclass
class MixerControlFrame(ControlFrame):
"""Base control frame for other audio mixer frames."""
pass
@dataclass
class MixerUpdateSettingsFrame(MixerControlFrame):
"""Control frame to update mixer settings."""
settings: Mapping[str, Any]
@dataclass
class MixerEnableFrame(MixerControlFrame):
"""Control frame to enable or disable the mixer at runtime."""
enable: bool

View File

@@ -110,13 +110,13 @@ class ParallelPipeline(BasePipeline):
if direction == FrameDirection.UPSTREAM:
# If we get an upstream frame we process it in each sink.
await asyncio.gather(*[s.process_frame(frame, direction) for s in self._sinks])
await asyncio.gather(*[s.queue_frame(frame, direction) for s in self._sinks])
elif direction == FrameDirection.DOWNSTREAM:
# If we get a downstream frame we process it in each source.
# TODO(aleix): We are creating task for each frame. For real-time
# video/audio this might be too slow. We should use an already
# created task instead.
await asyncio.gather(*[s.process_frame(frame, direction) for s in self._sources])
await asyncio.gather(*[s.queue_frame(frame, direction) for s in self._sources])
# If we get an EndFrame we stop our queue processing tasks and wait on
# all the pipelines to finish.

View File

@@ -77,9 +77,9 @@ class Pipeline(BasePipeline):
await super().process_frame(frame, direction)
if direction == FrameDirection.DOWNSTREAM:
await self._source.process_frame(frame, FrameDirection.DOWNSTREAM)
await self._source.queue_frame(frame, FrameDirection.DOWNSTREAM)
elif direction == FrameDirection.UPSTREAM:
await self._sink.process_frame(frame, FrameDirection.UPSTREAM)
await self._sink.queue_frame(frame, FrameDirection.UPSTREAM)
async def _cleanup_processors(self):
for p in self._processors:

View File

@@ -160,19 +160,17 @@ class PipelineTask:
report_only_initial_ttfb=self._params.report_only_initial_ttfb,
clock=self._clock,
)
await self._source.process_frame(start_frame, FrameDirection.DOWNSTREAM)
await self._source.queue_frame(start_frame, FrameDirection.DOWNSTREAM)
if self._params.enable_metrics and self._params.send_initial_empty_metrics:
await self._source.process_frame(
self._initial_metrics_frame(), FrameDirection.DOWNSTREAM
)
await self._source.queue_frame(self._initial_metrics_frame(), FrameDirection.DOWNSTREAM)
running = True
should_cleanup = True
while running:
try:
frame = await self._push_queue.get()
await self._source.process_frame(frame, FrameDirection.DOWNSTREAM)
await self._source.queue_frame(frame, FrameDirection.DOWNSTREAM)
if isinstance(frame, EndFrame):
await self._wait_for_endframe()
running = not isinstance(frame, (StopTaskFrame, EndFrame))

View File

@@ -15,6 +15,7 @@ from loguru import logger
from PIL import Image
from pipecat.frames.frames import (
AudioRawFrame,
Frame,
FunctionCallInProgressFrame,
FunctionCallResultFrame,
@@ -174,6 +175,10 @@ class OpenAILLMContext:
content.append({"type": "text", "text": text})
self.add_message({"role": "user", "content": content})
def add_audio_frames_message(self, *, audio_frames: list[AudioRawFrame], text: str = None):
# todo: implement for OpenAI models and others
pass
async def call_function(
self,
f: Callable[
@@ -213,6 +218,29 @@ class OpenAILLMContext:
await f(function_name, tool_call_id, arguments, llm, self, function_call_result_callback)
def create_wav_header(self, sample_rate, num_channels, bits_per_sample, data_size):
# RIFF chunk descriptor
header = bytearray()
header.extend(b"RIFF") # ChunkID
header.extend((data_size + 36).to_bytes(4, "little")) # ChunkSize: total size - 8
header.extend(b"WAVE") # Format
# "fmt " sub-chunk
header.extend(b"fmt ") # Subchunk1ID
header.extend((16).to_bytes(4, "little")) # Subchunk1Size (16 for PCM)
header.extend((1).to_bytes(2, "little")) # AudioFormat (1 for PCM)
header.extend(num_channels.to_bytes(2, "little")) # NumChannels
header.extend(sample_rate.to_bytes(4, "little")) # SampleRate
# Calculate byte rate and block align
byte_rate = sample_rate * num_channels * (bits_per_sample // 8)
block_align = num_channels * (bits_per_sample // 8)
header.extend(byte_rate.to_bytes(4, "little")) # ByteRate
header.extend(block_align.to_bytes(2, "little")) # BlockAlign
header.extend(bits_per_sample.to_bytes(2, "little")) # BitsPerSample
# "data" sub-chunk
header.extend(b"data") # Subchunk2ID
header.extend(data_size.to_bytes(4, "little")) # Subchunk2Size
return header
@dataclass
class OpenAILLMContextFrame(Frame):

View File

@@ -11,19 +11,27 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
class FunctionFilter(FrameProcessor):
def __init__(self, filter: Callable[[Frame], Awaitable[bool]]):
def __init__(
self,
filter: Callable[[Frame], Awaitable[bool]],
direction: FrameDirection = FrameDirection.DOWNSTREAM,
):
super().__init__()
self._filter = filter
self._direction = direction
#
# Frame processor
#
def _should_passthrough_frame(self, frame):
return isinstance(frame, SystemFrame)
# Ignore system frames and frames that are not following the direction of this gate
def _should_passthrough_frame(self, frame, direction):
return isinstance(frame, SystemFrame) or direction != self._direction
async def process_frame(self, frame: Frame, direction: FrameDirection):
passthrough = self._should_passthrough_frame(frame)
await super().process_frame(frame, direction)
passthrough = self._should_passthrough_frame(frame, direction)
allowed = await self._filter(frame)
if passthrough or allowed:
await self.push_frame(frame, direction)

View File

@@ -0,0 +1,111 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from dataclasses import dataclass
from enum import Enum
from typing import Awaitable, Callable, Optional
from loguru import logger
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
Frame,
StartInterruptionFrame,
StopInterruptionFrame,
STTMuteFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.ai_services import STTService
class STTMuteStrategy(Enum):
FIRST_SPEECH = "first_speech" # Mute only during first bot speech
ALWAYS = "always" # Mute during all bot speech
CUSTOM = "custom" # Allow custom logic via callback
@dataclass
class STTMuteConfig:
"""Configuration for STTMuteFilter"""
strategy: STTMuteStrategy
# Optional callback for custom muting logic
should_mute_callback: Optional[Callable[["STTMuteFilter"], Awaitable[bool]]] = None
class STTMuteFilter(FrameProcessor):
"""A general-purpose processor that handles STT muting and interruption control.
This processor combines the concepts of STT muting and interruption control,
treating them as a single coordinated feature. When STT is muted, interruptions
are automatically disabled.
"""
def __init__(self, stt_service: STTService, config: STTMuteConfig, **kwargs):
super().__init__(**kwargs)
self._stt_service = stt_service
self._config = config
self._first_speech_handled = False
self._bot_is_speaking = False
@property
def is_muted(self) -> bool:
"""Returns whether STT is currently muted."""
return self._stt_service.is_muted
async def _handle_mute_state(self, should_mute: bool):
"""Handles both STT muting and interruption control."""
if should_mute != self.is_muted:
logger.debug(f"STT {'muting' if should_mute else 'unmuting'}")
await self.push_frame(STTMuteFrame(mute=should_mute))
async def _should_mute(self) -> bool:
"""Determines if STT should be muted based on current state and strategy."""
if not self._bot_is_speaking:
return False
if self._config.strategy == STTMuteStrategy.ALWAYS:
return True
elif (
self._config.strategy == STTMuteStrategy.FIRST_SPEECH and not self._first_speech_handled
):
self._first_speech_handled = True
return True
elif self._config.strategy == STTMuteStrategy.CUSTOM and self._config.should_mute_callback:
return await self._config.should_mute_callback(self)
return False
async def process_frame(self, frame: Frame, direction: FrameDirection):
# Handle bot speaking state changes
if isinstance(frame, BotStartedSpeakingFrame):
self._bot_is_speaking = True
await self._handle_mute_state(await self._should_mute())
elif isinstance(frame, BotStoppedSpeakingFrame):
self._bot_is_speaking = False
await self._handle_mute_state(await self._should_mute())
# Handle frame propagation
if isinstance(
frame,
(
StartInterruptionFrame,
StopInterruptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
),
):
# Only pass VAD-related frames when not muted
if not self.is_muted:
await self.push_frame(frame, direction)
else:
logger.debug(f"{frame.__class__.__name__} suppressed - STT currently muted")
else:
# Pass all other frames through
await self.push_frame(frame, direction)

View File

@@ -8,6 +8,7 @@ import asyncio
import inspect
from enum import Enum
from typing import Awaitable, Callable, Optional
from pipecat.clocks.base_clock import BaseClock
from pipecat.frames.frames import (
@@ -62,6 +63,13 @@ class FrameProcessor:
self._metrics = metrics or FrameProcessorMetrics()
self._metrics.set_processor_name(self.name)
# Processors have an input queue. The input queue will be processed
# immediately (default) or it will block if `pause_processing_frames()`
# is called. To resume processing frames we need to call
# `resume_processing_frames()`.
self.__should_block_frames = False
self.__create_input_task()
# Every processor in Pipecat should only output frames from a single
# task. This avoid problems like audio overlapping. System frames are
# the exception to this rule. This create this task.
@@ -126,7 +134,8 @@ class FrameProcessor:
await self.stop_processing_metrics()
async def cleanup(self):
pass
await self.__cancel_input_task()
await self.__cancel_push_task()
def link(self, processor: "FrameProcessor"):
self._next = processor
@@ -145,6 +154,28 @@ class FrameProcessor:
def get_clock(self) -> BaseClock:
return self._clock
async def queue_frame(
self,
frame: Frame,
direction: FrameDirection = FrameDirection.DOWNSTREAM,
callback: Optional[
Callable[["FrameProcessor", Frame, FrameDirection], Awaitable[None]]
] = None,
):
if isinstance(frame, SystemFrame):
# We don't want to queue system frames.
await self.process_frame(frame, direction)
else:
# We queue everything else.
await self.__input_queue.put((frame, direction, callback))
async def pause_processing_frames(self):
self.__should_block_frames = True
async def resume_processing_frames(self):
self.__input_event.set()
self.__should_block_frames = False
async def process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, StartFrame):
self._clock = frame.clock
@@ -189,11 +220,16 @@ class FrameProcessor:
#
async def _start_interruption(self):
# Cancel the task. This will stop pushing frames downstream.
self.__push_frame_task.cancel()
await self.__push_frame_task
# Cancel the push frame task. This will stop pushing frames downstream.
await self.__cancel_push_task()
# Create a new queue and task.
# Cancel the input task. This will stop processing queued frames.
await self.__cancel_input_task()
# Create a new input queue and task.
self.__create_input_task()
# Create a new output queue and task.
self.__create_push_task()
async def _stop_interruption(self):
@@ -204,17 +240,55 @@ class FrameProcessor:
try:
if direction == FrameDirection.DOWNSTREAM and self._next:
logger.trace(f"Pushing {frame} from {self} to {self._next}")
await self._next.process_frame(frame, direction)
await self._next.queue_frame(frame, direction)
elif direction == FrameDirection.UPSTREAM and self._prev:
logger.trace(f"Pushing {frame} upstream from {self} to {self._prev}")
await self._prev.process_frame(frame, direction)
await self._prev.queue_frame(frame, direction)
except Exception as e:
logger.exception(f"Uncaught exception in {self}: {e}")
def __create_input_task(self):
self.__input_queue = asyncio.Queue()
self.__input_frame_task = self.get_event_loop().create_task(
self.__input_frame_task_handler()
)
self.__input_event = asyncio.Event()
async def __cancel_input_task(self):
self.__input_frame_task.cancel()
await self.__input_frame_task
async def __input_frame_task_handler(self):
running = True
while running:
try:
if self.__should_block_frames:
await self.__input_event.wait()
self.__input_event.clear()
(frame, direction, callback) = await self.__input_queue.get()
# Process the frame.
await self.process_frame(frame, direction)
# If this frame has an associated callback, call it now.
if callback:
await callback(self, frame, direction)
running = not isinstance(frame, EndFrame)
self.__input_queue.task_done()
except asyncio.CancelledError:
break
def __create_push_task(self):
self.__push_queue = asyncio.Queue()
self.__push_frame_task = self.get_event_loop().create_task(self.__push_frame_task_handler())
async def __cancel_push_task(self):
self.__push_frame_task.cancel()
await self.__push_frame_task
async def __push_frame_task_handler(self):
running = True
while running:

View File

@@ -366,10 +366,6 @@ class RTVIMetricsMessage(BaseModel):
data: Mapping[str, Any]
class RTVIProcessorParams(BaseModel):
send_bot_ready: bool = True
class RTVIFrameProcessor(FrameProcessor):
def __init__(self, direction: FrameDirection = FrameDirection.DOWNSTREAM, **kwargs):
super().__init__(**kwargs)
@@ -573,16 +569,14 @@ class RTVIProcessor(FrameProcessor):
self,
*,
config: RTVIConfig = RTVIConfig(config=[]),
params: RTVIProcessorParams = RTVIProcessorParams(),
**kwargs,
):
super().__init__(**kwargs)
self._config = config
self._params = params
self._pipeline: FrameProcessor | None = None
self._pipeline_started = False
self._bot_ready = False
self._client_ready = False
self._client_ready_id = ""
@@ -590,14 +584,15 @@ class RTVIProcessor(FrameProcessor):
self._registered_services: Dict[str, RTVIService] = {}
# A task to process incoming action frames.
self._action_task = self.get_event_loop().create_task(self._action_task_handler())
self._action_queue = asyncio.Queue()
self._action_task = self.get_event_loop().create_task(self._action_task_handler())
# A task to process incoming transport messages.
self._message_task = self.get_event_loop().create_task(self._message_task_handler())
self._message_queue = asyncio.Queue()
self._message_task = self.get_event_loop().create_task(self._message_task_handler())
self._register_event_handler("on_bot_ready")
self._register_event_handler("on_bot_started")
self._register_event_handler("on_client_ready")
def register_action(self, action: RTVIAction):
id = self._action_id(action.service, action.action)
@@ -606,6 +601,15 @@ class RTVIProcessor(FrameProcessor):
def register_service(self, service: RTVIService):
self._registered_services[service.name] = service
async def set_client_ready(self):
self._client_ready = True
await self._call_event_handler("on_client_ready")
async def set_bot_ready(self):
self._bot_ready = True
await self._update_config(self._config, False)
await self._send_bot_ready()
async def interrupt_bot(self):
await self.push_frame(BotInterruptionFrame(), FrameDirection.UPSTREAM)
@@ -613,11 +617,6 @@ class RTVIProcessor(FrameProcessor):
message = RTVIError(data=RTVIErrorData(error=error, fatal=False))
await self._push_transport_message(message)
async def set_client_ready(self):
if not self._client_ready:
self._client_ready = True
await self._maybe_send_bot_ready()
async def handle_message(self, message: RTVIMessage):
await self._message_queue.put(message)
@@ -681,21 +680,15 @@ class RTVIProcessor(FrameProcessor):
await self._pipeline.cleanup()
async def _start(self, frame: StartFrame):
self._pipeline_started = True
await self._maybe_send_bot_ready()
await self._call_event_handler("on_bot_started")
async def _stop(self, frame: EndFrame):
if self._action_task:
self._action_task.cancel()
await self._action_task
self._action_task = None
if self._message_task:
self._message_task.cancel()
await self._message_task
self._message_task = None
await self._cancel_tasks()
async def _cancel(self, frame: CancelFrame):
await self._cancel_tasks()
async def _cancel_tasks(self):
if self._action_task:
self._action_task.cancel()
await self._action_task
@@ -769,9 +762,8 @@ class RTVIProcessor(FrameProcessor):
logger.warning(f"Exception processing message: {e}")
async def _handle_client_ready(self, request_id: str):
self._client_ready = True
self._client_ready_id = request_id
await self._maybe_send_bot_ready()
await self.set_client_ready()
async def _handle_describe_config(self, request_id: str):
services = list(self._registered_services.values())
@@ -841,16 +833,7 @@ class RTVIProcessor(FrameProcessor):
message = RTVIActionResponse(id=request_id, data=RTVIActionResponseData(result=result))
await self._push_transport_message(message)
async def _maybe_send_bot_ready(self):
if self._pipeline_started and self._client_ready:
await self._update_config(self._config, False)
await self._send_bot_ready()
await self._call_event_handler("on_bot_ready")
async def _send_bot_ready(self):
if not self._params.send_bot_ready:
return
message = RTVIBotReady(
id=self._client_ready_id,
data=RTVIBotReadyData(version=RTVI_PROTOCOL_VERSION, config=self._config.config),

View File

@@ -22,6 +22,7 @@ from pipecat.frames.frames import (
LLMFullResponseEndFrame,
StartFrame,
StartInterruptionFrame,
STTMuteFrame,
STTUpdateSettingsFrame,
TextFrame,
TTSAudioRawFrame,
@@ -284,11 +285,7 @@ class TTSService(AIService):
logger.warning(f"Unknown setting for TTS service: {key}")
async def say(self, text: str):
aggregate_sentences = self._aggregate_sentences
self._aggregate_sentences = False
await self.process_frame(TextFrame(text=text), FrameDirection.DOWNSTREAM)
self._aggregate_sentences = aggregate_sentences
await self.flush_audio()
await self.queue_frame(TTSSpeakFrame(text))
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
@@ -395,7 +392,6 @@ class WordTTSService(TTSService):
def reset_word_timestamps(self):
self._initial_word_timestamp = -1
self._word_timestamps = []
async def add_word_timestamps(self, word_times: List[Tuple[str, float]]):
for word, timestamp in word_times:
@@ -430,7 +426,10 @@ class WordTTSService(TTSService):
while True:
try:
(word, timestamp) = await self._words_queue.get()
if word == "LLMFullResponseEndFrame" and timestamp == 0:
if word == "Reset" and timestamp == 0:
self.reset_word_timestamps()
frame = None
elif word == "LLMFullResponseEndFrame" and timestamp == 0:
frame = LLMFullResponseEndFrame()
frame.pts = last_pts
elif word == "TTSStoppedFrame" and timestamp == 0:
@@ -439,8 +438,9 @@ class WordTTSService(TTSService):
else:
frame = TextFrame(word)
frame.pts = self._initial_word_timestamp + timestamp
last_pts = frame.pts
await self.push_frame(frame)
if frame:
last_pts = frame.pts
await self.push_frame(frame)
self._words_queue.task_done()
except asyncio.CancelledError:
break
@@ -455,6 +455,12 @@ class STTService(AIService):
super().__init__(**kwargs)
self._audio_passthrough = audio_passthrough
self._settings: Dict[str, Any] = {}
self._muted: bool = False
@property
def is_muted(self) -> bool:
"""Returns whether the STT service is currently muted."""
return self._muted
@abstractmethod
async def set_model(self, model: str):
@@ -483,7 +489,8 @@ class STTService(AIService):
logger.warning(f"Unknown setting for STT service: {key}")
async def process_audio_frame(self, frame: AudioRawFrame):
await self.process_generator(self.run_stt(frame.audio))
if not self._muted:
await self.process_generator(self.run_stt(frame.audio))
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Processes a frame of audio data, either buffering or transcribing it."""
@@ -498,6 +505,9 @@ class STTService(AIService):
await self.push_frame(frame, direction)
elif isinstance(frame, STTUpdateSettingsFrame):
await self._update_settings(frame.settings)
elif isinstance(frame, STTMuteFrame):
self._muted = frame.mute
logger.debug(f"STT service {'muted' if frame.mute else 'unmuted'}")
else:
await self.push_frame(frame, direction)

View File

@@ -671,6 +671,7 @@ class AnthropicAssistantContextAggregator(LLMAssistantContextAggregator):
):
self._function_call_in_progress = None
self._function_call_result = frame
await self._push_aggregation()
else:
logger.warning(
"FunctionCallResultFrame tool_call_id != InProgressFrame tool_call_id"
@@ -679,9 +680,12 @@ class AnthropicAssistantContextAggregator(LLMAssistantContextAggregator):
self._function_call_result = None
elif isinstance(frame, AnthropicImageMessageFrame):
self._pending_image_frame_message = frame
await self._push_aggregation()
async def _push_aggregation(self):
if not self._aggregation:
if not (
self._aggregation or self._function_call_result or self._pending_image_frame_message
):
return
run_llm = False
@@ -694,20 +698,18 @@ class AnthropicAssistantContextAggregator(LLMAssistantContextAggregator):
frame = self._function_call_result
self._function_call_result = None
if frame.result:
self._context.add_message(
assistant_message = {"role": "assistant", "content": []}
if aggregation:
assistant_message["content"].append({"type": "text", "text": aggregation})
assistant_message["content"].append(
{
"role": "assistant",
"content": [
{"type": "text", "text": aggregation},
{
"type": "tool_use",
"id": frame.tool_call_id,
"name": frame.function_name,
"input": frame.arguments,
},
],
"type": "tool_use",
"id": frame.tool_call_id,
"name": frame.function_name,
"input": frame.arguments,
}
)
self._context.add_message(assistant_message)
self._context.add_message(
{
"role": "user",
@@ -721,7 +723,7 @@ class AnthropicAssistantContextAggregator(LLMAssistantContextAggregator):
}
)
run_llm = True
else:
elif aggregation:
self._context.add_message({"role": "assistant", "content": aggregation})
if self._pending_image_frame_message:

View File

@@ -5,7 +5,6 @@
#
import asyncio
from typing import AsyncGenerator, Optional
from loguru import logger
@@ -33,6 +32,40 @@ except ModuleNotFoundError as e:
raise Exception(f"Missing module: {e}")
def language_to_aws_language(language: Language) -> str | None:
language_map = {
Language.CA: "ca-ES",
Language.ZH: "cmn-CN",
Language.DA: "da-DK",
Language.NL: "nl-NL",
Language.NL_BE: "nl-BE",
Language.EN: "en-US",
Language.EN_US: "en-US",
Language.EN_AU: "en-AU",
Language.EN_GB: "en-GB",
Language.EN_NZ: "en-NZ",
Language.EN_IN: "en-IN",
Language.FI: "fi-FI",
Language.FR: "fr-FR",
Language.FR_CA: "fr-CA",
Language.DE: "de-DE",
Language.HI: "hi-IN",
Language.IT: "it-IT",
Language.JA: "ja-JP",
Language.KO: "ko-KR",
Language.NO: "nb-NO",
Language.PL: "pl-PL",
Language.PT: "pt-PT",
Language.PT_BR: "pt-BR",
Language.RO: "ro-RO",
Language.RU: "ru-RU",
Language.ES: "es-ES",
Language.SV: "sv-SE",
Language.TR: "tr-TR",
}
return language_map.get(language)
class AWSTTSService(TTSService):
class InputParams(BaseModel):
engine: Optional[str] = None
@@ -65,7 +98,7 @@ class AWSTTSService(TTSService):
"engine": params.engine,
"language": self.language_to_service_language(params.language)
if params.language
else Language.EN,
else "en-US",
"pitch": params.pitch,
"rate": params.rate,
"volume": params.volume,
@@ -77,62 +110,7 @@ class AWSTTSService(TTSService):
return True
def language_to_service_language(self, language: Language) -> str | None:
match language:
case Language.CA:
return "ca-ES"
case Language.ZH:
return "cmn-CN"
case Language.DA:
return "da-DK"
case Language.NL:
return "nl-NL"
case Language.NL_BE:
return "nl-BE"
case Language.EN | Language.EN_US:
return "en-US"
case Language.EN_AU:
return "en-AU"
case Language.EN_GB:
return "en-GB"
case Language.EN_NZ:
return "en-NZ"
case Language.EN_IN:
return "en-IN"
case Language.FI:
return "fi-FI"
case Language.FR:
return "fr-FR"
case Language.FR_CA:
return "fr-CA"
case Language.DE:
return "de-DE"
case Language.HI:
return "hi-IN"
case Language.IT:
return "it-IT"
case Language.JA:
return "ja-JP"
case Language.KO:
return "ko-KR"
case Language.NO:
return "nb-NO"
case Language.PL:
return "pl-PL"
case Language.PT:
return "pt-PT"
case Language.PT_BR:
return "pt-BR"
case Language.RO:
return "ro-RO"
case Language.RU:
return "ru-RU"
case Language.ES:
return "es-ES"
case Language.SV:
return "sv-SE"
case Language.TR:
return "tr-TR"
return None
return language_to_aws_language(language)
def _construct_ssml(self, text: str) -> str:
ssml = "<speak>"

View File

@@ -41,10 +41,11 @@ try:
from azure.cognitiveservices.speech import (
CancellationReason,
ResultReason,
ServicePropertyChannel,
SpeechConfig,
SpeechRecognizer,
SpeechSynthesizer,
SpeechSynthesisOutputFormat,
SpeechSynthesizer,
)
from azure.cognitiveservices.speech.audio import (
AudioStreamFormat,
@@ -60,6 +61,67 @@ except ModuleNotFoundError as e:
raise Exception(f"Missing module: {e}")
def language_to_azure_language(language: Language) -> str | None:
language_map = {
Language.BG: "bg-BG",
Language.CA: "ca-ES",
Language.ZH: "zh-CN",
Language.ZH_TW: "zh-TW",
Language.CS: "cs-CZ",
Language.DA: "da-DK",
Language.NL: "nl-NL",
Language.EN: "en-US",
Language.EN_US: "en-US",
Language.EN_AU: "en-AU",
Language.EN_GB: "en-GB",
Language.EN_NZ: "en-NZ",
Language.EN_IN: "en-IN",
Language.ET: "et-EE",
Language.FI: "fi-FI",
Language.NL_BE: "nl-BE",
Language.FR: "fr-FR",
Language.FR_CA: "fr-CA",
Language.DE: "de-DE",
Language.DE_CH: "de-CH",
Language.EL: "el-GR",
Language.HI: "hi-IN",
Language.HU: "hu-HU",
Language.ID: "id-ID",
Language.IT: "it-IT",
Language.JA: "ja-JP",
Language.KO: "ko-KR",
Language.LV: "lv-LV",
Language.LT: "lt-LT",
Language.MS: "ms-MY",
Language.NO: "nb-NO",
Language.PL: "pl-PL",
Language.PT: "pt-PT",
Language.PT_BR: "pt-BR",
Language.RO: "ro-RO",
Language.RU: "ru-RU",
Language.SK: "sk-SK",
Language.ES: "es-ES",
Language.SV: "sv-SE",
Language.TH: "th-TH",
Language.TR: "tr-TR",
Language.UK: "uk-UA",
Language.VI: "vi-VN",
}
return language_map.get(language)
def sample_rate_to_output_format(sample_rate: int) -> SpeechSynthesisOutputFormat:
sample_rate_map = {
8000: SpeechSynthesisOutputFormat.Raw8Khz16BitMonoPcm,
16000: SpeechSynthesisOutputFormat.Raw16Khz16BitMonoPcm,
22050: SpeechSynthesisOutputFormat.Raw22050Hz16BitMonoPcm,
24000: SpeechSynthesisOutputFormat.Raw24Khz16BitMonoPcm,
44100: SpeechSynthesisOutputFormat.Raw44100Hz16BitMonoPcm,
48000: SpeechSynthesisOutputFormat.Raw48Khz16BitMonoPcm,
}
return sample_rate_map.get(sample_rate, SpeechSynthesisOutputFormat.Raw24Khz16BitMonoPcm)
class AzureLLMService(BaseOpenAILLMService):
def __init__(
self, *, api_key: str, endpoint: str, model: str, api_version: str = "2023-12-01-preview"
@@ -88,24 +150,7 @@ class AzureLLMService(BaseOpenAILLMService):
return OpenAIContextAggregatorPair(_user=user, _assistant=assistant)
def sample_rate_to_output_format(sample_rate: int) -> SpeechSynthesisOutputFormat:
match sample_rate:
case 8000:
return SpeechSynthesisOutputFormat.Raw8Khz16BitMonoPcm
case 16000:
return SpeechSynthesisOutputFormat.Raw16Khz16BitMonoPcm
case 22050:
return SpeechSynthesisOutputFormat.Raw22050Hz16BitMonoPcm
case 24000:
return SpeechSynthesisOutputFormat.Raw24Khz16BitMonoPcm
case 44100:
return SpeechSynthesisOutputFormat.Raw44100Hz16BitMonoPcm
case 48000:
return SpeechSynthesisOutputFormat.Raw48Khz16BitMonoPcm
return SpeechSynthesisOutputFormat.Raw16Khz16BitMonoPcm
class AzureTTSService(TTSService):
class AzureBaseTTSService(TTSService):
class InputParams(BaseModel):
emphasis: Optional[str] = None
language: Optional[Language] = Language.EN_US
@@ -128,17 +173,12 @@ class AzureTTSService(TTSService):
):
super().__init__(sample_rate=sample_rate, **kwargs)
speech_config = SpeechConfig(subscription=api_key, region=region)
speech_config.set_speech_synthesis_output_format(sample_rate_to_output_format(sample_rate))
self._speech_synthesizer = SpeechSynthesizer(speech_config=speech_config, audio_config=None)
self._settings = {
"sample_rate": sample_rate,
"emphasis": params.emphasis,
"language": self.language_to_service_language(params.language)
if params.language
else Language.EN_US,
else "en-US",
"pitch": params.pitch,
"rate": params.rate,
"role": params.role,
@@ -147,98 +187,16 @@ class AzureTTSService(TTSService):
"volume": params.volume,
}
self.set_voice(voice)
self._api_key = api_key
self._region = region
self._voice_id = voice
self._speech_synthesizer = None
def can_generate_metrics(self) -> bool:
return True
def language_to_service_language(self, language: Language) -> str | None:
match language:
case Language.BG:
return "bg-BG"
case Language.CA:
return "ca-ES"
case Language.ZH:
return "zh-CN"
case Language.ZH_TW:
return "zh-TW"
case Language.CS:
return "cs-CZ"
case Language.DA:
return "da-DK"
case Language.NL:
return "nl-NL"
case Language.EN | Language.EN_US:
return "en-US"
case Language.EN_AU:
return "en-AU"
case Language.EN_GB:
return "en-GB"
case Language.EN_NZ:
return "en-NZ"
case Language.EN_IN:
return "en-IN"
case Language.ET:
return "et-EE"
case Language.FI:
return "fi-FI"
case Language.NL_BE:
return "nl-BE"
case Language.FR:
return "fr-FR"
case Language.FR_CA:
return "fr-CA"
case Language.DE:
return "de-DE"
case Language.DE_CH:
return "de-CH"
case Language.EL:
return "el-GR"
case Language.HI:
return "hi-IN"
case Language.HU:
return "hu-HU"
case Language.ID:
return "id-ID"
case Language.IT:
return "it-IT"
case Language.JA:
return "ja-JP"
case Language.KO:
return "ko-KR"
case Language.LV:
return "lv-LV"
case Language.LT:
return "lt-LT"
case Language.MS:
return "ms-MY"
case Language.NO:
return "nb-NO"
case Language.PL:
return "pl-PL"
case Language.PT:
return "pt-PT"
case Language.PT_BR:
return "pt-BR"
case Language.RO:
return "ro-RO"
case Language.RU:
return "ru-RU"
case Language.SK:
return "sk-SK"
case Language.ES:
return "es-ES"
case Language.SV:
return "sv-SE"
case Language.TH:
return "th-TH"
case Language.TR:
return "tr-TR"
case Language.UK:
return "uk-UA"
case Language.VI:
return "vi-VN"
return None
return language_to_azure_language(language)
def _construct_ssml(self, text: str) -> str:
language = self._settings["language"]
@@ -285,6 +243,97 @@ class AzureTTSService(TTSService):
return ssml
class AzureTTSService(AzureBaseTTSService):
def __init__(self, **kwargs):
super().__init__(**kwargs)
speech_config = SpeechConfig(
subscription=self._api_key,
region=self._region,
speech_recognition_language=self._settings["language"],
)
speech_config.set_speech_synthesis_output_format(
sample_rate_to_output_format(self._settings["sample_rate"])
)
speech_config.set_service_property(
"synthesizer.synthesis.connection.synthesisConnectionImpl",
"websocket",
ServicePropertyChannel.UriQueryParameter,
)
self._speech_synthesizer = SpeechSynthesizer(speech_config=speech_config, audio_config=None)
# Set up event handlers
self._audio_queue = asyncio.Queue()
self._speech_synthesizer.synthesizing.connect(self._handle_synthesizing)
self._speech_synthesizer.synthesis_completed.connect(self._handle_completed)
self._speech_synthesizer.synthesis_canceled.connect(self._handle_canceled)
def _handle_synthesizing(self, evt):
"""Handle audio chunks as they arrive"""
if evt.result and evt.result.audio_data:
self._audio_queue.put_nowait(evt.result.audio_data)
def _handle_completed(self, evt):
"""Handle synthesis completion"""
self._audio_queue.put_nowait(None) # Signal completion
def _handle_canceled(self, evt):
"""Handle synthesis cancellation"""
logger.error(f"Speech synthesis canceled: {evt.result.cancellation_details.reason}")
self._audio_queue.put_nowait(None)
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: [{text}]")
try:
await self.start_ttfb_metrics()
yield TTSStartedFrame()
ssml = self._construct_ssml(text)
# Start synthesis
self._speech_synthesizer.speak_ssml_async(ssml)
await self.start_tts_usage_metrics(text)
# Stream audio chunks as they arrive
while True:
chunk = await self._audio_queue.get()
if chunk is None: # End of stream
break
await self.stop_ttfb_metrics()
yield TTSAudioRawFrame(
audio=chunk,
sample_rate=self._settings["sample_rate"],
num_channels=1,
)
yield TTSStoppedFrame()
except Exception as e:
logger.error(f"{self} error generating TTS: {e}")
yield ErrorFrame(f"{self} error: {str(e)}")
class AzureHttpTTSService(AzureBaseTTSService):
def __init__(self, **kwargs):
super().__init__(**kwargs)
speech_config = SpeechConfig(
subscription=self._api_key,
region=self._region,
speech_recognition_language=self._settings["language"],
)
speech_config.set_speech_synthesis_output_format(
sample_rate_to_output_format(self._settings["sample_rate"])
)
self._speech_synthesizer = SpeechSynthesizer(speech_config=speech_config, audio_config=None)
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: [{text}]")
@@ -292,7 +341,7 @@ class AzureTTSService(TTSService):
ssml = self._construct_ssml(text)
result = await asyncio.to_thread(self._speech_synthesizer.speak_ssml, (ssml))
result = await asyncio.to_thread(self._speech_synthesizer.speak_ssml, ssml)
if result.reason == ResultReason.SynthesizingAudioCompleted:
await self.start_tts_usage_metrics(text)

View File

@@ -14,13 +14,16 @@ from loguru import logger
from pydantic.main import BaseModel
from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
LLMFullResponseEndFrame,
StartFrame,
StartInterruptionFrame,
TTSAudioRawFrame,
TTSSpeakFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
@@ -41,29 +44,24 @@ except ModuleNotFoundError as e:
def language_to_cartesia_language(language: Language) -> str | None:
match language:
case Language.DE:
return "de"
case (
Language.EN
| Language.EN_US
| Language.EN_GB
| Language.EN_AU
| Language.EN_NZ
| Language.EN_IN
):
return "en"
case Language.ES:
return "es"
case Language.FR | Language.FR_CA:
return "fr"
case Language.JA:
return "ja"
case Language.PT | Language.PT_BR:
return "pt"
case Language.ZH | Language.ZH_TW:
return "zh"
return None
language_map = {
Language.DE: "de",
Language.EN: "en",
Language.EN_US: "en",
Language.EN_GB: "en",
Language.EN_AU: "en",
Language.EN_NZ: "en",
Language.EN_IN: "en",
Language.ES: "es",
Language.FR: "fr",
Language.FR_CA: "fr",
Language.JA: "ja",
Language.PT: "pt",
Language.PT_BR: "pt",
Language.ZH: "zh",
Language.ZH_TW: "zh",
}
return language_map.get(language)
class CartesiaTTSService(WordTTSService):
@@ -114,7 +112,7 @@ class CartesiaTTSService(WordTTSService):
},
"language": self.language_to_service_language(params.language)
if params.language
else Language.EN,
else "en",
"speed": params.speed,
"emotion": params.emotion,
}
@@ -225,14 +223,13 @@ class CartesiaTTSService(WordTTSService):
if not msg or msg["context_id"] != self._context_id:
continue
if msg["type"] == "done":
await self.push_frame(TTSStoppedFrame())
await self.stop_ttfb_metrics()
# Unset _context_id but not the _context_id_start_timestamp
# because we are likely still playing out audio and need the
# timestamp to set send context frames.
self._context_id = None
await self.add_word_timestamps(
[("TTSStoppedFrame", 0), ("LLMFullResponseEndFrame", 0)]
)
await self.add_word_timestamps([("LLMFullResponseEndFrame", 0), ("Reset", 0)])
elif msg["type"] == "timestamps":
await self.add_word_timestamps(
list(zip(msg["word_timestamps"]["words"], msg["word_timestamps"]["start"]))
@@ -258,6 +255,19 @@ class CartesiaTTSService(WordTTSService):
except Exception as e:
logger.error(f"{self} exception: {e}")
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# If we received a TTSSpeakFrame and the LLM response included text (it
# might be that it's only a function calling response) we pause
# processing more frames until we receive a BotStoppedSpeakingFrame.
if isinstance(frame, TTSSpeakFrame):
await self.pause_processing_frames()
elif isinstance(frame, LLMFullResponseEndFrame) and self._context_id:
await self.pause_processing_frames()
elif isinstance(frame, BotStoppedSpeakingFrame):
await self.resume_processing_frames()
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: [{text}]")
@@ -316,7 +326,7 @@ class CartesiaHttpTTSService(TTSService):
},
"language": self.language_to_service_language(params.language)
if params.language
else Language.EN,
else "en",
"speed": params.speed,
"emotion": params.emotion,
}

View File

@@ -13,12 +13,15 @@ from loguru import logger
from pydantic import BaseModel, model_validator
from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
CancelFrame,
EndFrame,
Frame,
LLMFullResponseEndFrame,
StartFrame,
StartInterruptionFrame,
TTSAudioRawFrame,
TTSSpeakFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
@@ -39,6 +42,48 @@ except ModuleNotFoundError as e:
ElevenLabsOutputFormat = Literal["pcm_16000", "pcm_22050", "pcm_24000", "pcm_44100"]
def language_to_elevenlabs_language(language: Language) -> str | None:
language_map = {
Language.BG: "bg",
Language.ZH: "zh",
Language.CS: "cs",
Language.DA: "da",
Language.NL: "nl",
Language.EN: "en",
Language.EN_US: "en",
Language.EN_AU: "en",
Language.EN_GB: "en",
Language.EN_NZ: "en",
Language.EN_IN: "en",
Language.FI: "fi",
Language.FR: "fr",
Language.FR_CA: "fr",
Language.DE: "de",
Language.DE_CH: "de",
Language.EL: "el",
Language.HI: "hi",
Language.HU: "hu",
Language.ID: "id",
Language.IT: "it",
Language.JA: "ja",
Language.KO: "ko",
Language.MS: "ms",
Language.NO: "no",
Language.PL: "pl",
Language.PT: "pt-PT",
Language.PT_BR: "pt-BR",
Language.RO: "ro",
Language.RU: "ru",
Language.SK: "sk",
Language.ES: "es",
Language.SV: "sv",
Language.TR: "tr",
Language.UK: "uk",
Language.VI: "vi",
}
return language_map.get(language)
def sample_rate_from_output_format(output_format: str) -> int:
match output_format:
case "pcm_16000":
@@ -132,7 +177,7 @@ class ElevenLabsTTSService(WordTTSService):
"sample_rate": sample_rate_from_output_format(output_format),
"language": self.language_to_service_language(params.language)
if params.language
else Language.EN,
else "en",
"output_format": output_format,
"optimize_streaming_latency": params.optimize_streaming_latency,
"stability": params.stability,
@@ -155,73 +200,7 @@ class ElevenLabsTTSService(WordTTSService):
return True
def language_to_service_language(self, language: Language) -> str | None:
match language:
case Language.BG:
return "bg"
case Language.ZH:
return "zh"
case Language.CS:
return "cs"
case Language.DA:
return "da"
case Language.NL:
return "nl"
case (
Language.EN
| Language.EN_US
| Language.EN_AU
| Language.EN_GB
| Language.EN_NZ
| Language.EN_IN
):
return "en"
case Language.FI:
return "fi"
case Language.FR | Language.FR_CA:
return "fr"
case Language.DE | Language.DE_CH:
return "de"
case Language.EL:
return "el"
case Language.HI:
return "hi"
case Language.HU:
return "hu"
case Language.ID:
return "id"
case Language.IT:
return "it"
case Language.JA:
return "ja"
case Language.KO:
return "ko"
case Language.MS:
return "ms"
case Language.NO:
return "no"
case Language.PL:
return "pl"
case Language.PT:
return "pt-PT"
case Language.PT_BR:
return "pt-BR"
case Language.RO:
return "ro"
case Language.RU:
return "ru"
case Language.SK:
return "sk"
case Language.ES:
return "es"
case Language.SV:
return "sv"
case Language.TR:
return "tr"
case Language.UK:
return "uk"
case Language.VI:
return "vi"
return None
return language_to_elevenlabs_language(language)
def _set_voice_settings(self):
voice_settings = {}
@@ -283,7 +262,20 @@ class ElevenLabsTTSService(WordTTSService):
if isinstance(frame, (TTSStoppedFrame, StartInterruptionFrame)):
self._started = False
if isinstance(frame, TTSStoppedFrame):
await self.add_word_timestamps([("LLMFullResponseEndFrame", 0)])
await self.add_word_timestamps([("LLMFullResponseEndFrame", 0), ("Reset", 0)])
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# If we received a TTSSpeakFrame and the LLM response included text (it
# might be that it's only a function calling response) we pause
# processing more frames until we receive a BotStoppedSpeakingFrame.
if isinstance(frame, TTSSpeakFrame):
await self.pause_processing_frames()
elif isinstance(frame, LLMFullResponseEndFrame) and self._started:
await self.pause_processing_frames()
elif isinstance(frame, BotStoppedSpeakingFrame):
await self.resume_processing_frames()
async def _connect(self):
try:

View File

@@ -34,6 +34,53 @@ except ModuleNotFoundError as e:
raise Exception(f"Missing module: {e}")
def language_to_gladia_language(language: Language) -> str | None:
language_map = {
Language.BG: "bg",
Language.CA: "ca",
Language.ZH: "zh",
Language.CS: "cs",
Language.DA: "da",
Language.NL: "nl",
Language.EN: "en",
Language.EN_US: "en",
Language.EN_AU: "en",
Language.EN_GB: "en",
Language.EN_NZ: "en",
Language.EN_IN: "en",
Language.ET: "et",
Language.FI: "fi",
Language.FR: "fr",
Language.FR_CA: "fr",
Language.DE: "de",
Language.DE_CH: "de",
Language.EL: "el",
Language.HI: "hi",
Language.HU: "hu",
Language.ID: "id",
Language.IT: "it",
Language.JA: "ja",
Language.KO: "ko",
Language.LV: "lv",
Language.LT: "lt",
Language.MS: "ms",
Language.NO: "no",
Language.PL: "pl",
Language.PT: "pt",
Language.PT_BR: "pt",
Language.RO: "ro",
Language.RU: "ru",
Language.SK: "sk",
Language.ES: "es",
Language.SV: "sv",
Language.TH: "th",
Language.TR: "tr",
Language.UK: "uk",
Language.VI: "vi",
}
return language_map.get(language)
class GladiaSTTService(STTService):
class InputParams(BaseModel):
sample_rate: Optional[int] = 16000
@@ -79,50 +126,7 @@ class GladiaSTTService(STTService):
self._confidence = confidence
def language_to_service_language(self, language: Language) -> str | None:
language_map = {
Language.BG: "bg",
Language.CA: "ca",
Language.ZH: "zh",
Language.CS: "cs",
Language.DA: "da",
Language.NL: "nl",
Language.EN: "en",
Language.EN_US: "en",
Language.EN_AU: "en",
Language.EN_GB: "en",
Language.EN_NZ: "en",
Language.EN_IN: "en",
Language.ET: "et",
Language.FI: "fi",
Language.FR: "fr",
Language.FR_CA: "fr",
Language.DE: "de",
Language.DE_CH: "de",
Language.EL: "el",
Language.HI: "hi",
Language.HU: "hu",
Language.ID: "id",
Language.IT: "it",
Language.JA: "ja",
Language.KO: "ko",
Language.LV: "lv",
Language.LT: "lt",
Language.MS: "ms",
Language.NO: "no",
Language.PL: "pl",
Language.PT: "pt",
Language.PT_BR: "pt",
Language.RO: "ro",
Language.RU: "ru",
Language.SK: "sk",
Language.ES: "es",
Language.SV: "sv",
Language.TH: "th",
Language.TR: "tr",
Language.UK: "uk",
Language.VI: "vi",
}
return language_map.get(language)
return language_to_gladia_language(language)
async def start(self, frame: StartFrame):
await super().start(frame)

View File

@@ -16,6 +16,7 @@ from PIL import Image
from pydantic import BaseModel, Field
from pipecat.frames.frames import (
AudioRawFrame,
ErrorFrame,
Frame,
LLMFullResponseEndFrame,
@@ -55,6 +56,53 @@ except ModuleNotFoundError as e:
raise Exception(f"Missing module: {e}")
def language_to_google_language(language: Language) -> str | None:
language_map = {
Language.BG: "bg-BG",
Language.CA: "ca-ES",
Language.ZH: "cmn-CN",
Language.ZH_TW: "cmn-TW",
Language.CS: "cs-CZ",
Language.DA: "da-DK",
Language.NL: "nl-NL",
Language.EN: "en-US",
Language.EN_US: "en-US",
Language.EN_AU: "en-AU",
Language.EN_GB: "en-GB",
Language.EN_IN: "en-IN",
Language.ET: "et-EE",
Language.FI: "fi-FI",
Language.NL_BE: "nl-BE",
Language.FR: "fr-FR",
Language.FR_CA: "fr-CA",
Language.DE: "de-DE",
Language.EL: "el-GR",
Language.HI: "hi-IN",
Language.HU: "hu-HU",
Language.ID: "id-ID",
Language.IT: "it-IT",
Language.JA: "ja-JP",
Language.KO: "ko-KR",
Language.LV: "lv-LV",
Language.LT: "lt-LT",
Language.MS: "ms-MY",
Language.NO: "nb-NO",
Language.PL: "pl-PL",
Language.PT: "pt-PT",
Language.PT_BR: "pt-BR",
Language.RO: "ro-RO",
Language.RU: "ru-RU",
Language.SK: "sk-SK",
Language.ES: "es-ES",
Language.SV: "sv-SE",
Language.TH: "th-TH",
Language.TR: "tr-TR",
Language.UK: "uk-UA",
Language.VI: "vi-VN",
}
return language_map.get(language)
class GoogleUserContextAggregator(OpenAIUserContextAggregator):
async def _push_aggregation(self):
if len(self._aggregation) > 0:
@@ -184,11 +232,53 @@ class GoogleLLMContext(OpenAILLMContext):
msgs.append(obj)
return msgs
def add_image_frame_message(
self, *, format: str, size: tuple[int, int], image: bytes, text: str = None
):
buffer = io.BytesIO()
Image.frombytes(format, size, image).save(buffer, format="JPEG")
parts = []
if text:
parts.append(glm.Part(text=text))
parts.append(
glm.Part(inline_data=glm.Blob(mime_type="image/jpeg", data=buffer.getvalue())),
)
self.add_message(glm.Content(role="user", parts=parts))
def add_audio_frames_message(self, *, audio_frames: list[AudioRawFrame], text: str = None):
if not audio_frames:
return
sample_rate = audio_frames[0].sample_rate
num_channels = audio_frames[0].num_channels
parts = []
data = b"".join(frame.audio for frame in audio_frames)
if text:
parts.append(glm.Part(text=text))
parts.append(
glm.Part(
inline_data=glm.Blob(
mime_type="audio/wav",
data=(
bytes(
self.create_wav_header(sample_rate, num_channels, 16, len(data)) + data
)
),
)
),
)
self.add_message(glm.Content(role="user", parts=parts))
# message = {"mime_type": "audio/mp3", "data": bytes(data + create_wav_header(sample_rate, num_channels, 16, len(data)))}
# self.add_message(message)
def from_standard_message(self, message):
role = message["role"]
content = message.get("content", [])
if role == "system":
role = "user"
self.system_message = content
return None
elif role == "assistant":
role = "model"
@@ -232,20 +322,6 @@ class GoogleLLMContext(OpenAILLMContext):
message = glm.Content(role=role, parts=parts)
return message
def add_image_frame_message(
self, *, format: str, size: tuple[int, int], image: bytes, text: str = None
):
buffer = io.BytesIO()
Image.frombytes(format, size, image).save(buffer, format="JPEG")
parts = []
if text:
parts.append(glm.Part(text=text))
parts.append(
glm.Part(inline_data=glm.Blob(mime_type="image/jpeg", data=buffer.getvalue())),
)
self.add_message(glm.Content(role="user", parts=parts))
def to_standard_messages(self, obj) -> list:
msg = {"role": obj.role, "content": []}
if msg["role"] == "model":
@@ -289,9 +365,20 @@ class GoogleLLMContext(OpenAILLMContext):
return [msg]
def _restructure_from_openai_messages(self):
self.system_message = None
# first, map across self._messages calling self.from_standard_message(m) to modify messages in place
try:
self._messages[:] = [self.from_standard_message(m) for m in self._messages]
self._messages[:] = [
msg
for msg in (self.from_standard_message(m) for m in self._messages)
if msg is not None
]
# We might have been given a messages list with only a system message. If so, let's put that back in
# the messages list as a user message.
if self.system_message and not self._messages:
self.add_message(
glm.Content(role="user", parts=[glm.Part(text=self.system_message)])
)
except Exception as e:
logger.error(f"Error mapping messages: {e}")
# iterate over messages and remove any messages that have an empty content list
@@ -319,11 +406,14 @@ class GoogleLLMService(LLMService):
api_key: str,
model: str = "gemini-1.5-flash-latest",
params: InputParams = InputParams(),
system_instruction: Optional[str] = None,
**kwargs,
):
super().__init__(**kwargs)
gai.configure(api_key=api_key)
self._create_client(model)
self.set_model_name(model)
self._system_instruction = system_instruction
self._create_client()
self._settings = {
"max_tokens": params.max_tokens,
"temperature": params.temperature,
@@ -335,34 +425,10 @@ class GoogleLLMService(LLMService):
def can_generate_metrics(self) -> bool:
return True
def _create_client(self, model: str):
self.set_model_name(model)
self._client = gai.GenerativeModel(model)
def _get_messages_from_openai_context(self, context: OpenAILLMContext) -> List[glm.Content]:
openai_messages = context.get_messages()
google_messages = []
for message in openai_messages:
role = message["role"]
content = message["content"]
if role == "system":
role = "user"
elif role == "assistant":
role = "model"
parts = [glm.Part(text=content)]
if "mime_type" in message:
parts.append(
glm.Part(
inline_data=glm.Blob(
mime_type=message["mime_type"], data=message["data"].getvalue()
)
)
)
google_messages.append({"role": role, "parts": parts})
return google_messages
def _create_client(self):
self._client = gai.GenerativeModel(
self._model_name, system_instruction=self._system_instruction
)
async def _async_generator_wrapper(self, sync_generator):
for item in sync_generator:
@@ -374,10 +440,11 @@ class GoogleLLMService(LLMService):
try:
logger.debug(f"Generating chat: {context.get_messages_for_logging()}")
# todo: move this into the new context code structure, convert from openai context one time
# todo: add system instructions
# messages = self._get_messages_from_openai_context(context)
messages = context.messages
if self._system_instruction != context.system_message:
logger.debug(f"System instruction changed: {context.system_message}")
self._system_instruction = context.system_message
self._create_client()
# Filter out None values and create GenerationConfig
generation_params = {
@@ -394,24 +461,21 @@ class GoogleLLMService(LLMService):
generation_config = GenerationConfig(**generation_params) if generation_params else None
await self.start_ttfb_metrics()
tools = context.tools if context.tools else []
response = self._client.generate_content(
contents=messages, tools=tools, stream=True, generation_config=generation_config
)
tokens = LLMTokenUsage(
prompt_tokens=response.usage_metadata.prompt_token_count,
completion_tokens=response.usage_metadata.candidates_token_count,
total_tokens=response.usage_metadata.total_token_count,
)
await self.start_llm_usage_metrics(tokens)
await self.stop_ttfb_metrics()
prompt_tokens = response.usage_metadata.prompt_token_count
completion_tokens = response.usage_metadata.candidates_token_count
total_tokens = response.usage_metadata.total_token_count
async for chunk in self._async_generator_wrapper(response):
# todo: usage
if chunk.usage_metadata:
prompt_tokens += response.usage_metadata.prompt_token_count
completion_tokens += response.usage_metadata.candidates_token_count
total_tokens += response.usage_metadata.total_token_count
try:
for c in chunk.parts:
if c.text:
@@ -436,6 +500,13 @@ class GoogleLLMService(LLMService):
except Exception as e:
logger.exception(f"{self} exception: {e}")
finally:
await self.start_llm_usage_metrics(
LLMTokenUsage(
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
total_tokens=total_tokens,
)
)
await self.push_frame(LLMFullResponseEndFrame())
async def process_frame(self, frame: Frame, direction: FrameDirection):
@@ -499,7 +570,7 @@ class GoogleTTSService(TTSService):
"emphasis": params.emphasis,
"language": self.language_to_service_language(params.language)
if params.language
else Language.EN,
else "en-US",
"gender": params.gender,
"google_style": params.google_style,
}
@@ -530,88 +601,7 @@ class GoogleTTSService(TTSService):
return True
def language_to_service_language(self, language: Language) -> str | None:
match language:
case Language.BG:
return "bg-BG"
case Language.CA:
return "ca-ES"
case Language.ZH:
return "cmn-CN"
case Language.ZH_TW:
return "cmn-TW"
case Language.CS:
return "cs-CZ"
case Language.DA:
return "da-DK"
case Language.NL:
return "nl-NL"
case Language.EN | Language.EN_US:
return "en-US"
case Language.EN_AU:
return "en-AU"
case Language.EN_GB:
return "en-GB"
case Language.EN_IN:
return "en-IN"
case Language.ET:
return "et-EE"
case Language.FI:
return "fi-FI"
case Language.NL_BE:
return "nl-BE"
case Language.FR:
return "fr-FR"
case Language.FR_CA:
return "fr-CA"
case Language.DE:
return "de-DE"
case Language.EL:
return "el-GR"
case Language.HI:
return "hi-IN"
case Language.HU:
return "hu-HU"
case Language.ID:
return "id-ID"
case Language.IT:
return "it-IT"
case Language.JA:
return "ja-JP"
case Language.KO:
return "ko-KR"
case Language.LV:
return "lv-LV"
case Language.LT:
return "lt-LT"
case Language.MS:
return "ms-MY"
case Language.NO:
return "nb-NO"
case Language.PL:
return "pl-PL"
case Language.PT:
return "pt-PT"
case Language.PT_BR:
return "pt-BR"
case Language.RO:
return "ro-RO"
case Language.RU:
return "ru-RU"
case Language.SK:
return "sk-SK"
case Language.ES:
return "es-ES"
case Language.SV:
return "sv-SE"
case Language.TH:
return "th-TH"
case Language.TR:
return "tr-TR"
case Language.UK:
return "uk-UA"
case Language.VI:
return "vi-VN"
return None
return language_to_google_language(language)
def _construct_ssml(self, text: str) -> str:
ssml = "<speak>"

View File

@@ -35,6 +35,27 @@ except ModuleNotFoundError as e:
raise Exception(f"Missing module: {e}")
def language_to_lmnt_language(language: Language) -> str | None:
language_map = {
Language.DE: "de",
Language.EN: "en",
Language.EN_US: "en",
Language.EN_AU: "en",
Language.EN_GB: "en",
Language.EN_NZ: "en",
Language.EN_IN: "en",
Language.ES: "es",
Language.FR: "fr",
Language.FR_CA: "fr",
Language.PT: "pt",
Language.PT_BR: "pt",
Language.ZH: "zh",
Language.ZH_TW: "zh",
Language.KO: "ko",
}
return language_map.get(language)
class LmntTTSService(TTSService):
def __init__(
self,
@@ -72,29 +93,7 @@ class LmntTTSService(TTSService):
return True
def language_to_service_language(self, language: Language) -> str | None:
match language:
case Language.DE:
return "de"
case (
Language.EN
| Language.EN_US
| Language.EN_AU
| Language.EN_GB
| Language.EN_NZ
| Language.EN_IN
):
return "en"
case Language.ES:
return "es"
case Language.FR | Language.FR_CA:
return "fr"
case Language.PT | Language.PT_BR:
return "pt"
case Language.ZH | Language.ZH_TW:
return "zh"
case Language.KO:
return "ko"
return None
return language_to_lmnt_language(language)
async def start(self, frame: StartFrame):
await super().start(frame)

View File

@@ -128,7 +128,9 @@ class OpenAIRealtimeBetaLLMService(LLMService):
#
async def _handle_interruption(self):
if self._session_properties.turn_detection is None:
# None and False are different. Check for False. None means we're using OpenAI's
# built-in turn detection defaults.
if self._session_properties.turn_detection is False:
await self.send_client_event(events.InputAudioBufferClearEvent())
await self.send_client_event(events.ResponseCancelEvent())
await self._truncate_current_audio_response()
@@ -138,11 +140,12 @@ class OpenAIRealtimeBetaLLMService(LLMService):
await self.push_frame(TTSStoppedFrame())
async def _handle_user_started_speaking(self, frame):
if self._session_properties.turn_detection is None:
await self._handle_interruption()
pass
async def _handle_user_stopped_speaking(self, frame):
if self._session_properties.turn_detection is None:
# None and False are different. Check for False. None means we're using OpenAI's
# built-in turn detection defaults.
if self._session_properties.turn_detection is False:
await self.send_client_event(events.InputAudioBufferCommitEvent())
await self.send_client_event(events.ResponseCreateEvent())
@@ -438,8 +441,6 @@ class OpenAIRealtimeBetaLLMService(LLMService):
async def _handle_evt_speech_started(self, evt):
await self._truncate_current_audio_response()
# todo: might need to guard sending these when we fully support using either openai
# turn detection of Pipecat turn detection
await self._start_interruption() # cancels this processor task
await self.push_frame(StartInterruptionFrame()) # cancels downstream tasks
await self.push_frame(UserStartedSpeakingFrame())

View File

@@ -17,13 +17,16 @@ from loguru import logger
from pydantic.main import BaseModel
from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
LLMFullResponseEndFrame,
StartFrame,
StartInterruptionFrame,
TTSAudioRawFrame,
TTSSpeakFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
@@ -44,63 +47,40 @@ except ModuleNotFoundError as e:
def language_to_playht_language(language: Language) -> str | None:
match language:
case Language.BG:
return "BULGARIAN"
case Language.CA:
return "CATALAN"
case Language.CS:
return "CZECH"
case Language.DA:
return "DANISH"
case Language.DE:
return "GERMAN"
case (
Language.EN
| Language.EN_US
| Language.EN_GB
| Language.EN_AU
| Language.EN_NZ
| Language.EN_IN
):
return "ENGLISH"
case Language.ES:
return "SPANISH"
case Language.FR | Language.FR_CA:
return "FRENCH"
case Language.EL:
return "GREEK"
case Language.HI:
return "HINDI"
case Language.HU:
return "HUNGARIAN"
case Language.ID:
return "INDONESIAN"
case Language.IT:
return "ITALIAN"
case Language.JA:
return "JAPANESE"
case Language.KO:
return "KOREAN"
case Language.MS:
return "MALAY"
case Language.NL:
return "DUTCH"
case Language.PL:
return "POLISH"
case Language.PT | Language.PT_BR:
return "PORTUGUESE"
case Language.RU:
return "RUSSIAN"
case Language.SV:
return "SWEDISH"
case Language.TH:
return "THAI"
case Language.TR:
return "TURKISH"
case Language.UK:
return "UKRAINIAN"
return None
language_map = {
Language.BG: "bulgarian",
Language.CA: "catalan",
Language.CS: "czech",
Language.DA: "danish",
Language.DE: "german",
Language.EN: "english",
Language.EN_US: "english",
Language.EN_GB: "english",
Language.EN_AU: "english",
Language.EN_NZ: "english",
Language.EN_IN: "english",
Language.ES: "spanish",
Language.FR: "french",
Language.FR_CA: "french",
Language.EL: "greek",
Language.HI: "hindi",
Language.HU: "hungarian",
Language.ID: "indonesian",
Language.IT: "italian",
Language.JA: "japanese",
Language.KO: "korean",
Language.MS: "malay",
Language.NL: "dutch",
Language.PL: "polish",
Language.PT: "portuguese",
Language.PT_BR: "portuguese",
Language.RU: "russian",
Language.SV: "swedish",
Language.TH: "thai",
Language.TR: "turkish",
Language.UK: "ukrainian",
}
return language_map.get(language)
class PlayHTTTSService(TTSService):
@@ -115,13 +95,16 @@ class PlayHTTTSService(TTSService):
api_key: str,
user_id: str,
voice_url: str,
voice_engine: str = "PlayHT3.0-mini",
voice_engine: str = "Play3.0-mini",
sample_rate: int = 24000,
output_format: str = "wav",
params: InputParams = InputParams(),
**kwargs,
):
super().__init__(sample_rate=sample_rate, **kwargs)
super().__init__(
sample_rate=sample_rate,
**kwargs,
)
self._api_key = api_key
self._user_id = user_id
@@ -134,7 +117,7 @@ class PlayHTTTSService(TTSService):
"sample_rate": sample_rate,
"language": self.language_to_service_language(params.language)
if params.language
else Language.EN,
else "english",
"output_format": output_format,
"voice_engine": voice_engine,
"speed": params.speed,
@@ -147,8 +130,7 @@ class PlayHTTTSService(TTSService):
return True
def language_to_service_language(self, language: Language) -> str | None:
# Keep your existing language mapping logic here
pass
return language_to_playht_language(language)
async def start(self, frame: StartFrame):
await super().start(frame)
@@ -228,17 +210,11 @@ class PlayHTTTSService(TTSService):
async def _receive_task_handler(self):
try:
header_size = 78 # Size of the WAV header + extra bytes we want to skip
header_received = False
async for message in self._get_websocket():
if isinstance(message, bytes):
chunk_size = len(message)
# Skip the WAV header
if not header_received and chunk_size == header_size:
header_received = True
# Skip the WAV header message
if message.startswith(b"RIFF"):
continue
await self.stop_ttfb_metrics()
frame = TTSAudioRawFrame(message, self._settings["sample_rate"], 1)
await self.push_frame(frame)
@@ -248,7 +224,6 @@ class PlayHTTTSService(TTSService):
msg = json.loads(message)
if "request_id" in msg and msg["request_id"] == self._request_id:
await self.push_frame(TTSStoppedFrame())
header_received = False # Reset for the next audio stream
self._request_id = None
elif "error" in msg:
logger.error(f"{self} error: {msg}")
@@ -260,6 +235,19 @@ class PlayHTTTSService(TTSService):
except Exception as e:
logger.error(f"{self} exception in receive task: {e}")
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# If we received a TTSSpeakFrame and the LLM response included text (it
# might be that it's only a function calling response) we pause
# processing more frames until we receive a BotStoppedSpeakingFrame.
if isinstance(frame, TTSSpeakFrame):
await self.pause_processing_frames()
elif isinstance(frame, LLMFullResponseEndFrame) and self._request_id:
await self.pause_processing_frames()
elif isinstance(frame, BotStoppedSpeakingFrame):
await self.resume_processing_frames()
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: [{text}]")
@@ -315,7 +303,7 @@ class PlayHTHttpTTSService(TTSService):
api_key: str,
user_id: str,
voice_url: str,
voice_engine: str = "PlayHT3.0-mini",
voice_engine: str = "Play3.0-mini",
sample_rate: int = 24000,
params: InputParams = InputParams(),
**kwargs,
@@ -333,7 +321,7 @@ class PlayHTHttpTTSService(TTSService):
"sample_rate": sample_rate,
"language": self.language_to_service_language(params.language)
if params.language
else Language.EN,
else "english",
"format": Format.FORMAT_WAV,
"voice_engine": voice_engine,
"speed": params.speed,

View File

@@ -0,0 +1,101 @@
from typing import AsyncGenerator, Optional
import aiohttp
from loguru import logger
from pydantic import BaseModel
from pipecat.frames.frames import (
ErrorFrame,
Frame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
from pipecat.services.ai_services import TTSService
class RimeHttpTTSService(TTSService):
class InputParams(BaseModel):
pause_between_brackets: Optional[bool] = False
phonemize_between_brackets: Optional[bool] = False
inline_speed_alpha: Optional[str] = None
speed_alpha: Optional[float] = 1.0
reduce_latency: Optional[bool] = False
def __init__(
self,
*,
api_key: str,
voice_id: str = "eva",
model: str = "mist",
sample_rate: int = 24000,
params: InputParams = InputParams(),
**kwargs,
):
super().__init__(sample_rate=sample_rate, **kwargs)
self._api_key = api_key
self._base_url = "https://users.rime.ai/v1/rime-tts"
self._settings = {
"speaker": voice_id,
"modelId": model,
"samplingRate": sample_rate,
"speedAlpha": params.speed_alpha,
"reduceLatency": params.reduce_latency,
"pauseBetweenBrackets": params.pause_between_brackets,
"phonemizeBetweenBrackets": params.phonemize_between_brackets,
}
if params.inline_speed_alpha:
self._settings["inlineSpeedAlpha"] = params.inline_speed_alpha
def can_generate_metrics(self) -> bool:
return True
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: [{text}]")
headers = {
"Accept": "audio/pcm",
"Authorization": f"Bearer {self._api_key}",
"Content-Type": "application/json",
}
payload = self._settings.copy()
payload["text"] = text
try:
await self.start_ttfb_metrics()
await self.start_tts_usage_metrics(text)
yield TTSStartedFrame()
async with aiohttp.ClientSession() as session:
async with session.post(self._base_url, json=payload, headers=headers) as response:
if response.status != 200:
error_message = f"Rime TTS error: HTTP {response.status}"
logger.error(error_message)
yield ErrorFrame(error=error_message)
return
# Process the streaming response
chunk_size = 8192
first_chunk = True
async for chunk in response.content.iter_chunked(chunk_size):
if first_chunk:
await self.stop_ttfb_metrics()
first_chunk = False
if chunk:
frame = TTSAudioRawFrame(chunk, self._settings["samplingRate"], 1)
yield frame
yield TTSStoppedFrame()
except Exception as e:
logger.exception(f"Error generating TTS: {e}")
yield ErrorFrame(error=f"Rime TTS error: {str(e)}")
finally:
yield TTSStoppedFrame()

View File

@@ -31,6 +31,34 @@ from loguru import logger
# https://github.com/coqui-ai/xtts-streaming-server
def language_to_xtts_language(language: Language) -> str | None:
language_map = {
Language.CS: "cs",
Language.DE: "de",
Language.EN: "en",
Language.EN_US: "en",
Language.EN_AU: "en",
Language.EN_GB: "en",
Language.EN_NZ: "en",
Language.EN_IN: "en",
Language.ES: "es",
Language.FR: "fr",
Language.HI: "hi",
Language.HU: "hu",
Language.IT: "it",
Language.JA: "ja",
Language.KO: "ko",
Language.NL: "nl",
Language.PL: "pl",
Language.PT: "pt",
Language.PT_BR: "pt",
Language.RU: "ru",
Language.TR: "tr",
Language.ZH: "zh-cn",
}
return language_map.get(language)
class XTTSService(TTSService):
def __init__(
self,
@@ -56,47 +84,7 @@ class XTTSService(TTSService):
return True
def language_to_service_language(self, language: Language) -> str | None:
match language:
case Language.CS:
return "cs"
case Language.DE:
return "de"
case (
Language.EN
| Language.EN_US
| Language.EN_AU
| Language.EN_GB
| Language.EN_NZ
| Language.EN_IN
):
return "en"
case Language.ES:
return "es"
case Language.FR:
return "fr"
case Language.HI:
return "hi"
case Language.HU:
return "hu"
case Language.IT:
return "it"
case Language.JA:
return "ja"
case Language.KO:
return "ko"
case Language.NL:
return "nl"
case Language.PL:
return "pl"
case Language.PT | Language.PT_BR:
return "pt"
case Language.RU:
return "ru"
case Language.TR:
return "tr"
case Language.ZH:
return "zh-cn"
return None
return language_to_xtts_language(language)
async def start(self, frame: StartFrame):
await super().start(frame)

View File

@@ -14,6 +14,7 @@ from pipecat.frames.frames import (
BotInterruptionFrame,
CancelFrame,
EndFrame,
FilterUpdateSettingsFrame,
Frame,
InputAudioRawFrame,
StartFrame,
@@ -41,6 +42,9 @@ class BaseInputTransport(FrameProcessor):
self._audio_task = None
async def start(self, frame: StartFrame):
# Start audio filter.
if self._params.audio_in_filter:
await self._params.audio_in_filter.start(self._params.audio_in_sample_rate)
# Create audio input queue and task if needed.
if self._params.audio_in_enabled or self._params.vad_enabled:
self._audio_in_queue = asyncio.Queue()
@@ -52,6 +56,9 @@ class BaseInputTransport(FrameProcessor):
self._audio_task.cancel()
await self._audio_task
self._audio_task = None
# Stop audio filter.
if self._params.audio_in_filter:
await self._params.audio_in_filter.stop()
async def cancel(self, frame: CancelFrame):
# Cancel and wait for the audio input task to finish.
@@ -100,6 +107,8 @@ class BaseInputTransport(FrameProcessor):
vad_analyzer = self.vad_analyzer()
if vad_analyzer:
vad_analyzer.set_params(frame.params)
elif isinstance(frame, FilterUpdateSettingsFrame) and self._params.audio_in_filter:
await self._params.audio_in_filter.process_frame(frame)
# Other frames
else:
await self.push_frame(frame, direction)
@@ -165,6 +174,10 @@ class BaseInputTransport(FrameProcessor):
audio_passthrough = True
# If an audio filter is available, run it before VAD.
if self._params.audio_in_filter:
frame.audio = await self._params.audio_in_filter.filter(frame.audio)
# Check VAD and push event if necessary. We just care about
# changes from QUIET to SPEAKING and vice versa.
if self._params.vad_enabled:

View File

@@ -8,19 +8,21 @@ import asyncio
import itertools
import sys
import time
from typing import List
from typing import AsyncGenerator, List
from loguru import logger
from PIL import Image
from pipecat.audio.vad.vad_analyzer import VAD_STOP_SECS
from pipecat.frames.frames import (
AudioRawFrame,
BotSpeakingFrame,
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
CancelFrame,
EndFrame,
Frame,
MixerControlFrame,
OutputAudioRawFrame,
OutputImageRawFrame,
SpriteFrame,
@@ -28,6 +30,7 @@ from pipecat.frames.frames import (
StartInterruptionFrame,
StopInterruptionFrame,
SystemFrame,
TTSAudioRawFrame,
TransportMessageFrame,
TransportMessageUrgentFrame,
)
@@ -72,11 +75,17 @@ class BaseOutputTransport(FrameProcessor):
self._bot_speaking = False
async def start(self, frame: StartFrame):
# Start audio mixer.
if self._params.audio_out_mixer:
await self._params.audio_out_mixer.start(self._params.audio_out_sample_rate)
self._create_output_tasks()
self._create_sink_tasks()
async def stop(self, frame: EndFrame):
await self._cancel_output_tasks()
# Stop audio mixer.
if self._params.audio_out_mixer:
await self._params.audio_out_mixer.stop()
async def cancel(self, frame: CancelFrame):
# Since we are cancelling everything it doesn't matter if we cancel sink
@@ -128,6 +137,8 @@ class BaseOutputTransport(FrameProcessor):
await self.stop(frame)
# We finally push EndFrame down so PipelineTask stops nicely.
await self.push_frame(frame, direction)
elif isinstance(frame, MixerControlFrame) and self._params.audio_out_mixer:
await self._params.audio_out_mixer.process_frame(frame)
# Other frames.
elif isinstance(frame, OutputAudioRawFrame):
await self._handle_audio(frame)
@@ -174,9 +185,10 @@ class BaseOutputTransport(FrameProcessor):
if self._params.audio_out_is_live:
await self._audio_out_queue.put(frame)
else:
cls = type(frame)
self._audio_buffer.extend(frame.audio)
while len(self._audio_buffer) >= self._audio_chunk_size:
chunk = OutputAudioRawFrame(
chunk = cls(
bytes(self._audio_buffer[: self._audio_chunk_size]),
sample_rate=frame.sample_rate,
num_channels=frame.num_channels,
@@ -196,12 +208,14 @@ class BaseOutputTransport(FrameProcessor):
async def _bot_started_speaking(self):
if not self._bot_speaking:
logger.debug("Bot started speaking")
await self.push_frame(BotStartedSpeakingFrame())
await self.push_frame(BotStartedSpeakingFrame(), FrameDirection.UPSTREAM)
self._bot_speaking = True
async def _bot_stopped_speaking(self):
if self._bot_speaking:
logger.debug("Bot stopped speaking")
await self.push_frame(BotStoppedSpeakingFrame())
await self.push_frame(BotStoppedSpeakingFrame(), FrameDirection.UPSTREAM)
self._bot_speaking = False
@@ -311,7 +325,7 @@ class BaseOutputTransport(FrameProcessor):
#
async def send_image(self, frame: OutputImageRawFrame | SpriteFrame):
await self.process_frame(frame, FrameDirection.DOWNSTREAM)
await self.queue_frame(frame, FrameDirection.DOWNSTREAM)
async def _draw_image(self, frame: OutputImageRawFrame):
desired_size = (self._params.camera_out_width, self._params.camera_out_height)
@@ -382,7 +396,51 @@ class BaseOutputTransport(FrameProcessor):
#
async def send_audio(self, frame: OutputAudioRawFrame):
await self.process_frame(frame, FrameDirection.DOWNSTREAM)
await self.queue_frame(frame, FrameDirection.DOWNSTREAM)
def _next_audio_frame(self) -> AsyncGenerator[AudioRawFrame, None]:
async def without_mixer(vad_stop_secs: float) -> AsyncGenerator[AudioRawFrame, None]:
while True:
try:
frame = await asyncio.wait_for(
self._audio_out_queue.get(), timeout=vad_stop_secs
)
yield frame
except asyncio.TimeoutError:
# Notify the bot stopped speaking upstream if necessary.
await self._bot_stopped_speaking()
async def with_mixer(vad_stop_secs: float) -> AsyncGenerator[AudioRawFrame, None]:
last_frame_time = 0
silence = b"\x00" * self._audio_chunk_size
while True:
try:
frame = self._audio_out_queue.get_nowait()
frame.audio = await self._params.audio_out_mixer.mix(frame.audio)
last_frame_time = time.time()
yield frame
except asyncio.QueueEmpty:
# Notify the bot stopped speaking upstream if necessary.
diff_time = time.time() - last_frame_time
if diff_time > vad_stop_secs:
await self._bot_stopped_speaking()
# Generate an audio frame with only the mixer's part.
frame = OutputAudioRawFrame(
audio=await self._params.audio_out_mixer.mix(silence),
sample_rate=self._params.audio_out_sample_rate,
num_channels=self._params.audio_out_channels,
)
yield frame
vad_stop_secs = (
self._params.vad_analyzer.params.stop_secs
if self._params.vad_analyzer
else VAD_STOP_SECS
)
if self._params.audio_out_mixer:
return with_mixer(vad_stop_secs)
else:
return without_mixer(vad_stop_secs)
async def _audio_out_task_handler(self):
wait_time = (
@@ -390,27 +448,21 @@ class BaseOutputTransport(FrameProcessor):
if self._params.vad_analyzer
else VAD_STOP_SECS
)
while True:
try:
# If we don't have an audio frame for VAD stop secs we will
# consider the bot is not speaking.
frame = await asyncio.wait_for(self._audio_out_queue.get(), timeout=wait_time)
try:
async for frame in self._next_audio_frame():
# Notify the bot started speaking upstream if necessary and that
# it's actually speaking.
if isinstance(frame, TTSAudioRawFrame):
await self._bot_started_speaking()
await self.push_frame(BotSpeakingFrame())
await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM)
# Notify the bot started speaking upstream if necessary.
await self._bot_started_speaking()
# Also, push frame downstream in case anyone else needs it.
await self.push_frame(frame)
# Send audio.
await self.write_raw_audio_frames(frame.audio)
# Notify the bot is speaking upstream.
await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM)
# Push frame downstream in case anyone else needs it.
await self.push_frame(frame)
except asyncio.TimeoutError:
# Notify the bot stopped speaking upstream if necessary.
await self._bot_stopped_speaking()
except asyncio.CancelledError:
break
except Exception as e:
logger.exception(f"{self} error writing to camera: {e}")
except asyncio.CancelledError:
pass
except Exception as e:
logger.exception(f"{self} error writing to microphone: {e}")

View File

@@ -8,10 +8,13 @@ import asyncio
import inspect
from abc import ABC, abstractmethod
from typing import Optional
from pydantic import ConfigDict
from pydantic.main import BaseModel
from pipecat.audio.filters.base_audio_filter import BaseAudioFilter
from pipecat.audio.mixers.base_audio_mixer import BaseAudioMixer
from pipecat.audio.vad.vad_analyzer import VADAnalyzer
from pipecat.processors.frame_processor import FrameProcessor
@@ -33,9 +36,11 @@ class TransportParams(BaseModel):
audio_out_sample_rate: int = 24000
audio_out_channels: int = 1
audio_out_bitrate: int = 96000
audio_out_mixer: Optional[BaseAudioMixer] = None
audio_in_enabled: bool = False
audio_in_sample_rate: int = 16000
audio_in_channels: int = 1
audio_in_filter: Optional[BaseAudioFilter] = None
vad_enabled: bool = False
vad_audio_passthrough: bool = False
vad_analyzer: VADAnalyzer | None = None

View File

@@ -7,6 +7,7 @@
import asyncio
import io
import time
import wave
from typing import Awaitable, Callable
@@ -42,7 +43,6 @@ except ModuleNotFoundError as e:
class FastAPIWebsocketParams(TransportParams):
add_wav_header: bool = False
audio_frame_size: int = 6400 # 200ms
serializer: FrameSerializer
@@ -105,44 +105,52 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport):
self._websocket = websocket
self._params = params
self._websocket_audio_buffer = bytes()
self._send_interval = (self._audio_chunk_size / self._params.audio_out_sample_rate) / 2
self._next_send_time = 0
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, StartInterruptionFrame):
await self._write_frame(frame)
self._next_send_time = 0
async def write_raw_audio_frames(self, frames: bytes):
self._websocket_audio_buffer += frames
while len(self._websocket_audio_buffer):
frame = AudioRawFrame(
audio=self._websocket_audio_buffer[: self._params.audio_frame_size],
sample_rate=self._params.audio_out_sample_rate,
num_channels=self._params.audio_out_channels,
frame = AudioRawFrame(
audio=frames,
sample_rate=self._params.audio_out_sample_rate,
num_channels=self._params.audio_out_channels,
)
if self._params.add_wav_header:
content = io.BytesIO()
ww = wave.open(content, "wb")
ww.setsampwidth(2)
ww.setnchannels(frame.num_channels)
ww.setframerate(frame.sample_rate)
ww.writeframes(frame.audio)
ww.close()
content.seek(0)
wav_frame = AudioRawFrame(
content.read(), sample_rate=frame.sample_rate, num_channels=frame.num_channels
)
frame = wav_frame
if self._params.add_wav_header:
content = io.BytesIO()
ww = wave.open(content, "wb")
ww.setsampwidth(2)
ww.setnchannels(frame.num_channels)
ww.setframerate(frame.sample_rate)
ww.writeframes(frame.audio)
ww.close()
content.seek(0)
wav_frame = AudioRawFrame(
content.read(), sample_rate=frame.sample_rate, num_channels=frame.num_channels
)
frame = wav_frame
payload = self._params.serializer.serialize(frame)
if payload and self._websocket.client_state == WebSocketState.CONNECTED:
await self._websocket.send_text(payload)
payload = self._params.serializer.serialize(frame)
if payload and self._websocket.client_state == WebSocketState.CONNECTED:
await self._websocket.send_text(payload)
# Simulate a clock.
current_time = time.monotonic()
sleep_duration = max(0, self._next_send_time - current_time)
await asyncio.sleep(sleep_duration)
if sleep_duration == 0:
self._next_send_time = time.monotonic() + self._send_interval
else:
self._next_send_time += self._send_interval
self._websocket_audio_buffer = self._websocket_audio_buffer[
self._params.audio_frame_size :
]
self._websocket_audio_buffer = bytes()
async def _write_frame(self, frame: Frame):
payload = self._params.serializer.serialize(frame)

View File

@@ -6,6 +6,7 @@
import asyncio
import io
import time
import wave
from typing import Awaitable, Callable
@@ -15,9 +16,12 @@ from pipecat.frames.frames import (
AudioRawFrame,
CancelFrame,
EndFrame,
Frame,
InputAudioRawFrame,
StartFrame,
StartInterruptionFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.serializers.base_serializer import FrameSerializer
from pipecat.serializers.protobuf import ProtobufFrameSerializer
from pipecat.transports.base_input import BaseInputTransport
@@ -36,7 +40,6 @@ except ModuleNotFoundError as e:
class WebsocketServerParams(TransportParams):
add_wav_header: bool = False
audio_frame_size: int = 6400 # 200ms
serializer: FrameSerializer = ProtobufFrameSerializer()
@@ -132,45 +135,59 @@ class WebsocketServerOutputTransport(BaseOutputTransport):
self._websocket_audio_buffer = bytes()
self._send_interval = (self._audio_chunk_size / self._params.audio_out_sample_rate) / 2
self._next_send_time = 0
async def set_client_connection(self, websocket: websockets.WebSocketServerProtocol | None):
if self._websocket:
await self._websocket.close()
logger.warning("Only one client allowed, using new connection")
self._websocket = websocket
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, StartInterruptionFrame):
self._next_send_time = 0
async def write_raw_audio_frames(self, frames: bytes):
if not self._websocket:
return
self._websocket_audio_buffer += frames
while len(self._websocket_audio_buffer) >= self._params.audio_frame_size:
frame = AudioRawFrame(
audio=self._websocket_audio_buffer[: self._params.audio_frame_size],
sample_rate=self._params.audio_out_sample_rate,
num_channels=self._params.audio_out_channels,
frame = AudioRawFrame(
audio=frames,
sample_rate=self._params.audio_out_sample_rate,
num_channels=self._params.audio_out_channels,
)
if self._params.add_wav_header:
content = io.BytesIO()
ww = wave.open(content, "wb")
ww.setsampwidth(2)
ww.setnchannels(frame.num_channels)
ww.setframerate(frame.sample_rate)
ww.writeframes(frame.audio)
ww.close()
content.seek(0)
wav_frame = AudioRawFrame(
content.read(), sample_rate=frame.sample_rate, num_channels=frame.num_channels
)
frame = wav_frame
if self._params.add_wav_header:
content = io.BytesIO()
ww = wave.open(content, "wb")
ww.setsampwidth(2)
ww.setnchannels(frame.num_channels)
ww.setframerate(frame.sample_rate)
ww.writeframes(frame.audio)
ww.close()
content.seek(0)
wav_frame = AudioRawFrame(
content.read(), sample_rate=frame.sample_rate, num_channels=frame.num_channels
)
frame = wav_frame
proto = self._params.serializer.serialize(frame)
if proto:
await self._websocket.send(proto)
proto = self._params.serializer.serialize(frame)
if proto:
await self._websocket.send(proto)
# Simulate a clock.
current_time = time.monotonic()
sleep_duration = max(0, self._next_send_time - current_time)
await asyncio.sleep(sleep_duration)
if sleep_duration == 0:
self._next_send_time = time.monotonic() + self._send_interval
else:
self._next_send_time += self._send_interval
self._websocket_audio_buffer = self._websocket_audio_buffer[
self._params.audio_frame_size :
]
self._websocket_audio_buffer = bytes()
class WebsocketServerTransport(BaseTransport):

View File

@@ -128,7 +128,11 @@ class DailyCallbacks(BaseModel):
on_error: Callable[[str], Awaitable[None]]
on_app_message: Callable[[Any, str], Awaitable[None]]
on_call_state_updated: Callable[[str], Awaitable[None]]
on_dialin_connected: Callable[[Any], Awaitable[None]]
on_dialin_ready: Callable[[str], Awaitable[None]]
on_dialin_stopped: Callable[[Any], Awaitable[None]]
on_dialin_error: Callable[[Any], Awaitable[None]]
on_dialin_warning: Callable[[Any], Awaitable[None]]
on_dialout_answered: Callable[[Any], Awaitable[None]]
on_dialout_connected: Callable[[Any], Awaitable[None]]
on_dialout_stopped: Callable[[Any], Awaitable[None]]
@@ -139,6 +143,9 @@ class DailyCallbacks(BaseModel):
on_participant_left: Callable[[Mapping[str, Any], str], Awaitable[None]]
on_participant_updated: Callable[[Mapping[str, Any]], Awaitable[None]]
on_transcription_message: Callable[[Mapping[str, Any]], Awaitable[None]]
on_recording_started: Callable[[Mapping[str, Any]], Awaitable[None]]
on_recording_stopped: Callable[[str], Awaitable[None]]
on_recording_error: Callable[[str, str], Awaitable[None]]
def completion_callback(future):
@@ -494,7 +501,7 @@ class DailyTransportClient(EventHandler):
):
# Only enable camera subscription on this participant
await self.update_subscriptions(
participant_settings={participant_id: {"media": "subscribed"}}
participant_settings={participant_id: {"media": {"camera": "subscribed"}}}
)
self._video_renderers[participant_id] = callback
@@ -533,9 +540,21 @@ class DailyTransportClient(EventHandler):
def on_call_state_updated(self, state: str):
self._call_async_callback(self._callbacks.on_call_state_updated, state)
def on_dialin_connected(self, data: Any):
self._call_async_callback(self._callbacks.on_dialin_connected, data)
def on_dialin_ready(self, sip_endpoint: str):
self._call_async_callback(self._callbacks.on_dialin_ready, sip_endpoint)
def on_dialin_stopped(self, data: Any):
self._call_async_callback(self._callbacks.on_dialin_stopped, data)
def on_dialin_error(self, data: Any):
self._call_async_callback(self._callbacks.on_dialin_error, data)
def on_dialin_warning(self, data: Any):
self._call_async_callback(self._callbacks.on_dialin_warning, data)
def on_dialout_answered(self, data: Any):
self._call_async_callback(self._callbacks.on_dialout_answered, data)
@@ -584,6 +603,18 @@ class DailyTransportClient(EventHandler):
def on_transcription_message(self, message):
self._call_async_callback(self._callbacks.on_transcription_message, message)
def on_recording_started(self, status):
logger.debug(f"Recording started: {status}")
self._call_async_callback(self._callbacks.on_recording_started, status)
def on_recording_stopped(self, stream_id):
logger.debug(f"Recording stopped: {stream_id}")
self._call_async_callback(self._callbacks.on_recording_stopped, stream_id)
def on_recording_error(self, stream_id, message):
logger.error(f"Recording error for {stream_id}: {message}")
self._call_async_callback(self._callbacks.on_recording_error, stream_id, message)
#
# Daily (CallClient callbacks)
#
@@ -807,7 +838,11 @@ class DailyTransport(BaseTransport):
on_error=self._on_error,
on_app_message=self._on_app_message,
on_call_state_updated=self._on_call_state_updated,
on_dialin_connected=self._on_dialin_connected,
on_dialin_ready=self._on_dialin_ready,
on_dialin_stopped=self._on_dialin_stopped,
on_dialin_error=self._on_dialin_error,
on_dialin_warning=self._on_dialin_warning,
on_dialout_answered=self._on_dialout_answered,
on_dialout_connected=self._on_dialout_connected,
on_dialout_stopped=self._on_dialout_stopped,
@@ -818,6 +853,9 @@ class DailyTransport(BaseTransport):
on_participant_left=self._on_participant_left,
on_participant_updated=self._on_participant_updated,
on_transcription_message=self._on_transcription_message,
on_recording_started=self._on_recording_started,
on_recording_stopped=self._on_recording_stopped,
on_recording_error=self._on_recording_error,
)
self._params = params
@@ -833,7 +871,11 @@ class DailyTransport(BaseTransport):
self._register_event_handler("on_left")
self._register_event_handler("on_app_message")
self._register_event_handler("on_call_state_updated")
self._register_event_handler("on_dialin_connected")
self._register_event_handler("on_dialin_ready")
self._register_event_handler("on_dialin_stopped")
self._register_event_handler("on_dialin_error")
self._register_event_handler("on_dialin_warning")
self._register_event_handler("on_dialout_answered")
self._register_event_handler("on_dialout_connected")
self._register_event_handler("on_dialout_stopped")
@@ -843,6 +885,10 @@ class DailyTransport(BaseTransport):
self._register_event_handler("on_participant_joined")
self._register_event_handler("on_participant_left")
self._register_event_handler("on_participant_updated")
self._register_event_handler("on_transcription_message")
self._register_event_handler("on_recording_started")
self._register_event_handler("on_recording_stopped")
self._register_event_handler("on_recording_error")
#
# BaseTransport
@@ -868,11 +914,11 @@ class DailyTransport(BaseTransport):
async def send_image(self, frame: OutputImageRawFrame | SpriteFrame):
if self._output:
await self._output.process_frame(frame, FrameDirection.DOWNSTREAM)
await self._output.queue_frame(frame, FrameDirection.DOWNSTREAM)
async def send_audio(self, frame: OutputAudioRawFrame):
if self._output:
await self._output.process_frame(frame, FrameDirection.DOWNSTREAM)
await self._output.queue_frame(frame, FrameDirection.DOWNSTREAM)
def participants(self):
return self._client.participants()
@@ -965,11 +1011,23 @@ class DailyTransport(BaseTransport):
except Exception as e:
logger.exception(f"Error handling dialin-ready event ({url}): {e}")
async def _on_dialin_connected(self, data):
await self._call_event_handler("on_dialin_connected", data)
async def _on_dialin_ready(self, sip_endpoint):
if self._params.dialin_settings:
await self._handle_dialin_ready(sip_endpoint)
await self._call_event_handler("on_dialin_ready", sip_endpoint)
async def _on_dialin_stopped(self, data):
await self._call_event_handler("on_dialin_stopped", data)
async def _on_dialin_error(self, data):
await self._call_event_handler("on_dialin_error", data)
async def _on_dialin_warning(self, data):
await self._call_event_handler("on_dialin_warning", data)
async def _on_dialout_answered(self, data):
await self._call_event_handler("on_dialout_answered", data)
@@ -998,6 +1056,8 @@ class DailyTransport(BaseTransport):
await self._call_event_handler("on_first_participant_joined", participant)
async def _on_transcription_message(self, message):
await self._call_event_handler("on_transcription_message", message)
participant_id = ""
if "participantId" in message:
participant_id = message["participantId"]
@@ -1020,3 +1080,12 @@ class DailyTransport(BaseTransport):
if self._input:
await self._input.push_transcription_frame(frame)
async def _on_recording_started(self, status):
await self._call_event_handler("on_recording_started", status)
async def _on_recording_stopped(self, stream_id):
await self._call_event_handler("on_recording_stopped", stream_id)
async def _on_recording_error(self, stream_id, message):
await self._call_event_handler("on_recording_error", stream_id, message)

View File

@@ -495,7 +495,7 @@ class LiveKitTransport(BaseTransport):
async def send_audio(self, frame: OutputAudioRawFrame):
if self._output:
await self._output.process_frame(frame, FrameDirection.DOWNSTREAM)
await self._output.queue_frame(frame, FrameDirection.DOWNSTREAM)
def get_participants(self) -> List[str]:
return self._client.get_participants()