Compare commits

...

96 Commits

Author SHA1 Message Date
Nikita Gamolsky
0265c1d3ef alllow interrupt 2024-11-02 16:12:29 -07:00
Nikita Gamolsky
ffa0e5a122 working with summary 2024-11-02 15:33:03 -07:00
Nikita Gamolsky
cdeab597b3 more variables 2024-11-02 14:05:19 -07:00
Nikita Gamolsky
abd486025b more updates 2024-11-02 13:46:28 -07:00
Nikita Gamolsky
c4cdb2d809 update to use context global 2024-11-02 13:37:35 -07:00
Nikita Gamolsky
05ba10c969 update 2024-11-02 13:27:08 -07:00
Kwindla Hultman Kramer
2f80683dc4 initial commit of screen capture in 99-anthropic-hackathon.py 2024-11-02 10:42:31 -07:00
Kwindla Hultman Kramer
151242d3a0 Merge pull request #666 from pipecat-ai/khk/realtime-pipecat-vad
Support using Pipecat turn detection instead of OpenAI Realtime API turn detection
2024-11-02 08:36:31 -07:00
Kwindla Hultman Kramer
93c6e5098c added comment explaining config of TurnDetection 2024-11-02 08:24:54 -07:00
Mark Backman
84bd767312 Merge pull request #685 from pipecat-ai/mb/add-recording-events
Add recording events and callbacks
2024-11-01 12:02:46 -04:00
Mark Backman
802c29e9e1 Add recording events and callbacks 2024-11-01 10:20:00 -04:00
Aleix Conchillo Flaqué
f83381860c Merge pull request #677 from pipecat-ai/aleix/add-notifier-and-notifier-filters
add notifiers and more frame filters
2024-10-31 15:55:07 -07:00
Aleix Conchillo Flaqué
4dad1bfe49 examples: add foundational/22-natural-conversation.py 2024-10-31 12:10:33 -07:00
marcus-daily
9ee8896b64 Removing unnecessary ruff arguments from README 2024-10-31 18:02:29 +00:00
marcus-daily
5f7a2f66d4 Add .idea to .gitignore 2024-10-31 18:02:29 +00:00
marcus-daily
76e5f1e847 Remove unnecessary ruff params in CI 2024-10-31 15:07:28 +00:00
marcus-daily
6975340d6c Set Ruff config for the project 2024-10-31 15:07:28 +00:00
marcus-daily
0f4cf56418 Load dotenv in simple chatbot server (fixes #415) 2024-10-31 12:08:30 +00:00
Aleix Conchillo Flaqué
018e51e8a3 add notifiers and more frame filters 2024-10-30 16:36:17 -07:00
Vanessa Pyne
b050143952 Merge pull request #676 from RonakAgarwalVani/fix/chunk-choices-delta-none
Fix uncaught exception when accessing 'tool_calls' in NoneType delta in response handling
2024-10-30 14:44:32 -05:00
Mark Backman
98ea1f0791 Merge pull request #675 from pipecat-ai/mb/playht-add-request-id
Add a request_id to each TTS sequence
2024-10-30 13:56:15 -04:00
Mark Backman
8272c35527 Use a request_id in TTS commands for the PlayHT websocket service 2024-10-30 13:54:18 -04:00
Mark Backman
e973e82e05 Merge pull request #672 from pipecat-ai/mb/fix-playht
Fix PlayHT TTFB metrics
2024-10-30 13:53:02 -04:00
RonakAgarwalVani
d1396bf618 Update openai.py 2024-10-30 14:26:49 +05:30
Vanessa Pyne
8186e423de Merge pull request #637 from pipecat-ai/vp-issue-template
docs: add ISSUE_TEMPLATE.md
2024-10-29 15:08:42 -05:00
vipyne
3010addb8b docs: add CONTRIBUTING.md 2024-10-29 15:03:07 -05:00
vipyne
029e0d391e docs: add ISSUE_TEMPLATE.md 2024-10-29 15:03:07 -05:00
Vanessa Pyne
bf31223577 Merge pull request #671 from pipecat-ai/vp-issue-635
docs: small fix
2024-10-29 14:34:13 -05:00
vipyne
42cc79154f docs: small fix 2024-10-29 14:33:57 -05:00
Mark Backman
05b857006a Update changelog 2024-10-28 20:56:29 -04:00
Mark Backman
2e57d21b89 Fix ttfb metrics 2024-10-28 20:27:24 -04:00
Aleix Conchillo Flaqué
fa05ec46be Merge pull request #667 from pipecat-ai/aleix/base-output-bot-speaking-detection
transports(base_output): use audio frames for bot speaking detection
2024-10-28 10:54:54 -07:00
Aleix Conchillo Flaqué
e3ce619284 transports(base_output): use audio frames for bot speaking detection 2024-10-28 10:07:37 -07:00
Vanessa Pyne
fb512dcd74 Merge pull request #630 from MoofSoup/update-readme
docs: simplify readme
2024-10-28 10:26:30 -05:00
Aleix Conchillo Flaqué
ca15d97383 Merge pull request #662 from pipecat-ai/aleix/daily-transport-async-functions
transports(daily): make functions async
2024-10-25 16:14:06 -07:00
Aleix Conchillo Flaqué
b32448e967 transports(daily): make functions async 2024-10-25 15:01:52 -07:00
Aleix Conchillo Flaqué
7e30da6183 Merge pull request #661 from pipecat-ai/aleix/allow-updating-subscritption-before
transports(daily): allow updating subscriptions before join
2024-10-25 15:00:34 -07:00
Aleix Conchillo Flaqué
a6dd2600d2 examples(tavus): await update_subscriptions 2024-10-25 14:56:56 -07:00
Aleix Conchillo Flaqué
b905b57dfc transports(daily): allow updating subscriptions before join 2024-10-25 14:46:17 -07:00
Kwindla Hultman Kramer
e1a7edfb58 make it possible to use Pipecat turn detection instead of OpenAI turn detection 2024-10-25 15:59:48 -05:00
Aleix Conchillo Flaqué
1b30b1fc23 Merge pull request #665 from pipecat-ai/aleix/fix-bot-started-stopped-speaking
transports(base_output): fix constant bot started/stopped speaking fr…
2024-10-25 13:00:38 -07:00
Aleix Conchillo Flaqué
55026898f6 transports(base_output): use vad stop secs for bot stopped speaking 2024-10-25 12:59:15 -07:00
Aleix Conchillo Flaqué
4283557894 audio(vad): expose params property 2024-10-25 12:59:15 -07:00
Aleix Conchillo Flaqué
5ab00e01aa transports(base_output): fix constant bot started/stopped speaking frames 2024-10-25 12:10:24 -07:00
Aleix Conchillo Flaqué
fcfc729e83 Merge pull request #664 from pipecat-ai/aleix/fix-aws-stuttering
services(aws): read stream and resample in a thread
2024-10-25 11:49:28 -07:00
Aleix Conchillo Flaqué
4eacb34fd8 services(aws): read stream and resample in a thread 2024-10-25 11:22:28 -07:00
Aleix Conchillo Flaqué
3a8aacccf7 Merge pull request #663 from pipecat-ai/aleix/audio-resampling-with-resampy
audio: use resamply for audio resampling
2024-10-25 10:16:20 -07:00
roey
54c0bf0c70 Adding TavusVideoService layer (#617)
Co-authored-by: roey <159067767+roey-tavus@users.noreply.github.com>
Co-authored-by: Mert Gerdan <mert@tavus.io>
Co-authored-by: Aleix Conchillo Flaqué <aleix@daily.co>
2024-10-25 09:46:25 -07:00
Aleix Conchillo Flaqué
778b05a252 audio: use resamply for audio resampling 2024-10-25 09:22:22 -07:00
Mark Backman
f16a416c2b Merge pull request #660 from pipecat-ai/mb/add-gemini-inputs
Add input params to Google Gemini
2024-10-24 20:58:19 -04:00
Aleix Conchillo Flaqué
1be63bccb8 Merge pull request #647 from pipecat-ai/aleix/daily-transport-only-transcribe-users
transport(daily): only transcribe users
2024-10-24 17:40:34 -07:00
Mark Backman
37820ac0df Add input params to Google Gemini 2024-10-24 20:12:41 -04:00
Aleix Conchillo Flaqué
8ea80d43f4 transports(daily): only transcribe user audio 2024-10-24 17:06:43 -07:00
Aleix Conchillo Flaqué
e117d70a00 update to daily-python 0.12.0 2024-10-24 16:49:19 -07:00
Aleix Conchillo Flaqué
2ba753272a Merge pull request #658 from pipecat-ai/aleix/default-to-24000-sample-rate
update TTS and transport output sample rate to 24000
2024-10-24 16:48:41 -07:00
Aleix Conchillo Flaqué
60c8c2f6e9 examples(15a): use daily transcription instead of local whisper 2024-10-24 16:47:41 -07:00
Aleix Conchillo Flaqué
cfb48200c2 services(azure): support sample rates 2024-10-24 16:47:35 -07:00
Aleix Conchillo Flaqué
6d317c6e8e audio: don't resample if same sample rate 2024-10-24 16:47:35 -07:00
Aleix Conchillo Flaqué
158d52856f transports(livekit): fix VADAnalyzer import 2024-10-24 16:47:35 -07:00
Aleix Conchillo Flaqué
92a69e404f update TTS and transport output sample rate to 24000 2024-10-24 16:47:35 -07:00
Aleix Conchillo Flaqué
d24c6185d8 Merge pull request #654 from pipecat-ai/aleix/daily-allow-completion-futures
transport(daily): allow completion futures
2024-10-24 14:28:53 -07:00
Mark Backman
1fd21578a6 Merge pull request #657 from pipecat-ai/mb/add-elevenlabs-output-format-type
Add ElevenLabs output format type
2024-10-24 17:07:04 -04:00
Mark Backman
700db87127 Merge pull request #656 from pipecat-ai/mb/add-gemini-metrics
Add Gemini token usage metrics
2024-10-24 17:04:56 -04:00
Mark Backman
6f1310569c Add ElevenLabs output format type 2024-10-24 17:03:45 -04:00
Aleix Conchillo Flaqué
14cedb0be8 Merge pull request #655 from pipecat-ai/aleix/fix-together-params
services(together): fix together AI InputParams
2024-10-24 13:51:38 -07:00
Mark Backman
fae97f9051 Add Gemini token usage metrics 2024-10-24 16:37:21 -04:00
Aleix Conchillo Flaqué
d930a46e64 services(together): fix together AI InputParams 2024-10-24 13:08:35 -07:00
Aleix Conchillo Flaqué
2e6b5d1843 transports(daily): fix aiohttp timeout 2024-10-24 11:44:30 -07:00
Aleix Conchillo Flaqué
88362db034 transports(daily): no more need for an output message queue 2024-10-24 11:44:30 -07:00
Aleix Conchillo Flaqué
f7f0c44c32 transports(daily): don't block event handlers 2024-10-24 11:44:30 -07:00
Mark Backman
33553b71d4 Merge pull request #653 from pipecat-ai/mb/align-tts-constructors
Align TTSService constructors
2024-10-24 13:52:43 -04:00
Mark Backman
be8ca505cd Merge pull request #652 from pipecat-ai/khk/more-gemini
Gemini new context manager and rewrite to use google data structures internally
2024-10-24 13:47:38 -04:00
Mark Backman
e957cce422 Align TTSService constructors 2024-10-24 13:42:06 -04:00
Mark Backman
418a13a4ec Merge pull request #650 from pipecat-ai/mb/assembly-fix
AssemblyAI: don't disconnect on language change
2024-10-24 11:26:56 -04:00
Mark Backman
fc445c0a1f Merge pull request #649 from pipecat-ai/mb/open-ai-max-tokens
Add max_tokens and max_completion_tokens inputs for OpenAI
2024-10-24 11:26:44 -04:00
Mark Backman
f0c65468ed AssemblyAI: don't disconnect on language change 2024-10-24 08:30:48 -04:00
Mark Backman
ce6a2bdcf7 Add max tokens inputs to OpenAI 2024-10-24 07:03:45 -04:00
Mark Backman
673542e235 Merge pull request #646 from pipecat-ai/mb/grok-function-calling
Support function calling for Grok
2024-10-23 21:56:38 -04:00
Kwindla Hultman Kramer
e032b0b70a gemini context aggregators 2024-10-23 18:44:09 -07:00
Mark Backman
e39f7e965b Support function calling for Grok 2024-10-23 17:22:26 -04:00
Mattie Ruth
d26751e968 add missing PipelineParams to enable the metrics (#645) 2024-10-23 16:46:46 -04:00
Aleix Conchillo Flaqué
e0ca4a9c23 Merge pull request #643 from pipecat-ai/aleix/daily-update-subscriptions
transports(daily): add update_subscriptions()
2024-10-22 17:07:07 -07:00
Aleix Conchillo Flaqué
801e52c095 transports(daily): add update_subscriptions() 2024-10-22 15:02:55 -07:00
Aleix Conchillo Flaqué
a46eaa838b Merge pull request #641 from pipecat-ai/aleix/prepare-0.0.47
prepare 0.0.47
2024-10-22 10:30:42 -07:00
Aleix Conchillo Flaqué
7c432499db update CHANGELOG for 0.0.47 2024-10-22 10:02:50 -07:00
Aleix Conchillo Flaqué
8d75fcc9f0 use warnings package to report deprecated code 2024-10-22 10:02:21 -07:00
Aleix Conchillo Flaqué
61d73f81ae Merge pull request #639 from pipecat-ai/aleix/daily-transcription-model
transport(daily): use "nova-2-general" for transcription
2024-10-22 09:40:43 -07:00
Aleix Conchillo Flaqué
951255def9 transport(daily): use "nova-2-general" for transcription 2024-10-22 09:40:03 -07:00
Moof Soup
bf5a7c3562 docs: Clarify README example and token usage
clarified readme example
2024-10-21 19:54:34 -07:00
Mark Backman
e556f34094 Merge pull request #638 from pipecat-ai/mb/fix-silero-vad-import
Fix Silero VAD import issue
2024-10-21 20:48:06 -04:00
Mark Backman
ccc3691620 Fix Silero VAD import issue 2024-10-21 20:39:20 -04:00
Vanessa Pyne
5321affda7 Merge pull request #588 from Allenmylath/patch-11
Update README.md
2024-10-21 11:20:05 -05:00
Mark Backman
e5ad8dc67b Merge pull request #627 from pipecat-ai/mb/upgrade-gladia-to-v2-api
Update GladiaSTTService to use the Gladia V2 API
2024-10-21 12:01:20 -04:00
Mark Backman
46927805bc Update GladiaSTTService to use the Gladia V2 API 2024-10-21 07:10:38 -04:00
Kwindla Hultman Kramer
07712cdb16 gemini function calling and partial implementation of standard context stuff 2024-10-18 17:14:57 -07:00
allenmylath
b999b76f70 Update README.md
readme description still shows simple-chatbot definition hence made more accurate description
2024-10-15 08:14:43 +05:30
100 changed files with 2584 additions and 519 deletions

View File

@@ -38,4 +38,4 @@ jobs:
id: ruff
run: |
source .venv/bin/activate
ruff format --config line-length=100 --diff --exclude "*_pb2.py"
ruff format --diff

1
.gitignore vendored
View File

@@ -4,6 +4,7 @@ __pycache__/
*~
venv
.venv
/.idea
#*#
# Distribution / packaging

View File

@@ -9,14 +9,79 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added `GatedOpenAILLMContextAggregator`. This aggregator keeps the last
received OpenAI LLM context frame and it doesn't let it through until the
notifier is notified.
- Added `WakeNotifierFilter`. This processor expects a list of frame types and
will execute a given callback predicate when a frame of any of those type is
being processed. If the callback returns true the notifier will be notified.
- Added `NullFilter`. A null filter doesn't push any frames upstream or
downstream. This is usually used to disable one of the pipelines in
`ParallelPipeline`.
- Added `EventNotifier`. This can be used as a very simple synchronization
feature between processors.
- Added `TavusVideoService`. This is an integration for Tavus digital twins.
(see https://www.tavus.io/)
- Added `DailyTransport.update_subscriptions()`. This allows you to have fine
grained control of what media subscriptions you want for each participant in a
room.
### Changed
- The following `DailyTransport` functions are now `async` which means they need
to be awaited: `start_dialout`, `stop_dialout`, `start_recording`,
`stop_recording`, `capture_participant_transcription` and
`capture_participant_video`.
- Changed default output sample rate to 24000. This changes all TTS service to
output to 24000 and also the default output transport sample rate. This
improves audio quality at the cost of some extra bandwidth.
### Fixed
- Improved bot speaking detection for all TTS services by using actual bot
audio.
- Fixed an issue that was generating constant bot started/stopped speaking
frames for HTTP TTS services.
- Fixed an issue that was causing stuttering with AWS TTS service.
- Fixed an issue with PlayHTTTSService, where the TTFB metrics were reporting
very small time values.
### Other
- Added a new foundational example 22-natural-conversation.py. This examples
shows how to achieve a more natural conversation detecting when the user ends
statement.
## [0.0.47] - 2024-10-22
### Added
- Added `AssemblyAISTTService` and corresponding foundational examples
`07o-interruptible-assemblyai.py` and `13d-assemblyai-transcription.py`.
- Added a foundational example for Gladia transcription:
`13c-gladia-transcription.py`
### Changed
- Updated `GladiaSTTService` to use the V2 API.
- Changed `DailyTransport` transcription model to `nova-2-general`.
### Fixed
- Fixed an issue that would cause an import error when importing
`SileroVADAnalyzer` from the old package `pipecat.vad.silero`.
- Fixed `enable_usage_metrics` to control LLM/TTS usage metrics separately
from `enable_metrics`.
@@ -32,6 +97,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- Changed `DeepgramSTTService` model to `nova-2-general`.
- Moved `SileroVAD` audio processor to `processors.audio.vad`.
- Module `utils.audio` is now `audio.utils`. A new `resample_audio` function has

View File

@@ -64,7 +64,7 @@ async def main():
# Use Daily as a real-time media transport (WebRTC)
transport = DailyTransport(
room_url=...,
token=...,
token="", # leave empty. Note: token is _not_ your api key
bot_name="Bot Name",
params=DailyParams(audio_out_enabled=True))
@@ -178,7 +178,7 @@ You can use [use-package](https://github.com/jwiegley/use-package) to install [e
:ensure t
:hook ((python-mode . lazy-ruff-mode))
:config
(setq lazy-ruff-format-command "ruff format --config line-length=100")
(setq lazy-ruff-format-command "ruff format")
(setq lazy-ruff-only-format-block t)
(setq lazy-ruff-only-format-region t)
(setq lazy-ruff-only-format-buffer t))
@@ -197,14 +197,13 @@ You can use [use-package](https://github.com/jwiegley/use-package) to install [e
### Visual Studio Code
Install the
[Ruff](https://marketplace.visualstudio.com/items?itemName=charliermarsh.ruff) extension. Then edit the user settings (_Ctrl-Shift-P_ `Open User Settings (JSON)`) and set it as the default Python formatter, enable formatting on save and configure `ruff` arguments:
[Ruff](https://marketplace.visualstudio.com/items?itemName=charliermarsh.ruff) extension. Then edit the user settings (_Ctrl-Shift-P_ `Open User Settings (JSON)`) and set it as the default Python formatter, and enable formatting on save:
```json
"[python]": {
"editor.defaultFormatter": "charliermarsh.ruff",
"editor.formatOnSave": true
},
"ruff.format.args": ["--config", "line-length=100"]
}
```
## Getting help

165
docs/CONTRIBUTING.md Normal file
View File

@@ -0,0 +1,165 @@
## Contributing to Pipecat
We welcome contributions of all kinds! Your help is appreciated. Follow these steps to get involved:
1. **Fork this repository**: Start by forking the Pipecat Documentation repository to your GitHub account.
2. **Clone the repository**: Clone your forked repository to your local machine.
```bash
git clone https://github.com/your-username/pipecat
```
3. **Create a branch**: For your contribution, create a new branch.
```bash
git checkout -b your-branch-name
```
4. **Make your changes**: Edit or add files as necessary.
5. **Test your changes**: Ensure that your changes look correct and follow the style set in the codebase.
6. **Commit your changes**: Once you're satisfied with your changes, commit them with a meaningful message.
```bash
git commit -m "Description of your changes"
```
7. **Push your changes**: Push your branch to your forked repository.
```bash
git push origin your-branch-name
```
9. **Submit a Pull Request (PR)**: Open a PR from your forked repository to the main branch of this repo.
> Important: Describe the changes you've made clearly!
Our maintainers will review your PR, and once everything is good, your contributions will be merged!
# Contributor Covenant Code of Conduct
## Our Pledge
We as members, contributors, and leaders pledge to make participation in our
community a harassment-free experience for everyone, regardless of age, body
size, visible or invisible disability, ethnicity, sex characteristics, gender
identity and expression, level of experience, education, socio-economic status,
nationality, personal appearance, race, caste, color, religion, or sexual
identity and orientation.
We pledge to act and interact in ways that contribute to an open, welcoming,
diverse, inclusive, and healthy community.
## Our Standards
Examples of behavior that contributes to a positive environment for our
community include:
* Demonstrating empathy and kindness toward other people
* Being respectful of differing opinions, viewpoints, and experiences
* Giving and gracefully accepting constructive feedback
* Accepting responsibility and apologizing to those affected by our mistakes,
and learning from the experience
* Focusing on what is best not just for us as individuals, but for the overall
community
Examples of unacceptable behavior include:
* The use of sexualized language or imagery, and sexual attention or advances of
any kind
* Trolling, insulting or derogatory comments, and personal or political attacks
* Public or private harassment
* Publishing others' private information, such as a physical or email address,
without their explicit permission
* Other conduct which could reasonably be considered inappropriate in a
professional setting
## Enforcement Responsibilities
Community leaders are responsible for clarifying and enforcing our standards of
acceptable behavior and will take appropriate and fair corrective action in
response to any behavior that they deem inappropriate, threatening, offensive,
or harmful.
Community leaders have the right and responsibility to remove, edit, or reject
comments, commits, code, wiki edits, issues, and other contributions that are
not aligned to this Code of Conduct, and will communicate reasons for moderation
decisions when appropriate.
## Scope
This Code of Conduct applies within all community spaces, and also applies when
an individual is officially representing the community in public spaces.
Examples of representing our community include using an official email address,
posting via an official social media account, or acting as an appointed
representative at an online or offline event.
## Enforcement
Instances of abusive, harassing, or otherwise unacceptable behavior may be
reported to the community leaders responsible for enforcement at pipecat-ai@daily.co.
All complaints will be reviewed and investigated promptly and fairly.
All community leaders are obligated to respect the privacy and security of the
reporter of any incident.
## Enforcement Guidelines
Community leaders will follow these Community Impact Guidelines in determining
the consequences for any action they deem in violation of this Code of Conduct:
### 1. Correction
**Community Impact**: Use of inappropriate language or other behavior deemed
unprofessional or unwelcome in the community.
**Consequence**: A private, written warning from community leaders, providing
clarity around the nature of the violation and an explanation of why the
behavior was inappropriate. A public apology may be requested.
### 2. Warning
**Community Impact**: A violation through a single incident or series of
actions.
**Consequence**: A warning with consequences for continued behavior. No
interaction with the people involved, including unsolicited interaction with
those enforcing the Code of Conduct, for a specified period of time. This
includes avoiding interactions in community spaces as well as external channels
like social media. Violating these terms may lead to a temporary or permanent
ban.
### 3. Temporary Ban
**Community Impact**: A serious violation of community standards, including
sustained inappropriate behavior.
**Consequence**: A temporary ban from any sort of interaction or public
communication with the community for a specified period of time. No public or
private interaction with the people involved, including unsolicited interaction
with those enforcing the Code of Conduct, is allowed during this period.
Violating these terms may lead to a permanent ban.
### 4. Permanent Ban
**Community Impact**: Demonstrating a pattern of violation of community
standards, including sustained inappropriate behavior, harassment of an
individual, or aggression toward or disparagement of classes of individuals.
**Consequence**: A permanent ban from any sort of public interaction within the
community.
## Attribution
This Code of Conduct is adapted from the [Contributor Covenant][homepage],
version 2.1, available at
[https://www.contributor-covenant.org/version/2/1/code_of_conduct.html][v2.1].
Community Impact Guidelines were inspired by
[Mozilla's code of conduct enforcement ladder][Mozilla CoC].
For answers to common questions about this code of conduct, see the FAQ at
[https://www.contributor-covenant.org/faq][FAQ]. Translations are available at
[https://www.contributor-covenant.org/translations][translations].
[homepage]: https://www.contributor-covenant.org
[v2.1]: https://www.contributor-covenant.org/version/2/1/code_of_conduct.html
[Mozilla CoC]: https://github.com/mozilla/diversity
[FAQ]: https://www.contributor-covenant.org/faq
[translations]: https://www.contributor-covenant.org/translations

22
docs/ISSUE_TEMPLATE.md Normal file
View File

@@ -0,0 +1,22 @@
# Description
Is this reporting a bug or feature request?
If reporting a bug, please fill out the following:
### Environment
- pipecat-ai version:
- python version:
- OS:
### Issue description
Provide a clear description of the issue.
### Repro steps
List the steps to reproduce the issue.
### Expected behavior
### Actual behavior
### Logs

View File

@@ -0,0 +1 @@
#### Please describe the changes in your PR. If it is addressing an issue, please reference that as well.

View File

@@ -46,5 +46,10 @@ PLAY_HT_API_KEY=...
# OpenAI
OPENAI_API_KEY=...
#OpenPipe
# OpenPipe
OPENPIPE_API_KEY=...
# Tavus
TAVUS_API_KEY=...
TAVUS_REPLICA_ID=...
TAVUS_PERSONA_ID=...

View File

@@ -1,12 +1,41 @@
# Simple Chatbot
# Chatbot with canonical-metrics
<img src="image.png" width="420px">
This project implements a chatbot using a pipeline architecture that integrates audio processing, transcription, and a language model for conversational interactions. The chatbot operates within a daily communication environment, utilizing various services for text-to-speech and language model responses.
This app connects you to a chatbot powered by GPT-4, complete with animations generated by Stable Video Diffusion.
## Features
See a video of it in action: https://x.com/kwindla/status/1778628911817183509
- **Audio Input and Output**: Captures microphone input and plays back audio responses.
- **Voice Activity Detection**: Utilizes Silero VAD to manage audio input intelligently.
- **Text-to-Speech**: Integrates ElevenLabs TTS service to convert text responses into audio.
- **Language Model Interaction**: Uses OpenAI's GPT-4 model to generate responses based on user input.
- **Transcription Services**: Captures and transcribes participant speech for analytics.
- **Metrics Collection**: Sends audio data for analysis via Canonical Metrics Service.
## Requirements
- Python 3.10+
- `python-dotenv`
- Additional libraries from the `pipecat` package.
## Setup
1. Clone the repository.
2. Install the required packages.
3. Set up environment variables for API keys:
- `OPENAI_API_KEY`
- `ELEVENLABS_API_KEY`
- `CANONICAL_API_KEY`
- `CANONICAL_API_URL`
4. Run the script.
## Usage
The chatbot introduces itself and engages in conversations, providing brief and creative responses. Designed for flexibility, it can support multiple languages with appropriate configuration.
## Events
- Participants joining or leaving the call are handled dynamically, adjusting the chatbot's behavior accordingly.
And a quick video walkthrough of the code: https://www.loom.com/share/13df1967161f4d24ade054e7f8753416
The first time, things might take extra time to get started since VAD (Voice Activity Detection) model needs to be downloaded.

View File

@@ -124,7 +124,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_participant_left")

View File

@@ -123,7 +123,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_participant_left")

View File

@@ -75,7 +75,7 @@ async def main(room_url: str, token: str):
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_participant_left")

View File

@@ -81,7 +81,7 @@ async def main(room_url: str, token: str, callId: str, callDomain: str):
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_participant_left")

View File

@@ -84,7 +84,7 @@ async def main(room_url: str, token: str, callId: str, sipUri: str):
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_participant_left")

View File

@@ -81,7 +81,7 @@ async def main():
url=url,
token=token,
room_name=room_name,
params=LiveKitParams(audio_out_enabled=True, audio_out_sample_rate=16000),
params=LiveKitParams(audio_out_enabled=True),
)
tts = CartesiaTTSService(

View File

@@ -5,33 +5,31 @@
#
import asyncio
import aiohttp
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import Frame, LLMMessagesFrame, MetricsFrame
from pipecat.metrics.metrics import (
TTFBMetricsData,
ProcessingMetricsData,
LLMUsageMetricsData,
ProcessingMetricsData,
TTFBMetricsData,
TTSUsageMetricsData,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -105,11 +103,14 @@ async def main():
]
)
task = PipelineTask(pipeline)
task = PipelineTask(
pipeline,
PipelineParams(enable_metrics=True, enable_usage_metrics=True),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])

View File

@@ -127,7 +127,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
participant_name = participant.get("info", {}).get("userName", "")
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
await task.queue_frames([TextFrame(f"Hi there {participant_name}!")])
runner = PipelineRunner()

View File

@@ -89,7 +89,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])

View File

@@ -87,7 +87,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])

View File

@@ -82,7 +82,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([LLMMessagesFrame(messages)])

View File

@@ -109,7 +109,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
lc.set_participant_id(participant["id"])
# Kick off the conversation.
# the `LLMMessagesFrame` will be picked up by the LangchainProcessor using

View File

@@ -85,7 +85,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])

View File

@@ -40,7 +40,6 @@ async def main():
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=16000,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
@@ -89,7 +88,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])

View File

@@ -41,7 +41,6 @@ async def main():
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=16000,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
@@ -90,7 +89,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])

View File

@@ -74,7 +74,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])

View File

@@ -86,7 +86,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])

View File

@@ -81,7 +81,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])

View File

@@ -5,12 +5,16 @@
#
import asyncio
import aiohttp
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.frames.frames import EndFrame, LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -20,12 +24,6 @@ from pipecat.services.gladia import GladiaSTTService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -85,11 +83,16 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
# Register an event handler to exit the application when the user leaves.
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
runner = PipelineRunner()
await runner.run(task)

View File

@@ -77,7 +77,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])

View File

@@ -96,7 +96,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([LLMMessagesFrame(messages)])

View File

@@ -40,7 +40,6 @@ async def main():
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=16000,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
@@ -85,7 +84,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])

View File

@@ -82,7 +82,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])

View File

@@ -83,7 +83,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])

View File

@@ -63,6 +63,7 @@ async def main():
"Test",
DailyParams(
audio_in_enabled=True,
audio_in_sample_rate=24000,
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_is_live=True,
@@ -73,7 +74,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_video(participant["id"])
await transport.capture_participant_video(participant["id"])
pipeline = Pipeline([transport.input(), MirrorProcessor(), transport.output()])

View File

@@ -65,7 +65,7 @@ async def main():
tk_root.title("Local Mirror")
daily_transport = DailyTransport(
room_url, token, "Test", DailyParams(audio_in_enabled=True)
room_url, token, "Test", DailyParams(audio_in_enabled=True, audio_in_sample_rate=24000)
)
tk_transport = TkLocalTransport(
@@ -81,7 +81,7 @@ async def main():
@daily_transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_video(participant["id"])
await transport.capture_participant_video(participant["id"])
pipeline = Pipeline([daily_transport.input(), MirrorProcessor(), tk_transport.output()])

View File

@@ -82,7 +82,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
await tts.say("Hi! If you want to talk to me, just say 'Hey Robot'.")
runner = PipelineRunner()

View File

@@ -134,7 +134,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
await tts.say("Hi, I'm listening!")
await transport.send_audio(sounds["ding1.wav"])

View File

@@ -84,8 +84,8 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await tts.say("Hi there! Feel free to ask me what I see.")
transport.capture_participant_video(participant["id"], framerate=0)
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_video(participant["id"], framerate=0)
await transport.capture_participant_transcription(participant["id"])
image_requester.set_participant_id(participant["id"])
pipeline = Pipeline(

View File

@@ -86,8 +86,8 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await tts.say("Hi there! Feel free to ask me what I see.")
transport.capture_participant_video(participant["id"], framerate=0)
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_video(participant["id"], framerate=0)
await transport.capture_participant_transcription(participant["id"])
image_requester.set_participant_id(participant["id"])
pipeline = Pipeline(

View File

@@ -83,8 +83,8 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await tts.say("Hi there! Feel free to ask me what I see.")
transport.capture_participant_video(participant["id"], framerate=0)
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_video(participant["id"], framerate=0)
await transport.capture_participant_transcription(participant["id"])
image_requester.set_participant_id(participant["id"])
pipeline = Pipeline(

View File

@@ -78,16 +78,13 @@ async def main():
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
params=CartesiaTTSService.InputParams(
sample_rate=16000,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await tts.say("Hi there! Feel free to ask me what I see.")
transport.capture_participant_video(participant["id"], framerate=0)
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_video(participant["id"], framerate=0)
await transport.capture_participant_transcription(participant["id"])
image_requester.set_participant_id(participant["id"])
pipeline = Pipeline(

View File

@@ -127,7 +127,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])

View File

@@ -105,7 +105,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])

View File

@@ -160,8 +160,8 @@ If you need to use a tool, simply use the tool. Do not tell the user the tool yo
async def on_first_participant_joined(transport, participant):
global video_participant_id
video_participant_id = participant["id"]
transport.capture_participant_transcription(video_participant_id)
transport.capture_participant_video(video_participant_id, framerate=0)
await transport.capture_participant_transcription(video_participant_id)
await transport.capture_participant_video(video_participant_id, framerate=0)
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])

View File

@@ -123,7 +123,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
# await tts.say("Hi! Ask me about the weather in San Francisco.")

View File

@@ -153,8 +153,8 @@ indicate you should use the get_image tool are:
async def on_first_participant_joined(transport, participant):
global video_participant_id
video_participant_id = participant["id"]
transport.capture_participant_transcription(participant["id"])
transport.capture_participant_video(video_participant_id, framerate=0)
await transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_video(video_participant_id, framerate=0)
# Kick off the conversation.
await tts.say("Hi! Ask me about the weather in San Francisco.")

View File

@@ -0,0 +1,173 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.google import GoogleLLMService
from pipecat.services.openai import OpenAILLMContext
from pipecat.transports.services.daily import DailyParams, DailyTransport
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
video_participant_id = None
async def get_weather(function_name, tool_call_id, arguments, llm, context, result_callback):
location = arguments["location"]
await result_callback(f"The weather in {location} is currently 72 degrees and sunny.")
async def get_image(function_name, tool_call_id, arguments, llm, context, result_callback):
logger.debug(f"!!! IN get_image {video_participant_id}, {arguments}")
question = arguments["question"]
await llm.request_image_frame(user_id=video_participant_id, text_content=question)
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = GoogleLLMService(model="gemini-1.5-flash-latest", api_key=os.getenv("GOOGLE_API_KEY"))
llm.register_function("get_weather", get_weather)
llm.register_function("get_image", get_image)
tools = [
{
"function_declarations": [
{
"name": "get_weather",
"description": "Get the current weather",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": ["location", "format"],
},
},
{
"name": "get_image",
"description": "Get and image from the camera or video stream.",
"parameters": {
"type": "object",
"properties": {
"question": {
"type": "string",
"description": "The question to to use when running inference on the acquired image.",
},
},
"required": ["question"],
},
},
]
}
]
system_prompt = """\
You are a helpful assistant who converses with a user and answers questions. Respond concisely to general questions.
Your response will be turned into speech so use only simple words and punctuation.
You have access to two tools: get_weather and get_image.
You can respond to questions about the weather using the get_weather tool.
You can answer questions about the user's video stream using the get_image tool. Some examples of phrases that \
indicate you should use the get_image tool are:
- What do you see?
- What's in the video?
- Can you describe the video?
- Tell me about what you see.
- Tell me something interesting about what you see.
- What's happening in the video?
"""
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": "Say hello."},
]
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(),
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
global video_participant_id
video_participant_id = participant["id"]
await transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_video(video_participant_id, framerate=0)
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -141,7 +141,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{

View File

@@ -10,7 +10,7 @@ import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.frames.frames import LLMMessagesFrame, TTSUpdateSettingsFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.runner import PipelineRunner
@@ -19,7 +19,6 @@ from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.filters.function_filter import FunctionFilter
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.whisper import Model, WhisperSTTService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from openai.types.chat import ChatCompletionToolParam
@@ -61,16 +60,14 @@ async def main():
token,
"Pipecat",
DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
)
stt = WhisperSTTService(model=Model.LARGE)
english_tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
@@ -116,7 +113,6 @@ async def main():
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
ParallelPipeline( # TTS (bot will speak the chosen language)
@@ -132,7 +128,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{

View File

@@ -92,7 +92,7 @@ async def main():
# bot can "hear" and respond to them.
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# When the first participant joins, the bot should introduce itself.
@transport.event_handler("on_first_participant_joined")

View File

@@ -99,7 +99,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])

View File

@@ -166,7 +166,7 @@ Remember, your responses should be short. Just one or two sentences, usually."""
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])

View File

@@ -223,7 +223,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])

View File

@@ -249,7 +249,7 @@ Remember, your responses should be short. Just one or two sentences, usually."""
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])

View File

@@ -219,7 +219,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])

View File

@@ -0,0 +1,290 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import glob
import json
import os
import sys
from datetime import datetime
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
)
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.google import GoogleLLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
video_participant_id = None
BASE_FILENAME = "/tmp/pipecat_conversation_"
tts = None
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
temperature = 75 if args["format"] == "fahrenheit" else 24
await result_callback(
{
"conditions": "nice",
"temperature": temperature,
"format": args["format"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
async def get_image(function_name, tool_call_id, arguments, llm, context, result_callback):
question = arguments["question"]
await llm.request_image_frame(user_id=video_participant_id, text_content=question)
async def get_saved_conversation_filenames(
function_name, tool_call_id, args, llm, context, result_callback
):
# Construct the full pattern including the BASE_FILENAME
full_pattern = f"{BASE_FILENAME}*.json"
# Use glob to find all matching files
matching_files = glob.glob(full_pattern)
logger.debug(f"matching files: {matching_files}")
await result_callback({"filenames": matching_files})
async def save_conversation(function_name, tool_call_id, args, llm, context, result_callback):
timestamp = datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
filename = f"{BASE_FILENAME}{timestamp}.json"
logger.debug(
f"writing conversation to {filename}\n{json.dumps(context.get_messages_for_logging(), indent=4)}"
)
try:
with open(filename, "w") as file:
# todo: extract 'system' into the first message in the list
messages = context.get_messages_for_persistent_storage()
# remove the last message (the instruction to save the context)
messages.pop()
json.dump(messages, file, indent=2)
await result_callback({"success": True})
except Exception as e:
logger.debug(f"error saving conversation: {e}")
await result_callback({"success": False, "error": str(e)})
async def load_conversation(function_name, tool_call_id, args, llm, context, result_callback):
global tts
filename = args["filename"]
logger.debug(f"loading conversation from {filename}")
try:
with open(filename, "r") as file:
context.set_messages(json.load(file))
await result_callback(
{
"success": True,
"message": "The most recent conversation has been loaded. Awaiting further instructions.",
}
)
except Exception as e:
await result_callback({"success": False, "error": str(e)})
# Test message munging ...
messages = [
{
"role": "system",
"content": """You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your
capabilities in a succinct way. Your output will be converted to audio so don't include special
characters in your answers. Respond to what the user said in a creative and helpful way.
You have several tools you can use to help you.
You can respond to questions about the weather using the get_weather tool.
You can save the current conversation using the save_conversation tool. This tool allows you to save
the current conversation to external storage. If the user asks you to save the conversation, use this
save_conversation too.
You can load a saved conversation using the load_conversation tool. This tool allows you to load a
conversation from external storage. You can get a list of conversations that have been saved using the
get_saved_conversation_filenames tool.
You can answer questions about the user's video stream using the get_image tool. Some examples of phrases that \
indicate you should use the get_image tool are:
- What do you see?
- What's in the video?
- Can you describe the video?
- Tell me about what you see.
- Tell me something interesting about what you see.
- What's happening in the video?
""",
},
# {"role": "user", "content": ""},
# {"role": "assistant", "content": []},
# {"role": "user", "content": "Tell me"},
# {"role": "user", "content": "a joke"},
]
tools = [
{
"function_declarations": [
{
"name": "get_current_weather",
"description": "Get the current weather",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": ["location", "format"],
},
},
{
"name": "save_conversation",
"description": "Save the current conversation. Use this function to persist the current conversation to external storage.",
"parameters": {
"type": "object",
"properties": {
"user_request_text": {
"type": "string",
"description": "The text of the user's request to save the conversation.",
}
},
"required": ["user_request_text"],
},
},
{
"name": "get_saved_conversation_filenames",
"description": "Get a list of saved conversation histories. Returns a list of filenames. Each filename includes a date and timestamp. Each file is conversation history that can be loaded into this session.",
"parameters": None,
},
{
"name": "load_conversation",
"description": "Load a conversation history. Use this function to load a conversation history into the current session.",
"parameters": {
"type": "object",
"properties": {
"filename": {
"type": "string",
"description": "The filename of the conversation history to load.",
}
},
"required": ["filename"],
},
},
{
"name": "get_image",
"description": "Get and image from the camera or video stream.",
"parameters": {
"type": "object",
"properties": {
"question": {
"type": "string",
"description": "The question to to use when running inference on the acquired image.",
},
},
"required": ["question"],
},
},
]
},
]
async def main():
global tts
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.8)),
),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = GoogleLLMService(model="gemini-1.5-flash-latest", api_key=os.getenv("GOOGLE_API_KEY"))
# you can either register a single function for all function calls, or specific functions
# llm.register_function(None, fetch_weather_from_api)
llm.register_function("get_current_weather", fetch_weather_from_api)
llm.register_function("save_conversation", save_conversation)
llm.register_function("get_saved_conversation_filenames", get_saved_conversation_filenames)
llm.register_function("load_conversation", load_conversation)
llm.register_function("get_image", get_image)
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(),
llm, # LLM
tts,
context_aggregator.assistant(),
transport.output(), # Transport bot output
]
)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
# report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
global video_participant_id
video_participant_id = participant["id"]
await transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_video(video_participant_id, framerate=0)
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,133 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import os
import sys
from typing import Any, Mapping
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.tavus import TavusVideoService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.audio.vad.silero import SileroVADAnalyzer
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
tavus = TavusVideoService(
api_key=os.getenv("TAVUS_API_KEY"),
replica_id=os.getenv("TAVUS_REPLICA_ID"),
persona_id=os.getenv("TAVUS_PERSONA_ID", "pipecat0"),
session=session,
)
# get persona, look up persona_name, set this as the bot name to ignore
persona_name = await tavus.get_persona_name()
room_url = await tavus.initialize()
transport = DailyTransport(
room_url=room_url,
token=None,
bot_name="Pipecat bot",
params=DailyParams(
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="a167e0f3-df7e-4d52-a9c3-f949145efdab",
)
llm = OpenAILLMService(model="gpt-4o-mini")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
tma_in, # User responses
llm, # LLM
tts, # TTS
tavus, # Tavus output layer
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_participant_joined")
async def on_participant_joined(
transport: DailyTransport, participant: Mapping[str, Any]
) -> None:
# Ignore the Tavus replica's microphone
if participant.get("info", {}).get("userName", "") == persona_name:
logger.debug(f"Ignoring {participant['id']}'s microphone")
await transport.update_subscriptions(
participant_settings={
participant["id"]: {
"media": {"microphone": "unsubscribed"},
}
}
)
if participant.get("info", {}).get("userName", "") != persona_name:
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."}
)
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,168 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame, TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.gated_openai_llm_context import GatedOpenAILLMContextAggregator
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.filters.null_filter import NullFilter
from pipecat.processors.filters.wake_notifier_filter import WakeNotifierFilter
from pipecat.processors.user_idle_processor import UserIdleProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.openai import OpenAILLMService
from pipecat.sync.event_notifier import EventNotifier
from pipecat.transports.services.daily import DailyParams, DailyTransport
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url,
None,
"Respond bot",
DailyParams(
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
# This is the LLM that will be used to detect if the user has finished a
# statement. This doesn't really need to be an LLM, we could use NLP
# libraries for that, but it was easier as an example because we
# leverage the context aggregators.
statement_llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
statement_messages = [
{
"role": "system",
"content": "Determine if the user's statement is a complete sentence or question, ending in a natural pause or punctuation. Return 'YES' if it is complete and 'NO' if it seems to leave a thought unfinished.",
},
]
statement_context = OpenAILLMContext(statement_messages)
statement_context_aggregator = statement_llm.create_context_aggregator(statement_context)
# This is the regular LLM.
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
# We have instructed the LLM to return 'YES' if it thinks the user
# completed a sentence. So, if it's 'YES' we will return true in this
# predicate which will wake up the notifier.
async def wake_check_filter(frame):
return frame.text == "YES"
# This is a notifier that we use to synchronize the two LLMs.
notifier = EventNotifier()
# This a filter that will wake up the notifier if the given predicate
# (wake_check_filter) returns true.
completness_check = WakeNotifierFilter(
notifier, types=(TextFrame,), filter=wake_check_filter
)
# This processor keeps the last context and will let it through once the
# notifier is woken up.
gated_context_aggregator = GatedOpenAILLMContextAggregator(notifier)
# Notify if the user hasn't said anything.
async def user_idle_notifier(frame):
await notifier.notify()
# Sometimes the LLM will fail detecting if a user has completed a
# sentence, this will wake up the notifier if that happens.
user_idle = UserIdleProcessor(callback=user_idle_notifier, timeout=3.0)
# The ParallePipeline input are the user transcripts. We have two
# contexts. The first one will be used to determine if the user finished
# a statement and if so the notifier will be woken up. The second
# context is simply the regular context but it's gated waiting for the
# notifier to be woken up.
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
ParallelPipeline(
[
statement_context_aggregator.user(),
statement_llm,
completness_check,
NullFilter(),
],
[context_aggregator.user(), gated_context_aggregator, llm],
),
user_idle,
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,298 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import base64
import io
import os
import sys
from collections import deque
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from PIL import Image
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
BotInterruptionFrame,
Frame,
ImageRawFrame,
LLMFullResponseEndFrame,
LLMMessagesFrame,
TextFrame,
TranscriptionFrame,
)
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.frameworks.rtvi import (
RTVIBotTranscriptionProcessor,
RTVIUserTranscriptionProcessor,
)
from pipecat.services.anthropic import AnthropicLLMContext, AnthropicLLMService
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
MAX_FRAMES = 5
FRAMES_PER_SECOND = 0.2
video_participant_id = None
anthropic_context = None
recent_image_frames = deque(maxlen=MAX_FRAMES)
most_recent_image_summary = ""
class ImageFrameCatcher(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
global recent_image_frames
await super().process_frame(frame, direction)
if isinstance(frame, ImageRawFrame):
recent_image_frames.append(frame)
else:
await self.push_frame(frame, direction)
class TranscriptFrameCatcher(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TranscriptionFrame):
logger.debug(
f"TranscriptLogger: {frame}, num frames: {len(recent_image_frames)}, anthropic context: {anthropic_context}"
)
if anthropic_context:
add_message_with_images(
anthropic_context, frame.text, frames=list(recent_image_frames)
)
await self.push_frame(frame, direction)
class MessageFrameCatcher(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, OpenAILLMContextFrame):
last_message = frame.context.messages[-1]
system_message = """
Give me a concise summary of the images supplied.
"""
frame = LLMMessagesFrame(
messages=[
{
"role": "system",
"content": system_message,
},
last_message,
],
)
await self.push_frame(frame, direction)
return
class MessageFrameCatcher2(FrameProcessor):
def __init__(self):
super().__init__()
self.text_blob = ""
async def process_frame(self, frame: Frame, direction: FrameDirection):
global most_recent_image_summary
await super().process_frame(frame, direction)
if isinstance(frame, TextFrame):
self.text_blob += f" {frame.text}"
if isinstance(frame, LLMFullResponseEndFrame):
logger.debug(f"MessageFrameCatcher2: {self.text_blob}")
most_recent_image_summary = self.text_blob
self.text_blob = ""
await self.push_frame(frame, direction)
async def main():
global llm
global anthropic_context
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"),
model="claude-3-5-sonnet-20240620",
enable_prompt_caching_beta=True,
)
vision_llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"),
model="claude-3-5-sonnet-20240620",
enable_prompt_caching_beta=True,
)
# todo: test with very short initial user message
system_prompt = """\
You are a helpful assistant who converses with a user and answers questions. Respond concisely to general questions. Keep
your answers brief unless explicitly asked for more information.
Your response will be turned into speech so use only simple words and punctuation.
"""
messages = [
{
"role": "system",
"content": [
{
"type": "text",
"text": system_prompt,
}
],
},
{"role": "user", "content": "Start the conversation by saying 'hello'."},
]
context = OpenAILLMContext(messages)
anthropic_context = AnthropicLLMContext.upgrade_to_anthropic(context)
context_aggregator = llm.create_context_aggregator(context)
rtvi_user_transcription = RTVIUserTranscriptionProcessor()
rtvi_bot_transcription = RTVIBotTranscriptionProcessor()
pipeline = Pipeline(
[
transport.input(), # Transport user input
ImageFrameCatcher(),
TranscriptFrameCatcher(),
rtvi_user_transcription,
context_aggregator.user(), # User speech to text
ParallelPipeline(
[
llm, # LLM
rtvi_bot_transcription,
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses and tool context
],
[MessageFrameCatcher(), vision_llm, MessageFrameCatcher2()],
),
],
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
global video_participant_id
video_participant_id = participant["id"]
await transport.capture_participant_transcription(video_participant_id)
await transport.capture_participant_video(
video_participant_id, framerate=FRAMES_PER_SECOND, video_source="screenVideo"
)
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_app_message")
async def on_app_message(transport, message, sender):
logger.debug(f"Received app message: {message} - {context}")
if not recent_image_frames:
logger.debug("No image frames to send")
return
add_message_with_images(
anthropic_context, message["message"], frames=list(recent_image_frames)
)
interrupt_message = "STOP"
if interrupt_message == message["message"]:
logger.debug("Interrupting")
await task.queue_frames([BotInterruptionFrame()])
else:
await task.queue_frames([context_aggregator.user().get_context_frame()])
runner = PipelineRunner()
await runner.run(task)
def add_message_with_images(c, message, frames=None):
if frames is None:
frames = list(recent_image_frames)
if not frames:
logger.debug("No image frames to send")
return
# Create content list starting with all images
content = []
for frame in frames:
buffer = io.BytesIO()
Image.frombytes(frame.format, frame.size, frame.image).save(buffer, format="JPEG")
encoded_image = base64.b64encode(buffer.getvalue()).decode("utf-8")
content.append(
{
"type": "image",
"source": {
"type": "base64",
"media_type": "image/jpeg",
"data": encoded_image,
},
}
)
# Add text message at the end if provided
if message:
content.append({"type": "text", "text": message})
# Go through all messages and replace user messages containing images
if c.messages:
for i, msg in enumerate(c.messages):
if (
msg["role"] == "user"
and isinstance(msg["content"], list)
and len(msg["content"]) > 0
):
if msg["content"][0].get("type") == "image":
logger.debug(
f"Replacing user message {i} containing images with summary: {most_recent_image_summary}"
)
c.messages[i] = {"role": "user", "content": most_recent_image_summary}
c.add_message({"role": "user", "content": content})
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -203,8 +203,8 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
transport.capture_participant_video(participant["id"], framerate=0)
await transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_video(participant["id"], framerate=0)
ir.set_participant_id(participant["id"])
await task.queue_frames([LLMMessagesFrame(messages)])

View File

@@ -352,7 +352,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
print(f"Context is: {context}")
await task.queue_frames([OpenAILLMContextFrame(context)])

View File

@@ -17,6 +17,10 @@ from fastapi.responses import JSONResponse, RedirectResponse
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomParams
from dotenv import load_dotenv
load_dotenv(override=True)
MAX_BOTS_PER_ROOM = 1
# Bot sub-process dict for status reporting and concurrency control

View File

@@ -102,7 +102,7 @@ async def main(room_url, token=None):
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
logger.debug("Participant joined, storytime commence!")
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
await intro_task.queue_frames(
[
images["book1"],

View File

@@ -165,7 +165,7 @@ Your task is to help the user understand and learn from this article in 2 senten
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
messages.append(
{
"role": "system",

View File

@@ -121,7 +121,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
runner = PipelineRunner()

View File

@@ -21,14 +21,14 @@ classifiers = [
]
dependencies = [
"aiohttp~=3.10.3",
"loguru~=0.7.2",
"Markdown~=3.7",
"numpy~=1.26.4",
"loguru~=0.7.2",
"Pillow~=10.4.0",
"protobuf~=4.25.4",
"pydantic~=2.8.2",
"pyloudnorm~=0.1.1",
"scipy~=1.14.1",
"resampy~=0.4.3",
]
[project.urls]
@@ -42,13 +42,13 @@ aws = [ "boto3~=1.35.27" ]
azure = [ "azure-cognitiveservices-speech~=1.40.0" ]
canonical = [ "aiofiles~=24.1.0" ]
cartesia = [ "cartesia~=1.0.13", "websockets~=13.1" ]
daily = [ "daily-python~=0.11.0" ]
daily = [ "daily-python~=0.12.0" ]
deepgram = [ "deepgram-sdk~=3.7.3" ]
elevenlabs = [ "websockets~=13.1" ]
examples = [ "python-dotenv~=1.0.1", "flask~=3.0.3", "flask_cors~=4.0.1" ]
fal = [ "fal-client~=0.4.1" ]
gladia = [ "websockets~=13.1" ]
google = [ "google-generativeai~=0.7.2", "google-cloud-texttospeech~=2.17.2" ]
google = [ "google-generativeai~=0.8.3", "google-cloud-texttospeech~=2.17.2" ]
gstreamer = [ "pygobject~=3.48.2" ]
fireworks = [ "openai~=1.37.2" ]
langchain = [ "langchain~=0.2.14", "langchain-community~=0.2.12", "langchain-openai~=0.1.20" ]
@@ -74,3 +74,7 @@ pythonpath = ["src"]
[tool.setuptools_scm]
local_scheme = "no-local-version"
fallback_version = "0.0.0-dev"
[tool.ruff]
exclude = ["*_pb2.py"]
line-length = 100

View File

@@ -7,13 +7,14 @@
import audioop
import numpy as np
import pyloudnorm as pyln
from scipy import signal
import resampy
def resample_audio(audio: bytes, original_rate: int, target_rate: int) -> bytes:
if original_rate == target_rate:
return audio
audio_data = np.frombuffer(audio, dtype=np.int16)
num_samples = int(len(audio) * target_rate / original_rate)
resampled_audio = signal.resample(audio_data, num_samples)
resampled_audio = resampy.resample(audio_data, original_rate, target_rate)
return resampled_audio.astype(np.int16).tobytes()

View File

@@ -52,7 +52,7 @@ class SileroOnnxModel:
if sr not in self.sample_rates:
raise ValueError(
f"Supported sampling rates: {self.sample_rates} (or multiply of 16000)"
f"Supported sampling rates: {self.sample_rates} (or multiple of 16000)"
)
if sr / np.shape(x)[1] > 31.25:
raise ValueError("Input audio chunk is too short")

View File

@@ -12,6 +12,11 @@ from pydantic.main import BaseModel
from pipecat.audio.utils import calculate_audio_volume, exp_smoothing
VAD_CONFIDENCE = 0.7
VAD_START_SECS = 0.2
VAD_STOP_SECS = 0.8
VAD_MIN_VOLUME = 0.6
class VADState(Enum):
QUIET = 1
@@ -21,10 +26,10 @@ class VADState(Enum):
class VADParams(BaseModel):
confidence: float = 0.7
start_secs: float = 0.2
stop_secs: float = 0.8
min_volume: float = 0.6
confidence: float = VAD_CONFIDENCE
start_secs: float = VAD_START_SECS
stop_secs: float = VAD_STOP_SECS
min_volume: float = VAD_MIN_VOLUME
class VADAnalyzer:
@@ -41,13 +46,17 @@ class VADAnalyzer:
self._prev_volume = 0
@property
def sample_rate(self):
def sample_rate(self) -> int:
return self._sample_rate
@property
def num_channels(self):
def num_channels(self) -> int:
return self._num_channels
@property
def params(self) -> VADParams:
return self._params
@abstractmethod
def num_frames_required(self) -> int:
pass

View File

@@ -0,0 +1,55 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
from pipecat.frames.frames import CancelFrame, EndFrame, Frame, StartFrame
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.sync.base_notifier import BaseNotifier
class GatedOpenAILLMContextAggregator(FrameProcessor):
"""This aggregator keeps the last received OpenAI LLM context frame and it
doesn't let it through until the notifier is notified.
"""
def __init__(self, notifier: BaseNotifier, **kwargs):
super().__init__(**kwargs)
self._notifier = notifier
self._last_context_frame = None
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, StartFrame):
await self.push_frame(frame)
await self._start()
if isinstance(frame, (EndFrame, CancelFrame)):
await self._stop()
await self.push_frame(frame)
elif isinstance(frame, OpenAILLMContextFrame):
self._last_context_frame = frame
else:
await self.push_frame(frame, direction)
async def _start(self):
self._gate_task = self.get_event_loop().create_task(self._gate_task_handler())
async def _stop(self):
self._gate_task.cancel()
await self._gate_task
async def _gate_task_handler(self):
while True:
try:
await self._notifier.wait()
if self._last_context_frame:
await self.push_frame(self._last_context_frame)
self._last_context_frame = None
except asyncio.CancelledError:
break

View File

@@ -70,6 +70,8 @@ class OpenAILLMContext:
context.add_message(message)
return context
# todo: deprecate from_image_frame. It's only used to create a single-use
# context, which isn't useful for most real-world applications.
@staticmethod
def from_image_frame(frame: VisionImageRawFrame) -> "OpenAILLMContext":
"""
@@ -77,6 +79,10 @@ class OpenAILLMContext:
expects images to be base64 encoded, but other vision models may not.
So we'll store the image as bytes and do the base64 encoding as needed
in the LLM service.
NOTE: the above only applies to the deprecated use of this method. The
add_image_frame_message() below does the base64 encoding as expected
in the OpenAI format.
"""
context = OpenAILLMContext()
buffer = io.BytesIO()

View File

@@ -4,14 +4,14 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
from typing import List
from typing import Tuple, Type
from pipecat.frames.frames import AppFrame, ControlFrame, Frame, SystemFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
class FrameFilter(FrameProcessor):
def __init__(self, types: List[type]):
def __init__(self, types: Tuple[Type[Frame]]):
super().__init__()
self._types = types
@@ -20,9 +20,8 @@ class FrameFilter(FrameProcessor):
#
def _should_passthrough_frame(self, frame):
for t in self._types:
if isinstance(frame, t):
return True
if isinstance(frame, self._types):
return True
return (
isinstance(frame, AppFrame)

View File

@@ -0,0 +1,14 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from pipecat.processors.frame_processor import FrameProcessor
class NullFilter(FrameProcessor):
"""This filter doesn't allow passing any frames up or downstream."""
def __init__(self, **kwargs):
super().__init__(**kwargs)

View File

@@ -0,0 +1,40 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from typing import Awaitable, Callable, Tuple, Type
from pipecat.frames.frames import Frame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.sync.base_notifier import BaseNotifier
class WakeNotifierFilter(FrameProcessor):
"""This processor expects a list of frame types and will execute a given
callback predicate when a frame of any of those type is being processed. If
the callback returns true the notifier will be notified.
"""
def __init__(
self,
notifier: BaseNotifier,
*,
types: Tuple[Type[Frame]],
filter: Callable[[Frame], Awaitable[bool]],
**kwargs,
):
super().__init__(**kwargs)
self._notifier = notifier
self._types = types
self._filter = filter
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, self._types) and await self._filter(frame):
await self._notifier.notify()
await self.push_frame(frame, direction)

View File

@@ -205,7 +205,7 @@ class TTSService(AIService):
# if push_stop_frames is True, wait for this idle period before pushing TTSStoppedFrame
stop_frame_timeout_s: float = 1.0,
# TTS output sample rate
sample_rate: int = 16000,
sample_rate: int = 24000,
text_filter: Optional[BaseTextFilter] = None,
**kwargs,
):
@@ -514,7 +514,7 @@ class SegmentedSTTService(STTService):
min_volume: float = 0.6,
max_silence_secs: float = 0.3,
max_buffer_secs: float = 1.5,
sample_rate: int = 16000,
sample_rate: int = 24000,
num_channels: int = 1,
**kwargs,
):

View File

@@ -53,8 +53,6 @@ class AssemblyAISTTService(STTService):
async def set_language(self, language: Language):
logger.info(f"Switching STT language to: [{language}]")
self._settings["language"] = language
await self._disconnect()
await self._connect()
async def start(self, frame: StartFrame):
await super().start(frame)

View File

@@ -4,11 +4,14 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
from typing import AsyncGenerator, Optional
from loguru import logger
from pydantic import BaseModel
from pipecat.audio.utils import resample_audio
from pipecat.frames.frames import (
ErrorFrame,
Frame,
@@ -45,7 +48,7 @@ class AWSTTSService(TTSService):
aws_access_key_id: str,
region: str,
voice_id: str = "Joanna",
sample_rate: int = 16000,
sample_rate: int = 24000,
params: InputParams = InputParams(),
**kwargs,
):
@@ -164,6 +167,14 @@ class AWSTTSService(TTSService):
return ssml
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
def read_audio_data(**args):
response = self._polly_client.synthesize_speech(**args)
if "AudioStream" in response:
audio_data = response["AudioStream"].read()
resampled = resample_audio(audio_data, 16000, self._settings["sample_rate"])
return resampled
return None
logger.debug(f"Generating TTS: [{text}]")
try:
@@ -178,28 +189,31 @@ class AWSTTSService(TTSService):
"OutputFormat": "pcm",
"VoiceId": self._voice_id,
"Engine": self._settings["engine"],
"SampleRate": str(self._settings["sample_rate"]),
# AWS only supports 8000 and 16000 for PCM. We select 16000.
"SampleRate": "16000",
}
# Filter out None values
filtered_params = {k: v for k, v in params.items() if v is not None}
response = self._polly_client.synthesize_speech(**filtered_params)
audio_data = await asyncio.to_thread(read_audio_data, **filtered_params)
if not audio_data:
logger.error(f"{self} No audio data returned")
yield None
return
await self.start_tts_usage_metrics(text)
yield TTSStartedFrame()
if "AudioStream" in response:
with response["AudioStream"] as stream:
audio_data = stream.read()
chunk_size = 8192
for i in range(0, len(audio_data), chunk_size):
chunk = audio_data[i : i + chunk_size]
if len(chunk) > 0:
await self.stop_ttfb_metrics()
frame = TTSAudioRawFrame(chunk, self._settings["sample_rate"], 1)
yield frame
chunk_size = 8192
for i in range(0, len(audio_data), chunk_size):
chunk = audio_data[i : i + chunk_size]
if len(chunk) > 0:
await self.stop_ttfb_metrics()
frame = TTSAudioRawFrame(chunk, self._settings["sample_rate"], 1)
yield frame
yield TTSStoppedFrame()

View File

@@ -25,8 +25,14 @@ from pipecat.frames.frames import (
TTSStoppedFrame,
URLImageRawFrame,
)
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.ai_services import ImageGenService, STTService, TTSService
from pipecat.services.openai import BaseOpenAILLMService
from pipecat.services.openai import (
BaseOpenAILLMService,
OpenAIAssistantContextAggregator,
OpenAIContextAggregatorPair,
OpenAIUserContextAggregator,
)
from pipecat.transcriptions.language import Language
from pipecat.utils.time import time_now_iso8601
@@ -38,6 +44,7 @@ try:
SpeechConfig,
SpeechRecognizer,
SpeechSynthesizer,
SpeechSynthesisOutputFormat,
)
from azure.cognitiveservices.speech.audio import (
AudioStreamFormat,
@@ -70,6 +77,33 @@ class AzureLLMService(BaseOpenAILLMService):
api_version=self._api_version,
)
@staticmethod
def create_context_aggregator(
context: OpenAILLMContext, *, assistant_expect_stripped_words: bool = True
) -> OpenAIContextAggregatorPair:
user = OpenAIUserContextAggregator(context)
assistant = OpenAIAssistantContextAggregator(
user, expect_stripped_words=assistant_expect_stripped_words
)
return OpenAIContextAggregatorPair(_user=user, _assistant=assistant)
def sample_rate_to_output_format(sample_rate: int) -> SpeechSynthesisOutputFormat:
match sample_rate:
case 8000:
return SpeechSynthesisOutputFormat.Raw8Khz16BitMonoPcm
case 16000:
return SpeechSynthesisOutputFormat.Raw16Khz16BitMonoPcm
case 22050:
return SpeechSynthesisOutputFormat.Raw22050Hz16BitMonoPcm
case 24000:
return SpeechSynthesisOutputFormat.Raw24Khz16BitMonoPcm
case 44100:
return SpeechSynthesisOutputFormat.Raw44100Hz16BitMonoPcm
case 48000:
return SpeechSynthesisOutputFormat.Raw48Khz16BitMonoPcm
return SpeechSynthesisOutputFormat.Raw16Khz16BitMonoPcm
class AzureTTSService(TTSService):
class InputParams(BaseModel):
@@ -88,13 +122,15 @@ class AzureTTSService(TTSService):
api_key: str,
region: str,
voice="en-US-SaraNeural",
sample_rate: int = 16000,
sample_rate: int = 24000,
params: InputParams = InputParams(),
**kwargs,
):
super().__init__(sample_rate=sample_rate, **kwargs)
speech_config = SpeechConfig(subscription=api_key, region=region)
speech_config.set_speech_synthesis_output_format(sample_rate_to_output_format(sample_rate))
self._speech_synthesizer = SpeechSynthesizer(speech_config=speech_config, audio_config=None)
self._settings = {
@@ -283,7 +319,7 @@ class AzureSTTService(STTService):
api_key: str,
region: str,
language=Language.EN_US,
sample_rate=16000,
sample_rate=24000,
channels=1,
**kwargs,
):

View File

@@ -68,9 +68,6 @@ def language_to_cartesia_language(language: Language) -> str | None:
class CartesiaTTSService(WordTTSService):
class InputParams(BaseModel):
encoding: Optional[str] = "pcm_s16le"
sample_rate: Optional[int] = 16000
container: Optional[str] = "raw"
language: Optional[Language] = Language.EN
speed: Optional[Union[str, float]] = ""
emotion: Optional[List[str]] = []
@@ -83,6 +80,9 @@ class CartesiaTTSService(WordTTSService):
cartesia_version: str = "2024-06-10",
url: str = "wss://api.cartesia.ai/tts/websocket",
model: str = "sonic-english",
sample_rate: int = 24000,
encoding: str = "pcm_s16le",
container: str = "raw",
params: InputParams = InputParams(),
**kwargs,
):
@@ -99,7 +99,7 @@ class CartesiaTTSService(WordTTSService):
super().__init__(
aggregate_sentences=True,
push_text_frames=False,
sample_rate=params.sample_rate,
sample_rate=sample_rate,
**kwargs,
)
@@ -108,9 +108,9 @@ class CartesiaTTSService(WordTTSService):
self._url = url
self._settings = {
"output_format": {
"container": params.container,
"encoding": params.encoding,
"sample_rate": params.sample_rate,
"container": container,
"encoding": encoding,
"sample_rate": sample_rate,
},
"language": self.language_to_service_language(params.language)
if params.language
@@ -288,9 +288,6 @@ class CartesiaTTSService(WordTTSService):
class CartesiaHttpTTSService(TTSService):
class InputParams(BaseModel):
encoding: Optional[str] = "pcm_s16le"
sample_rate: Optional[int] = 16000
container: Optional[str] = "raw"
language: Optional[Language] = Language.EN
speed: Optional[Union[str, float]] = ""
emotion: Optional[List[str]] = []
@@ -302,17 +299,20 @@ class CartesiaHttpTTSService(TTSService):
voice_id: str,
model: str = "sonic-english",
base_url: str = "https://api.cartesia.ai",
sample_rate: int = 24000,
encoding: str = "pcm_s16le",
container: str = "raw",
params: InputParams = InputParams(),
**kwargs,
):
super().__init__(**kwargs)
super().__init__(sample_rate=sample_rate, **kwargs)
self._api_key = api_key
self._settings = {
"output_format": {
"container": params.container,
"encoding": params.encoding,
"sample_rate": params.sample_rate,
"container": container,
"encoding": encoding,
"sample_rate": sample_rate,
},
"language": self.language_to_service_language(params.language)
if params.language

View File

@@ -51,11 +51,11 @@ class DeepgramTTSService(TTSService):
*,
api_key: str,
voice: str = "aura-helios-en",
sample_rate: int = 16000,
sample_rate: int = 24000,
encoding: str = "linear16",
**kwargs,
):
super().__init__(**kwargs)
super().__init__(sample_rate=sample_rate, **kwargs)
self._settings = {
"sample_rate": sample_rate,

View File

@@ -36,6 +36,8 @@ except ModuleNotFoundError as e:
)
raise Exception(f"Missing module: {e}")
ElevenLabsOutputFormat = Literal["pcm_16000", "pcm_22050", "pcm_24000", "pcm_44100"]
def sample_rate_from_output_format(output_format: str) -> int:
match output_format:
@@ -74,7 +76,6 @@ def calculate_word_times(
class ElevenLabsTTSService(WordTTSService):
class InputParams(BaseModel):
language: Optional[Language] = Language.EN
output_format: Literal["pcm_16000", "pcm_22050", "pcm_24000", "pcm_44100"] = "pcm_16000"
optimize_streaming_latency: Optional[str] = None
stability: Optional[float] = None
similarity_boost: Optional[float] = None
@@ -98,6 +99,7 @@ class ElevenLabsTTSService(WordTTSService):
voice_id: str,
model: str = "eleven_turbo_v2_5",
url: str = "wss://api.elevenlabs.io",
output_format: ElevenLabsOutputFormat = "pcm_24000",
params: InputParams = InputParams(),
**kwargs,
):
@@ -120,18 +122,18 @@ class ElevenLabsTTSService(WordTTSService):
push_text_frames=False,
push_stop_frames=True,
stop_frame_timeout_s=2.0,
sample_rate=sample_rate_from_output_format(params.output_format),
sample_rate=sample_rate_from_output_format(output_format),
**kwargs,
)
self._api_key = api_key
self._url = url
self._settings = {
"sample_rate": sample_rate_from_output_format(params.output_format),
"sample_rate": sample_rate_from_output_format(output_format),
"language": self.language_to_service_language(params.language)
if params.language
else Language.EN,
"output_format": params.output_format,
"output_format": output_format,
"optimize_streaming_latency": params.optimize_streaming_latency,
"stability": params.stability,
"similarity_boost": params.similarity_boost,

View File

@@ -22,7 +22,8 @@ class FireworksLLMService(BaseOpenAILLMService):
def __init__(
self,
*,
api_key: str,
model: str = "accounts/fireworks/models/firefunction-v1",
base_url: str = "https://api.fireworks.ai/inference/v1",
):
super().__init__(model=model, base_url=base_url)
super().__init__(api_key=api_key, model=model, base_url=base_url)

View File

@@ -8,6 +8,7 @@ import base64
import json
from typing import AsyncGenerator, Optional
import aiohttp
from loguru import logger
from pydantic.main import BaseModel
@@ -23,7 +24,6 @@ from pipecat.services.ai_services import STTService
from pipecat.transcriptions.language import Language
from pipecat.utils.time import time_now_iso8601
# See .env.example for Gladia configuration needed
try:
import websockets
except ModuleNotFoundError as e:
@@ -38,15 +38,16 @@ class GladiaSTTService(STTService):
class InputParams(BaseModel):
sample_rate: Optional[int] = 16000
language: Optional[Language] = Language.EN
transcription_hint: Optional[str] = None
endpointing: Optional[int] = 200
prosody: Optional[bool] = None
endpointing: Optional[float] = 0.2
maximum_duration_without_endpointing: Optional[int] = 10
audio_enhancer: Optional[bool] = None
words_accurate_timestamps: Optional[bool] = None
def __init__(
self,
*,
api_key: str,
url: str = "wss://api.gladia.io/audio/text/audio-transcription",
url: str = "https://api.gladia.io/v2/live",
confidence: float = 0.5,
params: InputParams = InputParams(),
**kwargs,
@@ -56,101 +57,82 @@ class GladiaSTTService(STTService):
self._api_key = api_key
self._url = url
self._settings = {
"encoding": "wav/pcm",
"bit_depth": 16,
"sample_rate": params.sample_rate,
"language": self.language_to_service_language(params.language)
if params.language
else Language.EN,
"transcription_hint": params.transcription_hint,
"channels": 1,
"language_config": {
"languages": [self.language_to_service_language(params.language)]
if params.language
else [],
"code_switching": False,
},
"endpointing": params.endpointing,
"prosody": params.prosody,
"maximum_duration_without_endpointing": params.maximum_duration_without_endpointing,
"pre_processing": {
"audio_enhancer": params.audio_enhancer,
},
"realtime_processing": {
"words_accurate_timestamps": params.words_accurate_timestamps,
},
}
self._confidence = confidence
def language_to_service_language(self, language: Language) -> str | None:
match language:
case Language.BG:
return "bulgarian"
case Language.CA:
return "catalan"
case Language.ZH:
return "chinese"
case Language.CS:
return "czech"
case Language.DA:
return "danish"
case Language.NL:
return "dutch"
case (
Language.EN
| Language.EN_US
| Language.EN_AU
| Language.EN_GB
| Language.EN_NZ
| Language.EN_IN
):
return "english"
case Language.ET:
return "estonian"
case Language.FI:
return "finnish"
case Language.FR | Language.FR_CA:
return "french"
case Language.DE | Language.DE_CH:
return "german"
case Language.EL:
return "greek"
case Language.HI:
return "hindi"
case Language.HU:
return "hungarian"
case Language.ID:
return "indonesian"
case Language.IT:
return "italian"
case Language.JA:
return "japanese"
case Language.KO:
return "korean"
case Language.LV:
return "latvian"
case Language.LT:
return "lithuanian"
case Language.MS:
return "malay"
case Language.NO:
return "norwegian"
case Language.PL:
return "polish"
case Language.PT | Language.PT_BR:
return "portuguese"
case Language.RO:
return "romanian"
case Language.RU:
return "russian"
case Language.SK:
return "slovak"
case Language.ES:
return "spanish"
case Language.SV:
return "slovenian"
case Language.TH:
return "thai"
case Language.TR:
return "turkish"
case Language.UK:
return "ukrainian"
case Language.VI:
return "vietnamese"
return None
language_map = {
Language.BG: "bg",
Language.CA: "ca",
Language.ZH: "zh",
Language.CS: "cs",
Language.DA: "da",
Language.NL: "nl",
Language.EN: "en",
Language.EN_US: "en",
Language.EN_AU: "en",
Language.EN_GB: "en",
Language.EN_NZ: "en",
Language.EN_IN: "en",
Language.ET: "et",
Language.FI: "fi",
Language.FR: "fr",
Language.FR_CA: "fr",
Language.DE: "de",
Language.DE_CH: "de",
Language.EL: "el",
Language.HI: "hi",
Language.HU: "hu",
Language.ID: "id",
Language.IT: "it",
Language.JA: "ja",
Language.KO: "ko",
Language.LV: "lv",
Language.LT: "lt",
Language.MS: "ms",
Language.NO: "no",
Language.PL: "pl",
Language.PT: "pt",
Language.PT_BR: "pt",
Language.RO: "ro",
Language.RU: "ru",
Language.SK: "sk",
Language.ES: "es",
Language.SV: "sv",
Language.TH: "th",
Language.TR: "tr",
Language.UK: "uk",
Language.VI: "vi",
}
return language_map.get(language)
async def start(self, frame: StartFrame):
await super().start(frame)
self._websocket = await websockets.connect(self._url)
response = await self._setup_gladia()
self._websocket = await websockets.connect(response["url"])
self._receive_task = self.get_event_loop().create_task(self._receive_task_handler())
await self._setup_gladia()
async def stop(self, frame: EndFrame):
await super().stop(frame)
await self._send_stop_recording()
await self._websocket.close()
async def cancel(self, frame: CancelFrame):
@@ -164,39 +146,37 @@ class GladiaSTTService(STTService):
yield None
async def _setup_gladia(self):
configuration = {
"x_gladia_key": self._api_key,
"encoding": "WAV/PCM",
"model_type": "fast",
"language_behaviour": "manual",
"sample_rate": self._settings["sample_rate"],
"language": self._settings["language"],
"transcription_hint": self._settings["transcription_hint"],
"endpointing": self._settings["endpointing"],
"prosody": self._settings["prosody"],
}
await self._websocket.send(json.dumps(configuration))
async with aiohttp.ClientSession() as session:
async with session.post(
self._url,
headers={"X-Gladia-Key": self._api_key, "Content-Type": "application/json"},
json=self._settings,
) as response:
if response.ok:
return await response.json()
else:
logger.error(
f"Gladia error: {response.status}: {response.text or response.reason}"
)
raise Exception(f"Failed to initialize Gladia session: {response.status}")
async def _send_audio(self, audio: bytes):
message = {"frames": base64.b64encode(audio).decode("utf-8")}
data = base64.b64encode(audio).decode("utf-8")
message = {"type": "audio_chunk", "data": {"chunk": data}}
await self._websocket.send(json.dumps(message))
async def _send_stop_recording(self):
await self._websocket.send(json.dumps({"type": "stop_recording"}))
async def _receive_task_handler(self):
async for message in self._websocket:
utterance = json.loads(message)
if not utterance:
continue
if "error" in utterance:
message = utterance["message"]
logger.error(f"Gladia error: {message}")
elif "confidence" in utterance:
type = utterance["type"]
confidence = utterance["confidence"]
transcript = utterance["transcription"]
content = json.loads(message)
if content["type"] == "transcript":
utterance = content["data"]["utterance"]
confidence = utterance.get("confidence", 0)
transcript = utterance["text"]
if confidence >= self._confidence:
if type == "final":
if content["data"]["is_final"]:
await self.push_frame(
TranscriptionFrame(transcript, "", time_now_iso8601())
)

View File

@@ -5,11 +5,15 @@
#
import asyncio
import base64
import io
import json
from typing import AsyncGenerator, List, Literal, Optional
from dataclasses import dataclass
from typing import Any, AsyncGenerator, Dict, List, Literal, Optional
from loguru import logger
from pydantic import BaseModel
from PIL import Image
from pydantic import BaseModel, Field
from pipecat.frames.frames import (
ErrorFrame,
@@ -24,18 +28,24 @@ from pipecat.frames.frames import (
TTSStoppedFrame,
VisionImageRawFrame,
)
from pipecat.metrics.metrics import LLMTokenUsage
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import LLMService, TTSService
from pipecat.services.openai import (
OpenAIAssistantContextAggregator,
OpenAIUserContextAggregator,
)
from pipecat.transcriptions.language import Language
try:
import google.ai.generativelanguage as glm
import google.generativeai as gai
from google.cloud import texttospeech_v1
from google.generativeai.types import GenerationConfig
from google.oauth2 import service_account
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
@@ -45,6 +55,249 @@ except ModuleNotFoundError as e:
raise Exception(f"Missing module: {e}")
class GoogleUserContextAggregator(OpenAIUserContextAggregator):
async def _push_aggregation(self):
if len(self._aggregation) > 0:
self._context.add_message(
glm.Content(role="user", parts=[glm.Part(text=self._aggregation)])
)
# Reset the aggregation. Reset it before pushing it down, otherwise
# if the tasks gets cancelled we won't be able to clear things up.
self._aggregation = ""
frame = OpenAILLMContextFrame(self._context)
await self.push_frame(frame)
# Reset our accumulator state.
self._reset()
class GoogleAssistantContextAggregator(OpenAIAssistantContextAggregator):
async def _push_aggregation(self):
if not (
self._aggregation or self._function_call_result or self._pending_image_frame_message
):
return
run_llm = False
aggregation = self._aggregation
self._reset()
try:
if self._function_call_result:
frame = self._function_call_result
self._function_call_result = None
if frame.result:
logger.debug(f"FunctionCallResultFrame result: {frame.arguments}")
self._context.add_message(
glm.Content(
role="model",
parts=[
glm.Part(
function_call=glm.FunctionCall(
name=frame.function_name, args=frame.arguments
)
)
],
)
)
response = frame.result
if isinstance(response, str):
response = {"response": response}
self._context.add_message(
glm.Content(
role="user",
parts=[
glm.Part(
function_response=glm.FunctionResponse(
name=frame.function_name, response=response
)
)
],
)
)
run_llm = not bool(self._function_calls_in_progress)
else:
self._context.add_message(
glm.Content(role="model", parts=[glm.Part(text=aggregation)])
)
if self._pending_image_frame_message:
frame = self._pending_image_frame_message
self._pending_image_frame_message = None
self._context.add_image_frame_message(
format=frame.user_image_raw_frame.format,
size=frame.user_image_raw_frame.size,
image=frame.user_image_raw_frame.image,
text=frame.text,
)
run_llm = True
if run_llm:
await self._user_context_aggregator.push_context_frame()
frame = OpenAILLMContextFrame(self._context)
await self.push_frame(frame)
except Exception as e:
logger.exception(f"Error processing frame: {e}")
@dataclass
class GoogleContextAggregatorPair:
_user: "GoogleUserContextAggregator"
_assistant: "GoogleAssistantContextAggregator"
def user(self) -> "GoogleUserContextAggregator":
return self._user
def assistant(self) -> "GoogleAssistantContextAggregator":
return self._assistant
class GoogleLLMContext(OpenAILLMContext):
@staticmethod
def upgrade_to_google(obj: OpenAILLMContext) -> "GoogleLLMContext":
if isinstance(obj, OpenAILLMContext) and not isinstance(obj, GoogleLLMContext):
logger.debug(f"Upgrading to Google: {obj}")
obj.__class__ = GoogleLLMContext
obj._restructure_from_openai_messages()
return obj
def set_messages(self, messages: List):
self._messages[:] = messages
self._restructure_from_openai_messages()
def get_messages_for_logging(self):
msgs = []
for message in self.messages:
obj = glm.Content.to_dict(message)
try:
if "parts" in obj:
for part in obj["parts"]:
if "inline_data" in part:
part["inline_data"]["data"] = "..."
except Exception as e:
logger.debug(f"Error: {e}")
msgs.append(obj)
return msgs
def from_standard_message(self, message):
role = message["role"]
content = message.get("content", [])
if role == "system":
role = "user"
elif role == "assistant":
role = "model"
parts = []
if message.get("tool_calls"):
for tc in message["tool_calls"]:
parts.append(
glm.Part(
function_call=glm.FunctionCall(
name=tc["function"]["name"],
args=json.loads(tc["function"]["arguments"]),
)
)
)
elif role == "tool":
role = "model"
parts.append(
glm.Part(
function_response=glm.FunctionResponse(
name="tool_call_result", # seems to work to hard-code the same name every time
response=json.loads(message["content"]),
)
)
)
elif isinstance(content, str):
parts.append(glm.Part(text=content))
elif isinstance(content, list):
for c in content:
if c["type"] == "text":
parts.append(glm.Part(text=c["text"]))
elif c["type"] == "image_url":
parts.append(
glm.Part(
inline_data=glm.Blob(
mime_type="image/jpeg",
data=base64.b64decode(c["image_url"]["url"].split(",")[1]),
)
)
)
message = glm.Content(role=role, parts=parts)
return message
def add_image_frame_message(
self, *, format: str, size: tuple[int, int], image: bytes, text: str = None
):
buffer = io.BytesIO()
Image.frombytes(format, size, image).save(buffer, format="JPEG")
parts = []
if text:
parts.append(glm.Part(text=text))
parts.append(
glm.Part(inline_data=glm.Blob(mime_type="image/jpeg", data=buffer.getvalue())),
)
self.add_message(glm.Content(role="user", parts=parts))
def to_standard_messages(self, obj) -> list:
msg = {"role": obj.role, "content": []}
if msg["role"] == "model":
msg["role"] = "assistant"
for part in obj.parts:
if part.text:
msg["content"].append({"type": "text", "text": part.text})
elif part.inline_data:
encoded = base64.b64encode(part.inline_data.data).decode("utf-8")
msg["content"].append(
{
"type": "image_url",
"image_url": {"url": f"data:{part.inline_data.mime_type};base64,{encoded}"},
}
)
elif part.function_call:
args = type(part.function_call).to_dict(part.function_call).get("args", {})
msg["tool_calls"] = [
{
"id": part.function_call.name,
"type": "function",
"function": {
"name": part.function_call.name,
"arguments": json.dumps(args),
},
}
]
elif part.function_response:
msg["role"] = "tool"
resp = (
type(part.function_response).to_dict(part.function_response).get("response", {})
)
msg["tool_call_id"] = part.function_response.name
msg["content"] = json.dumps(resp)
# there might be no content parts for tool_calls messages
if not msg["content"]:
del msg["content"]
return [msg]
def _restructure_from_openai_messages(self):
# first, map across self._messages calling self.from_standard_message(m) to modify messages in place
try:
self._messages[:] = [self.from_standard_message(m) for m in self._messages]
except Exception as e:
logger.error(f"Error mapping messages: {e}")
# iterate over messages and remove any messages that have an empty content list
self._messages = [m for m in self._messages if m.parts]
class GoogleLLMService(LLMService):
"""This class implements inference with Google's AI models
@@ -53,10 +306,31 @@ class GoogleLLMService(LLMService):
franca for all LLM services, so that it is easy to switch between different LLMs.
"""
def __init__(self, *, api_key: str, model: str = "gemini-1.5-flash-latest", **kwargs):
class InputParams(BaseModel):
max_tokens: Optional[int] = Field(default=4096, ge=1)
temperature: Optional[float] = Field(default=None, ge=0.0, le=2.0)
top_k: Optional[int] = Field(default=None, ge=0)
top_p: Optional[float] = Field(default=None, ge=0.0, le=1.0)
extra: Optional[Dict[str, Any]] = Field(default_factory=dict)
def __init__(
self,
*,
api_key: str,
model: str = "gemini-1.5-flash-latest",
params: InputParams = InputParams(),
**kwargs,
):
super().__init__(**kwargs)
gai.configure(api_key=api_key)
self._create_client(model)
self._settings = {
"max_tokens": params.max_tokens,
"temperature": params.temperature,
"top_k": params.top_k,
"top_p": params.top_p,
"extra": params.extra if isinstance(params.extra, dict) else {},
}
def can_generate_metrics(self) -> bool:
return True
@@ -98,20 +372,58 @@ class GoogleLLMService(LLMService):
async def _process_context(self, context: OpenAILLMContext):
await self.push_frame(LLMFullResponseStartFrame())
try:
logger.debug(f"Generating chat: {context.get_messages_json()}")
logger.debug(f"Generating chat: {context.get_messages_for_logging()}")
messages = self._get_messages_from_openai_context(context)
# todo: move this into the new context code structure, convert from openai context one time
# todo: add system instructions
# messages = self._get_messages_from_openai_context(context)
messages = context.messages
# Filter out None values and create GenerationConfig
generation_params = {
k: v
for k, v in {
"temperature": self._settings["temperature"],
"top_p": self._settings["top_p"],
"top_k": self._settings["top_k"],
"max_output_tokens": self._settings["max_tokens"],
}.items()
if v is not None
}
generation_config = GenerationConfig(**generation_params) if generation_params else None
await self.start_ttfb_metrics()
response = self._client.generate_content(messages, stream=True)
tools = context.tools if context.tools else []
response = self._client.generate_content(
contents=messages, tools=tools, stream=True, generation_config=generation_config
)
tokens = LLMTokenUsage(
prompt_tokens=response.usage_metadata.prompt_token_count,
completion_tokens=response.usage_metadata.candidates_token_count,
total_tokens=response.usage_metadata.total_token_count,
)
await self.start_llm_usage_metrics(tokens)
await self.stop_ttfb_metrics()
async for chunk in self._async_generator_wrapper(response):
# todo: usage
try:
text = chunk.text
await self.push_frame(TextFrame(text))
for c in chunk.parts:
if c.text:
await self.push_frame(TextFrame(c.text))
elif c.function_call:
args = type(c.function_call).to_dict(c.function_call).get("args", {})
await self.call_function(
context=context,
tool_call_id="what_should_this_be",
function_name=c.function_call.name,
arguments=args,
)
except Exception as e:
# Google LLMs seem to flag safety issues a lot!
if chunk.candidates[0].finish_reason == 3:
@@ -132,10 +444,11 @@ class GoogleLLMService(LLMService):
context = None
if isinstance(frame, OpenAILLMContextFrame):
context: OpenAILLMContext = frame.context
context: GoogleLLMContext = GoogleLLMContext.upgrade_to_google(frame.context)
elif isinstance(frame, LLMMessagesFrame):
context = OpenAILLMContext.from_messages(frame.messages)
context = GoogleLLMContext(frame.messages)
elif isinstance(frame, VisionImageRawFrame):
# todo: fix this
context = OpenAILLMContext.from_image_frame(frame)
elif isinstance(frame, LLMUpdateSettingsFrame):
await self._update_settings(frame.settings)
@@ -145,6 +458,16 @@ class GoogleLLMService(LLMService):
if context:
await self._process_context(context)
@staticmethod
def create_context_aggregator(
context: OpenAILLMContext, *, assistant_expect_stripped_words: bool = True
) -> GoogleContextAggregatorPair:
user = GoogleUserContextAggregator(context)
assistant = GoogleAssistantContextAggregator(
user, expect_stripped_words=assistant_expect_stripped_words
)
return GoogleContextAggregatorPair(_user=user, _assistant=assistant)
class GoogleTTSService(TTSService):
class InputParams(BaseModel):

View File

@@ -99,7 +99,12 @@ class BaseOpenAILLMService(LLMService):
)
seed: Optional[int] = Field(default_factory=lambda: NOT_GIVEN, ge=0)
temperature: Optional[float] = Field(default_factory=lambda: NOT_GIVEN, ge=0.0, le=2.0)
# Note: top_k is currently not supported by the OpenAI client library,
# so top_k is ignore right now.
top_k: Optional[int] = Field(default=None, ge=0)
top_p: Optional[float] = Field(default_factory=lambda: NOT_GIVEN, ge=0.0, le=1.0)
max_tokens: Optional[int] = Field(default_factory=lambda: NOT_GIVEN, ge=1)
max_completion_tokens: Optional[int] = Field(default_factory=lambda: NOT_GIVEN, ge=1)
extra: Optional[Dict[str, Any]] = Field(default_factory=dict)
def __init__(
@@ -118,6 +123,8 @@ class BaseOpenAILLMService(LLMService):
"seed": params.seed,
"temperature": params.temperature,
"top_p": params.top_p,
"max_tokens": params.max_tokens,
"max_completion_tokens": params.max_completion_tokens,
"extra": params.extra if isinstance(params.extra, dict) else {},
}
self.set_model_name(model)
@@ -152,6 +159,8 @@ class BaseOpenAILLMService(LLMService):
"seed": self._settings["seed"],
"temperature": self._settings["temperature"],
"top_p": self._settings["top_p"],
"max_tokens": self._settings["max_tokens"],
"max_completion_tokens": self._settings["max_completion_tokens"],
}
params.update(self._settings["extra"])
@@ -214,6 +223,9 @@ class BaseOpenAILLMService(LLMService):
await self.stop_ttfb_metrics()
if not chunk.choices[0].delta:
continue
if chunk.choices[0].delta.tool_calls:
# We're streaming the LLM response to enable the fastest response times.
# For text, we just yield each chunk as we receive it and count on consumers
@@ -533,6 +545,7 @@ class OpenAIAssistantContextAggregator(LLMAssistantContextAggregator):
self._context.add_message(
{
"role": "assistant",
"content": "", # content field required for Grok function calling
"tool_calls": [
{
"id": frame.tool_call_id,

View File

@@ -128,7 +128,9 @@ class OpenAIRealtimeBetaLLMService(LLMService):
#
async def _handle_interruption(self):
if self._session_properties.turn_detection is None:
# None and False are different. Check for False. None means we're using OpenAI's
# built-in turn detection defaults.
if self._session_properties.turn_detection is False:
await self.send_client_event(events.InputAudioBufferClearEvent())
await self.send_client_event(events.ResponseCancelEvent())
await self._truncate_current_audio_response()
@@ -138,11 +140,12 @@ class OpenAIRealtimeBetaLLMService(LLMService):
await self.push_frame(TTSStoppedFrame())
async def _handle_user_started_speaking(self, frame):
if self._session_properties.turn_detection is None:
await self._handle_interruption()
pass
async def _handle_user_stopped_speaking(self, frame):
if self._session_properties.turn_detection is None:
# None and False are different. Check for False. None means we're using OpenAI's
# built-in turn detection defaults.
if self._session_properties.turn_detection is False:
await self.send_client_event(events.InputAudioBufferCommitEvent())
await self.send_client_event(events.ResponseCreateEvent())

View File

@@ -8,6 +8,7 @@ import asyncio
import io
import json
import struct
import uuid
from typing import AsyncGenerator, Optional
import aiohttp
@@ -115,7 +116,7 @@ class PlayHTTTSService(TTSService):
user_id: str,
voice_url: str,
voice_engine: str = "PlayHT3.0-mini",
sample_rate: int = 16000,
sample_rate: int = 24000,
output_format: str = "wav",
params: InputParams = InputParams(),
**kwargs,
@@ -127,6 +128,7 @@ class PlayHTTTSService(TTSService):
self._websocket_url = None
self._websocket = None
self._receive_task = None
self._request_id = None
self._settings = {
"sample_rate": sample_rate,
@@ -191,6 +193,7 @@ class PlayHTTTSService(TTSService):
await self._receive_task
self._receive_task = None
self._request_id = None
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
@@ -221,6 +224,7 @@ class PlayHTTTSService(TTSService):
async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection):
await super()._handle_interruption(frame, direction)
await self.stop_all_metrics()
self._request_id = None
async def _receive_task_handler(self):
try:
@@ -242,9 +246,10 @@ class PlayHTTTSService(TTSService):
logger.debug(f"Received text message: {message}")
try:
msg = json.loads(message)
if "request_id" in msg:
if "request_id" in msg and msg["request_id"] == self._request_id:
await self.push_frame(TTSStoppedFrame())
header_received = False # Reset for the next audio stream
self._request_id = None
elif "error" in msg:
logger.error(f"{self} error: {msg}")
await self.push_error(ErrorFrame(f'{self} error: {msg["error"]}'))
@@ -263,8 +268,10 @@ class PlayHTTTSService(TTSService):
if not self._websocket or self._websocket.closed:
await self._connect()
await self.start_ttfb_metrics()
yield TTSStartedFrame()
if not self._request_id:
await self.start_ttfb_metrics()
yield TTSStartedFrame()
self._request_id = str(uuid.uuid4())
tts_command = {
"text": text,
@@ -275,6 +282,7 @@ class PlayHTTTSService(TTSService):
"language": self._settings["language"],
"speed": self._settings["speed"],
"seed": self._settings["seed"],
"request_id": self._request_id,
}
try:
@@ -293,8 +301,6 @@ class PlayHTTTSService(TTSService):
except Exception as e:
logger.error(f"{self} error generating TTS: {e}")
yield ErrorFrame(f"{self} error: {str(e)}")
finally:
await self.stop_all_metrics()
class PlayHTHttpTTSService(TTSService):
@@ -310,7 +316,7 @@ class PlayHTHttpTTSService(TTSService):
user_id: str,
voice_url: str,
voice_engine: str = "PlayHT3.0-mini",
sample_rate: int = 16000,
sample_rate: int = 24000,
params: InputParams = InputParams(),
**kwargs,
):

View File

@@ -0,0 +1,137 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""This module implements Tavus as a sink transport layer"""
import aiohttp
import base64
from pipecat.frames.frames import (
Frame,
TTSAudioRawFrame,
TransportMessageUrgentFrame,
TTSStartedFrame,
TTSStoppedFrame,
StartInterruptionFrame,
EndFrame,
CancelFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import AIService
from pipecat.audio.utils import resample_audio
from loguru import logger
class TavusVideoService(AIService):
"""Class to send base64 encoded audio to Tavus"""
def __init__(
self,
*,
api_key: str,
replica_id: str,
persona_id: str = "pipecat0",
session: aiohttp.ClientSession,
**kwargs,
) -> None:
super().__init__(**kwargs)
self._api_key = api_key
self._replica_id = replica_id
self._persona_id = persona_id
self._session = session
self._conversation_id: str
async def initialize(self) -> str:
url = "https://tavusapi.com/v2/conversations"
headers = {"Content-Type": "application/json", "x-api-key": self._api_key}
payload = {
"replica_id": self._replica_id,
"persona_id": self._persona_id,
}
async with self._session.post(url, headers=headers, json=payload) as r:
r.raise_for_status()
response_json = await r.json()
logger.debug(f"TavusVideoService joined {response_json['conversation_url']}")
self._conversation_id = response_json["conversation_id"]
return response_json["conversation_url"]
def can_generate_metrics(self) -> bool:
return True
async def get_persona_name(self) -> str:
url = f"https://tavusapi.com/v2/personas/{self._persona_id}"
headers = {"Content-Type": "application/json", "x-api-key": self._api_key}
async with self._session.get(url, headers=headers) as r:
r.raise_for_status()
response_json = await r.json()
logger.debug(f"TavusVideoService persona grabbed {response_json}")
return response_json["persona_name"]
async def _end_conversation(self) -> None:
url = f"https://tavusapi.com/v2/conversations/{self._conversation_id}/end"
headers = {"Content-Type": "application/json", "x-api-key": self._api_key}
async with self._session.post(url, headers=headers) as r:
r.raise_for_status()
async def _encode_audio_and_send(
self, audio: bytes, original_sample_rate: int, done: bool
) -> None:
"""Encodes audio to base64 and sends it to Tavus"""
if not done:
audio = resample_audio(audio, original_sample_rate, 16000)
audio_base64 = base64.b64encode(audio).decode("utf-8")
logger.trace(f"TavusVideoService sending {len(audio)} bytes")
await self._send_audio_message(audio_base64, done=done)
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TTSStartedFrame):
await self.start_processing_metrics()
await self.start_ttfb_metrics()
self._current_idx_str = str(frame.id)
elif isinstance(frame, TTSAudioRawFrame):
await self._encode_audio_and_send(frame.audio, frame.sample_rate, done=False)
elif isinstance(frame, TTSStoppedFrame):
await self._encode_audio_and_send(b"\x00", 16000, done=True)
await self.stop_ttfb_metrics()
await self.stop_processing_metrics()
elif isinstance(frame, StartInterruptionFrame):
await self._send_interrupt_message()
elif isinstance(frame, (EndFrame, CancelFrame)):
await self._end_conversation()
else:
await self.push_frame(frame, direction)
async def _send_interrupt_message(self) -> None:
transport_frame = TransportMessageUrgentFrame(
message={
"message_type": "conversation",
"event_type": "conversation.interrupt",
"conversation_id": self._conversation_id,
}
)
await self.push_frame(transport_frame)
async def _send_audio_message(self, audio_base64: str, done: bool) -> None:
transport_frame = TransportMessageUrgentFrame(
message={
"message_type": "conversation",
"event_type": "conversation.echo",
"conversation_id": self._conversation_id,
"properties": {
"modality": "audio",
"inference_id": self._current_idx_str,
"audio": audio_base64,
"done": done,
},
}
)
await self.push_frame(transport_frame)

View File

@@ -4,11 +4,8 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
from typing import Any, Dict, Optional
import httpx
from loguru import logger
from pydantic import BaseModel, Field
from pipecat.services.openai import OpenAILLMService
@@ -27,50 +24,16 @@ except ModuleNotFoundError as e:
class TogetherLLMService(OpenAILLMService):
"""This class implements inference with Together's Llama 3.1 models"""
class InputParams(BaseModel):
frequency_penalty: Optional[float] = Field(default=None, ge=-2.0, le=2.0)
max_tokens: Optional[int] = Field(default=4096, ge=1)
presence_penalty: Optional[float] = Field(default=None, ge=-2.0, le=2.0)
temperature: Optional[float] = Field(default=None, ge=0.0, le=1.0)
# Note: top_k is currently not supported by the OpenAI client library,
# so top_k is ignore right now.
top_k: Optional[int] = Field(default=None, ge=0)
top_p: Optional[float] = Field(default=None, ge=0.0, le=1.0)
extra: Optional[Dict[str, Any]] = Field(default_factory=dict)
seed: Optional[int] = Field(default=None)
def __init__(
self,
*,
api_key: str,
base_url: str = "https://api.together.xyz/v1",
model: str = "meta-llama/Meta-Llama-3.1-8B-Instruct-Turbo",
params: InputParams = InputParams(),
**kwargs,
):
super().__init__(api_key=api_key, base_url=base_url, model=model, params=params, **kwargs)
self.set_model_name(model)
self._settings = {
"max_tokens": params.max_tokens,
"frequency_penalty": params.frequency_penalty,
"presence_penalty": params.presence_penalty,
"seed": params.seed,
"temperature": params.temperature,
"top_p": params.top_p,
"extra": params.extra if isinstance(params.extra, dict) else {},
}
def can_generate_metrics(self) -> bool:
return True
super().__init__(api_key=api_key, base_url=base_url, model=model, **kwargs)
def create_client(self, api_key=None, base_url=None, **kwargs):
logger.debug(f"Creating Together.ai client with api {base_url}")
return AsyncOpenAI(
api_key=api_key,
base_url=base_url,
http_client=DefaultAsyncHttpxClient(
limits=httpx.Limits(
max_keepalive_connections=100, max_connections=1000, keepalive_expiry=None
)
),
)
return super().create_client(api_key, base_url, **kwargs)

View File

@@ -39,9 +39,10 @@ class XTTSService(TTSService):
language: Language,
base_url: str,
aiohttp_session: aiohttp.ClientSession,
sample_rate: int = 24000,
**kwargs,
):
super().__init__(**kwargs)
super().__init__(sample_rate=sample_rate, **kwargs)
self._settings = {
"language": self.language_to_service_language(language),
@@ -150,28 +151,30 @@ class XTTSService(TTSService):
async for chunk in r.content.iter_chunked(1024):
if len(chunk) > 0:
await self.stop_ttfb_metrics()
# Append new chunk to the buffer
# Append new chunk to the buffer.
buffer.extend(chunk)
# Check if buffer has enough data for processing
# Check if buffer has enough data for processing.
while (
len(buffer) >= 48000
): # Assuming at least 0.5 seconds of audio data at 24000 Hz
# Process the buffer up to a safe size for resampling
# Process the buffer up to a safe size for resampling.
process_data = buffer[:48000]
# Remove processed data from buffer
# Remove processed data from buffer.
buffer = buffer[48000:]
# Resample the audio from 24000 Hz to 16000 Hz
resampled_audio = resample_audio(bytes(process_data), 24000, 16000)
# XTTS uses 24000 so we need to resample to our desired rate.
resampled_audio = resample_audio(
bytes(process_data), 24000, self._sample_rate
)
# Create the frame with the resampled audio
frame = TTSAudioRawFrame(resampled_audio, 16000, 1)
frame = TTSAudioRawFrame(resampled_audio, self._sample_rate, 1)
yield frame
# Process any remaining data in the buffer
# Process any remaining data in the buffer.
if len(buffer) > 0:
resampled_audio = resample_audio(bytes(buffer), 24000, 16000)
frame = TTSAudioRawFrame(resampled_audio, 16000, 1)
resampled_audio = resample_audio(bytes(buffer), 24000, self._sample_rate)
frame = TTSAudioRawFrame(resampled_audio, self._sample_rate, 1)
yield frame
yield TTSStoppedFrame()

View File

View File

@@ -0,0 +1,17 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from abc import ABC, abstractmethod
class BaseNotifier(ABC):
@abstractmethod
async def notify(self):
pass
@abstractmethod
async def wait(self):
pass

View File

@@ -0,0 +1,21 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
from pipecat.sync.base_notifier import BaseNotifier
class EventNotifier(BaseNotifier):
def __init__(self):
self._event = asyncio.Event()
async def notify(self):
self._event.set()
async def wait(self):
await self._event.wait()
self._event.clear()

View File

@@ -13,6 +13,7 @@ from typing import List
from loguru import logger
from PIL import Image
from pipecat.audio.vad.vad_analyzer import VAD_STOP_SECS
from pipecat.frames.frames import (
BotSpeakingFrame,
BotStartedSpeakingFrame,
@@ -29,8 +30,6 @@ from pipecat.frames.frames import (
SystemFrame,
TransportMessageFrame,
TransportMessageUrgentFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transports.base_transport import TransportParams
@@ -69,71 +68,21 @@ class BaseOutputTransport(FrameProcessor):
self._stopped_event = asyncio.Event()
# Indicates if the bot is currently speaking. This is useful when we
# have an interruption since all the queued messages will be thrown
# away and we would lose the TTSStoppedFrame.
# Indicates if the bot is currently speaking.
self._bot_speaking = False
# Create sink frame task. This is the task that will actually write
# audio or video frames. We write audio/video in a task so we can keep
# generating frames upstream while, for example, the audio is playing.
async def start(self, frame: StartFrame):
self._create_output_tasks()
self._create_sink_tasks()
async def start(self, frame: StartFrame):
# Create camera output queue and task if needed.
if self._params.camera_out_enabled:
self._camera_out_queue = asyncio.Queue()
self._camera_out_task = self.get_event_loop().create_task(
self._camera_out_task_handler()
)
# Create audio output queue and task if needed.
if self._params.audio_out_enabled and self._params.audio_out_is_live:
self._audio_out_queue = asyncio.Queue()
self._audio_out_task = self.get_event_loop().create_task(self._audio_out_task_handler())
async def stop(self, frame: EndFrame):
# Cancel and wait for the camera output task to finish.
if self._camera_out_task and self._params.camera_out_enabled:
self._camera_out_task.cancel()
await self._camera_out_task
self._camera_out_task = None
# Cancel and wait for the audio output task to finish.
if (
self._audio_out_task
and self._params.audio_out_enabled
and self._params.audio_out_is_live
):
self._audio_out_task.cancel()
await self._audio_out_task
self._audio_out_task = None
await self._cancel_output_tasks()
async def cancel(self, frame: CancelFrame):
# Since we are cancelling everything it doesn't matter if we cancel sink
# tasks first or not.
if self._sink_task:
self._sink_task.cancel()
await self._sink_task
self._sink_task = None
if self._sink_clock_task:
self._sink_clock_task.cancel()
await self._sink_clock_task
self._sink_clock_task = None
# Cancel and wait for the camera output task to finish.
if self._camera_out_task and self._params.camera_out_enabled:
self._camera_out_task.cancel()
await self._camera_out_task
self._camera_out_task = None
# Cancel and wait for the audio output task to finish.
if self._audio_out_task and (
self._params.audio_out_enabled and self._params.audio_out_is_live
):
self._audio_out_task.cancel()
await self._audio_out_task
self._audio_out_task = None
await self._cancel_sink_tasks()
await self._cancel_output_tasks()
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
pass
@@ -209,19 +158,14 @@ class BaseOutputTransport(FrameProcessor):
return
if isinstance(frame, StartInterruptionFrame):
# Stop sink tasks.
if self._sink_task:
self._sink_task.cancel()
await self._sink_task
# Stop sink clock tasks.
if self._sink_clock_task:
self._sink_clock_task.cancel()
await self._sink_clock_task
# Create sink tasks.
# Cancel sink and output tasks.
await self._cancel_sink_tasks()
await self._cancel_output_tasks()
# Create sink and output tasks.
self._create_output_tasks()
self._create_sink_tasks()
# Let's send a bot stopped speaking if we have to.
if self._bot_speaking:
await self._bot_stopped_speaking()
await self._bot_stopped_speaking()
async def _handle_audio(self, frame: OutputAudioRawFrame):
if not self._params.audio_out_enabled:
@@ -249,6 +193,18 @@ class BaseOutputTransport(FrameProcessor):
else:
await self._sink_queue.put(frame)
async def _bot_started_speaking(self):
if not self._bot_speaking:
logger.debug("Bot started speaking")
await self.push_frame(BotStartedSpeakingFrame(), FrameDirection.UPSTREAM)
self._bot_speaking = True
async def _bot_stopped_speaking(self):
if self._bot_speaking:
logger.debug("Bot stopped speaking")
await self.push_frame(BotStoppedSpeakingFrame(), FrameDirection.UPSTREAM)
self._bot_speaking = False
#
# Sink tasks
#
@@ -260,23 +216,27 @@ class BaseOutputTransport(FrameProcessor):
self._sink_clock_queue = asyncio.PriorityQueue()
self._sink_clock_task = loop.create_task(self._sink_clock_task_handler())
async def _cancel_sink_tasks(self):
# Stop sink tasks.
if self._sink_task:
self._sink_task.cancel()
await self._sink_task
self._sink_task = None
# Stop sink clock tasks.
if self._sink_clock_task:
self._sink_clock_task.cancel()
await self._sink_clock_task
self._sink_clock_task = None
async def _sink_frame_handler(self, frame: Frame):
if isinstance(frame, OutputAudioRawFrame):
await self.write_raw_audio_frames(frame.audio)
await self.push_frame(frame)
await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM)
await self._audio_out_queue.put(frame)
elif isinstance(frame, OutputImageRawFrame):
await self._set_camera_image(frame)
elif isinstance(frame, SpriteFrame):
await self._set_camera_images(frame.images)
elif isinstance(frame, TransportMessageFrame):
await self.send_message(frame)
elif isinstance(frame, TTSStartedFrame):
await self._bot_started_speaking()
await self.push_frame(frame)
elif isinstance(frame, TTSStoppedFrame):
await self._bot_stopped_speaking()
await self.push_frame(frame)
# We will push EndFrame later.
elif not isinstance(frame, EndFrame):
await self.push_frame(frame)
@@ -319,15 +279,32 @@ class BaseOutputTransport(FrameProcessor):
except Exception as e:
logger.exception(f"{self} error processing sink clock queue: {e}")
async def _bot_started_speaking(self):
logger.debug("Bot started speaking")
self._bot_speaking = True
await self.push_frame(BotStartedSpeakingFrame(), FrameDirection.UPSTREAM)
#
# Output tasks
#
async def _bot_stopped_speaking(self):
logger.debug("Bot stopped speaking")
self._bot_speaking = False
await self.push_frame(BotStoppedSpeakingFrame(), FrameDirection.UPSTREAM)
def _create_output_tasks(self):
loop = self.get_event_loop()
# Create camera output queue and task if needed.
if self._params.camera_out_enabled:
self._camera_out_queue = asyncio.Queue()
self._camera_out_task = loop.create_task(self._camera_out_task_handler())
# Create audio output queue and task if needed.
if self._params.audio_out_enabled:
self._audio_out_queue = asyncio.Queue()
self._audio_out_task = loop.create_task(self._audio_out_task_handler())
async def _cancel_output_tasks(self):
# Stop camera output task.
if self._camera_out_task and self._params.camera_out_enabled:
self._camera_out_task.cancel()
await self._camera_out_task
self._camera_out_task = None
# Stop audio output task.
if self._audio_out_task and self._params.audio_out_enabled:
self._audio_out_task.cancel()
await self._audio_out_task
self._audio_out_task = None
#
# Camera out
@@ -408,12 +385,31 @@ class BaseOutputTransport(FrameProcessor):
await self.process_frame(frame, FrameDirection.DOWNSTREAM)
async def _audio_out_task_handler(self):
wait_time = (
self._params.vad_analyzer.params.stop_secs
if self._params.vad_analyzer
else VAD_STOP_SECS
)
while True:
try:
frame = await self._audio_out_queue.get()
# If we don't have an audio frame for VAD stop secs we will
# consider the bot is not speaking.
frame = await asyncio.wait_for(self._audio_out_queue.get(), timeout=wait_time)
# Notify the bot started speaking upstream if necessary.
await self._bot_started_speaking()
# Send audio.
await self.write_raw_audio_frames(frame.audio)
await self.push_frame(frame)
# Notify the bot is speaking upstream.
await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM)
# Push frame downstream in case anyone else needs it.
await self.push_frame(frame)
except asyncio.TimeoutError:
# Notify the bot stopped speaking upstream if necessary.
await self._bot_stopped_speaking()
except asyncio.CancelledError:
break
except Exception as e:

View File

@@ -30,7 +30,7 @@ class TransportParams(BaseModel):
camera_out_color_format: str = "RGB"
audio_out_enabled: bool = False
audio_out_is_live: bool = False
audio_out_sample_rate: int = 16000
audio_out_sample_rate: int = 24000
audio_out_channels: int = 1
audio_out_bitrate: int = 96000
audio_in_enabled: bool = False

View File

@@ -6,6 +6,7 @@
import asyncio
import time
import warnings
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Mapping, Optional
@@ -20,7 +21,7 @@ from daily import (
VirtualSpeakerDevice,
)
from loguru import logger
from pydantic.main import BaseModel
from pydantic import BaseModel, model_validator
from pipecat.audio.vad.vad_analyzer import VADAnalyzer, VADParams
from pipecat.frames.frames import (
@@ -93,8 +94,8 @@ class DailyDialinSettings(BaseModel):
class DailyTranscriptionSettings(BaseModel):
language: str = "en"
tier: str = "nova"
model: str = "2-conversationalai"
tier: Optional[str] = None
model: str = "nova-2-general"
profanity_filter: bool = True
redact: bool = False
endpointing: bool = True
@@ -102,6 +103,16 @@ class DailyTranscriptionSettings(BaseModel):
includeRawResponse: bool = True
extra: Mapping[str, Any] = {"interim_results": True}
@model_validator(mode="before")
def check_deprecated_fields(cls, values):
with warnings.catch_warnings():
warnings.simplefilter("always")
if "tier" in values:
warnings.warn(
"Field 'tier' is deprecated, use 'model' instead.", DeprecationWarning
)
return values
class DailyParams(TransportParams):
api_url: str = "https://api.daily.co/v1"
@@ -127,6 +138,10 @@ class DailyCallbacks(BaseModel):
on_participant_joined: Callable[[Mapping[str, Any]], Awaitable[None]]
on_participant_left: Callable[[Mapping[str, Any], str], Awaitable[None]]
on_participant_updated: Callable[[Mapping[str, Any]], Awaitable[None]]
on_transcription_message: Callable[[Mapping[str, Any]], Awaitable[None]]
on_recording_started: Callable[[Mapping[str, Any]], Awaitable[None]]
on_recording_stopped: Callable[[str], Awaitable[None]]
on_recording_error: Callable[[str, str], Awaitable[None]]
def completion_callback(future):
@@ -176,7 +191,8 @@ class DailyTransportClient(EventHandler):
self._participant_id: str = ""
self._video_renderers = {}
self._transcription_renderers = {}
self._transcription_ids = []
self._transcription_status = None
self._other_participant_has_joined = False
self._joined = False
@@ -332,6 +348,7 @@ class DailyTransportClient(EventHandler):
error = await future
if error:
logger.error(f"Unable to start transcription: {error}")
return
async def _join(self):
future = self._loop.create_future()
@@ -440,25 +457,37 @@ class DailyTransportClient(EventHandler):
def participant_counts(self):
return self._client.participant_counts()
def start_dialout(self, settings):
self._client.start_dialout(settings)
async def start_dialout(self, settings):
future = self._loop.create_future()
self._client.start_dialout(settings, completion=completion_callback(future))
await future
def stop_dialout(self, participant_id):
self._client.stop_dialout(participant_id)
async def stop_dialout(self, participant_id):
future = self._loop.create_future()
self._client.stop_dialout(participant_id, completion=completion_callback(future))
await future
def start_recording(self, streaming_settings, stream_id, force_new):
self._client.start_recording(streaming_settings, stream_id, force_new)
async def start_recording(self, streaming_settings, stream_id, force_new):
future = self._loop.create_future()
self._client.start_recording(
streaming_settings, stream_id, force_new, completion=completion_callback(future)
)
await future
def stop_recording(self, stream_id):
self._client.stop_recording(stream_id)
async def stop_recording(self, stream_id):
future = self._loop.create_future()
self._client.stop_recording(stream_id, completion=completion_callback(future))
await future
def capture_participant_transcription(self, participant_id: str, callback: Callable):
async def capture_participant_transcription(self, participant_id: str):
if not self._params.transcription_enabled:
return
self._transcription_renderers[participant_id] = callback
self._transcription_ids.append(participant_id)
if self._joined and self._transcription_status:
await self.update_transcription(self._transcription_ids)
def capture_participant_video(
async def capture_participant_video(
self,
participant_id: str,
callback: Callable,
@@ -466,9 +495,12 @@ class DailyTransportClient(EventHandler):
video_source: str = "camera",
color_format: str = "RGB",
):
# Only enable camera subscription on this participant
self._client.update_subscriptions(
participant_settings={participant_id: {"media": "subscribed"}}
# Try to enable camera and screen subscription on this participant
await self.update_subscriptions(
# participant_settings={participant_id: {"media": "subscribed"}}
participant_settings={
participant_id: {"media": {"camera": "subscribed", "screenVideo": "subscribed"}}
}
)
self._video_renderers[participant_id] = callback
@@ -480,6 +512,22 @@ class DailyTransportClient(EventHandler):
color_format=color_format,
)
async def update_transcription(self, participants=None, instance_id=None):
future = self._loop.create_future()
self._client.update_transcription(
participants, instance_id, completion=completion_callback(future)
)
await future
async def update_subscriptions(self, participant_settings=None, profile_settings=None):
future = self._loop.create_future()
self._client.update_subscriptions(
participant_settings=participant_settings,
profile_settings=profile_settings,
completion=completion_callback(future),
)
await future
#
#
# Daily (EventHandler)
@@ -528,23 +576,31 @@ class DailyTransportClient(EventHandler):
def on_participant_updated(self, participant):
self._call_async_callback(self._callbacks.on_participant_updated, participant)
def on_transcription_message(self, message: Mapping[str, Any]):
participant_id = ""
if "participantId" in message:
participant_id = message["participantId"]
def on_transcription_started(self, status):
logger.debug(f"Transcription started: {status}")
self._transcription_status = status
self._call_async_callback(self.update_transcription, self._transcription_ids)
if participant_id in self._transcription_renderers:
callback = self._transcription_renderers[participant_id]
self._call_async_callback(callback, participant_id, message)
def on_transcription_stopped(self, stopped_by, stopped_by_error):
logger.debug("Transcription stopped")
def on_transcription_error(self, message):
logger.error(f"Transcription error: {message}")
def on_transcription_started(self, status):
logger.debug(f"Transcription started: {status}")
def on_transcription_message(self, message):
self._call_async_callback(self._callbacks.on_transcription_message, message)
def on_transcription_stopped(self, stopped_by, stopped_by_error):
logger.debug("Transcription stopped")
def on_recording_started(self, status):
logger.debug(f"Recording started: {status}")
self._call_async_callback(self._callbacks.on_recording_started, status)
def on_recording_stopped(self, stream_id):
logger.debug(f"Recording stopped: {stream_id}")
self._call_async_callback(self._callbacks.on_recording_stopped, stream_id)
def on_recording_error(self, stream_id, message):
logger.error(f"Recording error for {stream_id}: {message}")
self._call_async_callback(self._callbacks.on_recording_error, stream_id, message)
#
# Daily (CallClient callbacks)
@@ -561,8 +617,15 @@ class DailyTransportClient(EventHandler):
)
def _call_async_callback(self, callback, *args):
future = asyncio.run_coroutine_threadsafe(callback(*args), self._loop)
future.result()
# Don't wait on the coroutine, otherwise if we call a `CallClient`
# function and wait for its completion this will currently result in a
# deadlock. This is because `_call_async_callback` is used inside
# `CallClient` event handlers which are holding the GIL in
# `daily-python`. So if the `callback` passed here makes a `CallClient`
# call and waits for it to finish using completions (and a future) we
# will deadlock because completions use event handlers (which are
# holding the GIL).
asyncio.run_coroutine_threadsafe(callback(*args), self._loop)
class DailyInputTransport(BaseInputTransport):
@@ -631,7 +694,7 @@ class DailyInputTransport(BaseInputTransport):
await super().process_frame(frame, direction)
if isinstance(frame, UserImageRequestFrame):
self.request_participant_image(frame.user_id)
await self.request_participant_image(frame.user_id)
#
# Frames
@@ -661,7 +724,7 @@ class DailyInputTransport(BaseInputTransport):
# Camera in
#
def capture_participant_video(
async def capture_participant_video(
self,
participant_id: str,
framerate: int = 30,
@@ -674,11 +737,11 @@ class DailyInputTransport(BaseInputTransport):
"render_next_frame": False,
}
self._client.capture_participant_video(
await self._client.capture_participant_video(
participant_id, self._on_participant_video_frame, framerate, video_source, color_format
)
def request_participant_image(self, participant_id: str):
async def request_participant_image(self, participant_id: str):
if participant_id in self._video_renderers:
self._video_renderers[participant_id]["render_next_frame"] = True
@@ -711,37 +774,21 @@ class DailyOutputTransport(BaseOutputTransport):
self._client = client
# Task to process outgoing messages.
self._messages_task = None
self._messages_queue = asyncio.Queue()
async def start(self, frame: StartFrame):
# Parent start.
await super().start(frame)
# Join the room.
await self._client.join()
# Start messages task
self._messages_task = self.get_event_loop().create_task(self._messages_task_handler())
async def stop(self, frame: EndFrame):
# Parent stop.
await super().stop(frame)
# Cancel messages task
if self._messages_task:
self._messages_task.cancel()
await self._messages_task
self._messages_task = None
# Leave the room.
await self._client.leave()
async def cancel(self, frame: CancelFrame):
# Parent stop.
await super().cancel(frame)
# Cancel messages task
if self._messages_task:
self._messages_task.cancel()
await self._messages_task
self._messages_task = None
# Leave the room.
await self._client.leave()
@@ -750,7 +797,7 @@ class DailyOutputTransport(BaseOutputTransport):
await self._client.cleanup()
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
await self._messages_queue.put(frame)
await self._client.send_message(frame)
async def write_raw_audio_frames(self, frames: bytes):
await self._client.write_raw_audio_frames(frames)
@@ -758,17 +805,6 @@ class DailyOutputTransport(BaseOutputTransport):
async def write_frame_to_camera(self, frame: OutputImageRawFrame):
await self._client.write_frame_to_camera(frame)
async def _messages_task_handler(self):
while True:
try:
message = await self._messages_queue.get()
await self._client.send_message(message)
self._messages_queue.task_done()
except asyncio.CancelledError:
break
except Exception as e:
logger.exception(f"{self} error processing message queue: {e}")
class DailyTransport(BaseTransport):
def __init__(
@@ -799,6 +835,10 @@ class DailyTransport(BaseTransport):
on_participant_joined=self._on_participant_joined,
on_participant_left=self._on_participant_left,
on_participant_updated=self._on_participant_updated,
on_transcription_message=self._on_transcription_message,
on_recording_started=self._on_recording_started,
on_recording_stopped=self._on_recording_stopped,
on_recording_error=self._on_recording_error,
)
self._params = params
@@ -824,6 +864,10 @@ class DailyTransport(BaseTransport):
self._register_event_handler("on_participant_joined")
self._register_event_handler("on_participant_left")
self._register_event_handler("on_participant_updated")
self._register_event_handler("on_transcription_message")
self._register_event_handler("on_recording_started")
self._register_event_handler("on_recording_stopped")
self._register_event_handler("on_recording_error")
#
# BaseTransport
@@ -861,24 +905,22 @@ class DailyTransport(BaseTransport):
def participant_counts(self):
return self._client.participant_counts()
def start_dialout(self, settings=None):
self._client.start_dialout(settings)
async def start_dialout(self, settings=None):
await self._client.start_dialout(settings)
def stop_dialout(self, participant_id):
self._client.stop_dialout(participant_id)
async def stop_dialout(self, participant_id):
await self._client.stop_dialout(participant_id)
def start_recording(self, streaming_settings=None, stream_id=None, force_new=None):
self._client.start_recording(streaming_settings, stream_id, force_new)
async def start_recording(self, streaming_settings=None, stream_id=None, force_new=None):
await self._client.start_recording(streaming_settings, stream_id, force_new)
def stop_recording(self, stream_id=None):
self._client.stop_recording(stream_id)
async def stop_recording(self, stream_id=None):
await self._client.stop_recording(stream_id)
def capture_participant_transcription(self, participant_id: str):
self._client.capture_participant_transcription(
participant_id, self._on_transcription_message
)
async def capture_participant_transcription(self, participant_id: str):
await self._client.capture_participant_transcription(participant_id)
def capture_participant_video(
async def capture_participant_video(
self,
participant_id: str,
framerate: int = 30,
@@ -886,10 +928,15 @@ class DailyTransport(BaseTransport):
color_format: str = "RGB",
):
if self._input:
self._input.capture_participant_video(
await self._input.capture_participant_video(
participant_id, framerate, video_source, color_format
)
async def update_subscriptions(self, participant_settings=None, profile_settings=None):
await self._client.update_subscriptions(
participant_settings=participant_settings, profile_settings=profile_settings
)
async def _on_joined(self, data):
await self._call_event_handler("on_joined", data)
@@ -927,7 +974,9 @@ class DailyTransport(BaseTransport):
url = f"{self._params.api_url}/dialin/pinlessCallUpdate"
try:
async with session.post(url, headers=headers, json=data, timeout=10) as r:
async with session.post(
url, headers=headers, json=data, timeout=aiohttp.ClientTimeout(total=10)
) as r:
if r.status != 200:
text = await r.text()
logger.error(
@@ -973,7 +1022,13 @@ class DailyTransport(BaseTransport):
async def _on_first_participant_joined(self, participant):
await self._call_event_handler("on_first_participant_joined", participant)
async def _on_transcription_message(self, participant_id, message):
async def _on_transcription_message(self, message):
participant_id = ""
if "participantId" in message:
participant_id = message["participantId"]
if not participant_id:
return
text = message["text"]
timestamp = message["timestamp"]
is_final = message["rawResponse"]["is_final"]
@@ -990,3 +1045,12 @@ class DailyTransport(BaseTransport):
if self._input:
await self._input.push_transcription_frame(frame)
async def _on_recording_started(self, status):
await self._call_event_handler("on_recording_started", status)
async def _on_recording_stopped(self, stream_id):
await self._call_event_handler("on_recording_stopped", stream_id)
async def _on_recording_error(self, stream_id, message):
await self._call_event_handler("on_recording_error", stream_id, message)

View File

@@ -11,6 +11,7 @@ from typing import Any, Awaitable, Callable, List
from pydantic import BaseModel
from pipecat.audio.utils import resample_audio
from pipecat.audio.vad.vad_analyzer import VADAnalyzer
from pipecat.frames.frames import (
AudioRawFrame,
CancelFrame,

View File

@@ -4,8 +4,13 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
from loguru import logger
import warnings
logger.warning("DEPRECATED: Package `pipecat.vad` is deprecated, use `pipecat.audio.vad` instead.")
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"Package `pipecat.vad` is deprecated, use `pipecat.audio.vad` instead", DeprecationWarning
)
from ..audio.vad.silero import SileroVAD, SileroVADAnalyzer
from ..audio.vad.silero import SileroVADAnalyzer
from ..processors.audio.vad.silero import SileroVAD

View File

@@ -4,8 +4,12 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
from loguru import logger
import warnings
logger.warning("DEPRECATED: Package `pipecat.vad` is deprecated, use `pipecat.audio.vad` instead.")
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"Package `pipecat.vad` is deprecated, use `pipecat.audio.vad` instead", DeprecationWarning
)
from ..audio.vad.vad_analyzer import VADAnalyzer, VADParams, VADState

View File

@@ -22,7 +22,7 @@ pydantic~=2.8.2
pyloudnorm~=0.1.1
pyht~=0.1.4
python-dotenv~=1.0.1
scipy~=1.14.1
resampy~=0.4.3
silero-vad~=5.1
together~=1.2.7
transformers~=4.44.0