Compare commits
142 Commits
khk/load-j
...
v0.0.48
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1d4be0139a | ||
|
|
f58c3ee322 | ||
|
|
379750df91 | ||
|
|
d125a38737 | ||
|
|
446bb0aeaf | ||
|
|
d839080834 | ||
|
|
9b85d0642b | ||
|
|
230b51a117 | ||
|
|
3a965ca396 | ||
|
|
33fc5bf990 | ||
|
|
a54ca08405 | ||
|
|
4379db43ed | ||
|
|
e915c676aa | ||
|
|
e0a003afa1 | ||
|
|
d5666727ce | ||
|
|
f6d7402530 | ||
|
|
aefe190c9f | ||
|
|
29925a8f21 | ||
|
|
beb3271168 | ||
|
|
b959ac6e1e | ||
|
|
17f4286942 | ||
|
|
ce89bbb16e | ||
|
|
865768039b | ||
|
|
7071482583 | ||
|
|
5353d13151 | ||
|
|
a9e565f355 | ||
|
|
b6f0c16591 | ||
|
|
49005d02f5 | ||
|
|
6d8b885071 | ||
|
|
2eccb33e73 | ||
|
|
22ca4c5a02 | ||
|
|
84f26ac1ca | ||
|
|
74937411e6 | ||
|
|
8aab068ffd | ||
|
|
bd50201ce4 | ||
|
|
6082da284e | ||
|
|
358c458265 | ||
|
|
807dbbe326 | ||
|
|
3c116b291d | ||
|
|
0dd413ee90 | ||
|
|
abc8ede3d7 | ||
|
|
126324ca1b | ||
|
|
602915ae18 | ||
|
|
0ac9e2dd3f | ||
|
|
a9ef5ca95d | ||
|
|
81c476dd4c | ||
|
|
151242d3a0 | ||
|
|
93c6e5098c | ||
|
|
4455b2a428 | ||
|
|
94062592ef | ||
|
|
d2401a76c8 | ||
|
|
e2b1b56e86 | ||
|
|
84bd767312 | ||
|
|
802c29e9e1 | ||
|
|
f83381860c | ||
|
|
4dad1bfe49 | ||
|
|
9ee8896b64 | ||
|
|
5f7a2f66d4 | ||
|
|
76e5f1e847 | ||
|
|
6975340d6c | ||
|
|
0f4cf56418 | ||
|
|
018e51e8a3 | ||
|
|
b050143952 | ||
|
|
98ea1f0791 | ||
|
|
8272c35527 | ||
|
|
e973e82e05 | ||
|
|
d1396bf618 | ||
|
|
8186e423de | ||
|
|
3010addb8b | ||
|
|
029e0d391e | ||
|
|
bf31223577 | ||
|
|
42cc79154f | ||
|
|
05b857006a | ||
|
|
2e57d21b89 | ||
|
|
fa05ec46be | ||
|
|
e3ce619284 | ||
|
|
fb512dcd74 | ||
|
|
ca15d97383 | ||
|
|
b32448e967 | ||
|
|
7e30da6183 | ||
|
|
a6dd2600d2 | ||
|
|
b905b57dfc | ||
|
|
e1a7edfb58 | ||
|
|
1b30b1fc23 | ||
|
|
55026898f6 | ||
|
|
4283557894 | ||
|
|
5ab00e01aa | ||
|
|
fcfc729e83 | ||
|
|
4eacb34fd8 | ||
|
|
3a8aacccf7 | ||
|
|
54c0bf0c70 | ||
|
|
778b05a252 | ||
|
|
f16a416c2b | ||
|
|
1be63bccb8 | ||
|
|
37820ac0df | ||
|
|
8ea80d43f4 | ||
|
|
e117d70a00 | ||
|
|
2ba753272a | ||
|
|
60c8c2f6e9 | ||
|
|
cfb48200c2 | ||
|
|
6d317c6e8e | ||
|
|
158d52856f | ||
|
|
92a69e404f | ||
|
|
d24c6185d8 | ||
|
|
1fd21578a6 | ||
|
|
700db87127 | ||
|
|
6f1310569c | ||
|
|
14cedb0be8 | ||
|
|
fae97f9051 | ||
|
|
d930a46e64 | ||
|
|
2e6b5d1843 | ||
|
|
88362db034 | ||
|
|
f7f0c44c32 | ||
|
|
33553b71d4 | ||
|
|
be8ca505cd | ||
|
|
e957cce422 | ||
|
|
418a13a4ec | ||
|
|
fc445c0a1f | ||
|
|
f0c65468ed | ||
|
|
ce6a2bdcf7 | ||
|
|
673542e235 | ||
|
|
e032b0b70a | ||
|
|
e39f7e965b | ||
|
|
d26751e968 | ||
|
|
e0ca4a9c23 | ||
|
|
801e52c095 | ||
|
|
a46eaa838b | ||
|
|
7c432499db | ||
|
|
8d75fcc9f0 | ||
|
|
61d73f81ae | ||
|
|
951255def9 | ||
|
|
bf5a7c3562 | ||
|
|
e556f34094 | ||
|
|
ccc3691620 | ||
|
|
5321affda7 | ||
|
|
e5ad8dc67b | ||
|
|
46927805bc | ||
|
|
07712cdb16 | ||
|
|
b999b76f70 | ||
|
|
0e69625a01 | ||
|
|
4e0823fced | ||
|
|
40af3571f0 |
2
.github/workflows/format.yaml
vendored
2
.github/workflows/format.yaml
vendored
@@ -38,4 +38,4 @@ jobs:
|
||||
id: ruff
|
||||
run: |
|
||||
source .venv/bin/activate
|
||||
ruff format --config line-length=100 --diff --exclude "*_pb2.py"
|
||||
ruff format --diff
|
||||
|
||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -4,6 +4,7 @@ __pycache__/
|
||||
*~
|
||||
venv
|
||||
.venv
|
||||
/.idea
|
||||
#*#
|
||||
|
||||
# Distribution / packaging
|
||||
|
||||
103
CHANGELOG.md
103
CHANGELOG.md
@@ -5,7 +5,97 @@ All notable changes to **Pipecat** will be documented in this file.
|
||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## [Unreleased]
|
||||
## [0.0.48] - 2024-11-10 "Antonio release"
|
||||
|
||||
### Added
|
||||
|
||||
- There's now an input queue in each frame processor. When you call
|
||||
`FrameProcessor.push_frame()` this will internally call
|
||||
`FrameProcessor.queue_frame()` on the next processor (upstream or downstream)
|
||||
and the frame will be internally queued (except system frames). Then, the
|
||||
queued frames will get processed. With this input queue it is also possible
|
||||
for FrameProcessors to block processing more frames by calling
|
||||
`FrameProcessor.pause_processing_frames()`. The way to resume processing
|
||||
frames is by calling `FrameProcessor.resume_processing_frames()`.
|
||||
|
||||
- Added audio filter `NoisereduceFilter`.
|
||||
|
||||
- Introduce input transport audio filters (`BaseAudioFilter`). Audio filters can
|
||||
be used to remove background noises before audio is sent to VAD.
|
||||
|
||||
- Introduce output transport audio mixers (`BaseAudioMixer`). Output transport
|
||||
audio mixers can be used, for example, to add background sounds or any other
|
||||
audio mixing functionality before the output audio is actually written to the
|
||||
transport.
|
||||
|
||||
- Added `GatedOpenAILLMContextAggregator`. This aggregator keeps the last
|
||||
received OpenAI LLM context frame and it doesn't let it through until the
|
||||
notifier is notified.
|
||||
|
||||
- Added `WakeNotifierFilter`. This processor expects a list of frame types and
|
||||
will execute a given callback predicate when a frame of any of those type is
|
||||
being processed. If the callback returns true the notifier will be notified.
|
||||
|
||||
- Added `NullFilter`. A null filter doesn't push any frames upstream or
|
||||
downstream. This is usually used to disable one of the pipelines in
|
||||
`ParallelPipeline`.
|
||||
|
||||
- Added `EventNotifier`. This can be used as a very simple synchronization
|
||||
feature between processors.
|
||||
|
||||
- Added `TavusVideoService`. This is an integration for Tavus digital twins.
|
||||
(see https://www.tavus.io/)
|
||||
|
||||
- Added `DailyTransport.update_subscriptions()`. This allows you to have fine
|
||||
grained control of what media subscriptions you want for each participant in a
|
||||
room.
|
||||
|
||||
- Added audio filter `KrispFilter`.
|
||||
|
||||
### Changed
|
||||
|
||||
- The following `DailyTransport` functions are now `async` which means they need
|
||||
to be awaited: `start_dialout`, `stop_dialout`, `start_recording`,
|
||||
`stop_recording`, `capture_participant_transcription` and
|
||||
`capture_participant_video`.
|
||||
|
||||
- Changed default output sample rate to 24000. This changes all TTS service to
|
||||
output to 24000 and also the default output transport sample rate. This
|
||||
improves audio quality at the cost of some extra bandwidth.
|
||||
|
||||
- `AzureTTSService` now uses Azure websockets instead of HTTP requests.
|
||||
|
||||
- The previous `AzureTTSService` HTTP implementation is now
|
||||
`AzureHttpTTSService`.
|
||||
|
||||
### Fixed
|
||||
|
||||
- Websocket transports (FastAPI and Websocket) now synchronize with time before
|
||||
sending data. This allows for interruptions to just work out of the box.
|
||||
|
||||
- Improved bot speaking detection for all TTS services by using actual bot
|
||||
audio.
|
||||
|
||||
- Fixed an issue that was generating constant bot started/stopped speaking
|
||||
frames for HTTP TTS services.
|
||||
|
||||
- Fixed an issue that was causing stuttering with AWS TTS service.
|
||||
|
||||
- Fixed an issue with PlayHTTTSService, where the TTFB metrics were reporting
|
||||
very small time values.
|
||||
|
||||
- Fixed an issue where AzureTTSService wasn't initializing the specified
|
||||
language.
|
||||
|
||||
### Other
|
||||
|
||||
- Add `23-bot-background-sound.py` foundational example.
|
||||
|
||||
- Added a new foundational example `22-natural-conversation.py`. This example
|
||||
shows how to achieve a more natural conversation detecting when the user ends
|
||||
statement.
|
||||
|
||||
## [0.0.47] - 2024-10-22
|
||||
|
||||
### Added
|
||||
|
||||
@@ -15,8 +105,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
- Added a foundational example for Gladia transcription:
|
||||
`13c-gladia-transcription.py`
|
||||
|
||||
### Changed
|
||||
|
||||
- Updated `GladiaSTTService` to use the V2 API.
|
||||
|
||||
- Changed `DailyTransport` transcription model to `nova-2-general`.
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed an issue that would cause an import error when importing
|
||||
`SileroVADAnalyzer` from the old package `pipecat.vad.silero`.
|
||||
|
||||
- Fixed `enable_usage_metrics` to control LLM/TTS usage metrics separately
|
||||
from `enable_metrics`.
|
||||
|
||||
@@ -32,6 +131,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Changed
|
||||
|
||||
- Changed `DeepgramSTTService` model to `nova-2-general`.
|
||||
|
||||
- Moved `SileroVAD` audio processor to `processors.audio.vad`.
|
||||
|
||||
- Module `utils.audio` is now `audio.utils`. A new `resample_audio` function has
|
||||
|
||||
27
README.md
27
README.md
@@ -64,7 +64,7 @@ async def main():
|
||||
# Use Daily as a real-time media transport (WebRTC)
|
||||
transport = DailyTransport(
|
||||
room_url=...,
|
||||
token=...,
|
||||
token="", # leave empty. Note: token is _not_ your api key
|
||||
bot_name="Bot Name",
|
||||
params=DailyParams(audio_out_enabled=True))
|
||||
|
||||
@@ -129,6 +129,24 @@ Pipecat makes use of WebRTC VAD by default when using a WebRTC transport layer.
|
||||
pip install pipecat-ai[silero]
|
||||
```
|
||||
|
||||
## Running the Krisp Audio Filter
|
||||
|
||||
To use the Krisp Filter in this project, you’ll need access to the **Krisp C++ SDK**.
|
||||
|
||||
### Step 1: Obtain Access to the Krisp SDK
|
||||
1. **Create a Krisp Account**: If you don’t already have an account, [sign up at Krisp](https://krisp.ai/) to access the SDK.
|
||||
2. **Download the SDK**: Once you have an account, follow the instructions on the Krisp platform to download the [Krisp's desktop SDKs](https://sdk.krisp.ai/sdk/desktop).
|
||||
3. **Export the path to you krisp SDK**:
|
||||
`export KRISP_SDK_PATH=/PATH/TO/KRISP/SDK`
|
||||
|
||||
### Step 2: Install the `pipecat-krisp` Module
|
||||
Once the environment variable `KRISP_SDK_PATH` is exported, activate your Python virtual environment and install it with `pip`:
|
||||
|
||||
```shell
|
||||
source venv/bin/activate
|
||||
pip install pipecat-ai[krisp]
|
||||
```
|
||||
|
||||
## Hacking on the framework itself
|
||||
|
||||
_Note that you may need to set up a virtual environment before following the instructions below. For instance, you might need to run the following from the root of the repo:_
|
||||
@@ -178,7 +196,7 @@ You can use [use-package](https://github.com/jwiegley/use-package) to install [e
|
||||
:ensure t
|
||||
:hook ((python-mode . lazy-ruff-mode))
|
||||
:config
|
||||
(setq lazy-ruff-format-command "ruff format --config line-length=100")
|
||||
(setq lazy-ruff-format-command "ruff format")
|
||||
(setq lazy-ruff-only-format-block t)
|
||||
(setq lazy-ruff-only-format-region t)
|
||||
(setq lazy-ruff-only-format-buffer t))
|
||||
@@ -197,14 +215,13 @@ You can use [use-package](https://github.com/jwiegley/use-package) to install [e
|
||||
### Visual Studio Code
|
||||
|
||||
Install the
|
||||
[Ruff](https://marketplace.visualstudio.com/items?itemName=charliermarsh.ruff) extension. Then edit the user settings (_Ctrl-Shift-P_ `Open User Settings (JSON)`) and set it as the default Python formatter, enable formatting on save and configure `ruff` arguments:
|
||||
[Ruff](https://marketplace.visualstudio.com/items?itemName=charliermarsh.ruff) extension. Then edit the user settings (_Ctrl-Shift-P_ `Open User Settings (JSON)`) and set it as the default Python formatter, and enable formatting on save:
|
||||
|
||||
```json
|
||||
"[python]": {
|
||||
"editor.defaultFormatter": "charliermarsh.ruff",
|
||||
"editor.formatOnSave": true
|
||||
},
|
||||
"ruff.format.args": ["--config", "line-length=100"]
|
||||
}
|
||||
```
|
||||
|
||||
## Getting help
|
||||
|
||||
165
docs/CONTRIBUTING.md
Normal file
165
docs/CONTRIBUTING.md
Normal file
@@ -0,0 +1,165 @@
|
||||
## Contributing to Pipecat
|
||||
|
||||
We welcome contributions of all kinds! Your help is appreciated. Follow these steps to get involved:
|
||||
|
||||
1. **Fork this repository**: Start by forking the Pipecat Documentation repository to your GitHub account.
|
||||
|
||||
2. **Clone the repository**: Clone your forked repository to your local machine.
|
||||
```bash
|
||||
git clone https://github.com/your-username/pipecat
|
||||
```
|
||||
3. **Create a branch**: For your contribution, create a new branch.
|
||||
```bash
|
||||
git checkout -b your-branch-name
|
||||
```
|
||||
4. **Make your changes**: Edit or add files as necessary.
|
||||
5. **Test your changes**: Ensure that your changes look correct and follow the style set in the codebase.
|
||||
6. **Commit your changes**: Once you're satisfied with your changes, commit them with a meaningful message.
|
||||
|
||||
```bash
|
||||
git commit -m "Description of your changes"
|
||||
```
|
||||
|
||||
7. **Push your changes**: Push your branch to your forked repository.
|
||||
|
||||
```bash
|
||||
git push origin your-branch-name
|
||||
```
|
||||
|
||||
9. **Submit a Pull Request (PR)**: Open a PR from your forked repository to the main branch of this repo.
|
||||
> Important: Describe the changes you've made clearly!
|
||||
|
||||
Our maintainers will review your PR, and once everything is good, your contributions will be merged!
|
||||
|
||||
|
||||
# Contributor Covenant Code of Conduct
|
||||
|
||||
## Our Pledge
|
||||
|
||||
We as members, contributors, and leaders pledge to make participation in our
|
||||
community a harassment-free experience for everyone, regardless of age, body
|
||||
size, visible or invisible disability, ethnicity, sex characteristics, gender
|
||||
identity and expression, level of experience, education, socio-economic status,
|
||||
nationality, personal appearance, race, caste, color, religion, or sexual
|
||||
identity and orientation.
|
||||
|
||||
We pledge to act and interact in ways that contribute to an open, welcoming,
|
||||
diverse, inclusive, and healthy community.
|
||||
|
||||
## Our Standards
|
||||
|
||||
Examples of behavior that contributes to a positive environment for our
|
||||
community include:
|
||||
|
||||
* Demonstrating empathy and kindness toward other people
|
||||
* Being respectful of differing opinions, viewpoints, and experiences
|
||||
* Giving and gracefully accepting constructive feedback
|
||||
* Accepting responsibility and apologizing to those affected by our mistakes,
|
||||
and learning from the experience
|
||||
* Focusing on what is best not just for us as individuals, but for the overall
|
||||
community
|
||||
|
||||
Examples of unacceptable behavior include:
|
||||
|
||||
* The use of sexualized language or imagery, and sexual attention or advances of
|
||||
any kind
|
||||
* Trolling, insulting or derogatory comments, and personal or political attacks
|
||||
* Public or private harassment
|
||||
* Publishing others' private information, such as a physical or email address,
|
||||
without their explicit permission
|
||||
* Other conduct which could reasonably be considered inappropriate in a
|
||||
professional setting
|
||||
|
||||
## Enforcement Responsibilities
|
||||
|
||||
Community leaders are responsible for clarifying and enforcing our standards of
|
||||
acceptable behavior and will take appropriate and fair corrective action in
|
||||
response to any behavior that they deem inappropriate, threatening, offensive,
|
||||
or harmful.
|
||||
|
||||
Community leaders have the right and responsibility to remove, edit, or reject
|
||||
comments, commits, code, wiki edits, issues, and other contributions that are
|
||||
not aligned to this Code of Conduct, and will communicate reasons for moderation
|
||||
decisions when appropriate.
|
||||
|
||||
## Scope
|
||||
|
||||
This Code of Conduct applies within all community spaces, and also applies when
|
||||
an individual is officially representing the community in public spaces.
|
||||
Examples of representing our community include using an official email address,
|
||||
posting via an official social media account, or acting as an appointed
|
||||
representative at an online or offline event.
|
||||
|
||||
## Enforcement
|
||||
|
||||
Instances of abusive, harassing, or otherwise unacceptable behavior may be
|
||||
reported to the community leaders responsible for enforcement at pipecat-ai@daily.co.
|
||||
All complaints will be reviewed and investigated promptly and fairly.
|
||||
|
||||
All community leaders are obligated to respect the privacy and security of the
|
||||
reporter of any incident.
|
||||
|
||||
## Enforcement Guidelines
|
||||
|
||||
Community leaders will follow these Community Impact Guidelines in determining
|
||||
the consequences for any action they deem in violation of this Code of Conduct:
|
||||
|
||||
### 1. Correction
|
||||
|
||||
**Community Impact**: Use of inappropriate language or other behavior deemed
|
||||
unprofessional or unwelcome in the community.
|
||||
|
||||
**Consequence**: A private, written warning from community leaders, providing
|
||||
clarity around the nature of the violation and an explanation of why the
|
||||
behavior was inappropriate. A public apology may be requested.
|
||||
|
||||
### 2. Warning
|
||||
|
||||
**Community Impact**: A violation through a single incident or series of
|
||||
actions.
|
||||
|
||||
**Consequence**: A warning with consequences for continued behavior. No
|
||||
interaction with the people involved, including unsolicited interaction with
|
||||
those enforcing the Code of Conduct, for a specified period of time. This
|
||||
includes avoiding interactions in community spaces as well as external channels
|
||||
like social media. Violating these terms may lead to a temporary or permanent
|
||||
ban.
|
||||
|
||||
### 3. Temporary Ban
|
||||
|
||||
**Community Impact**: A serious violation of community standards, including
|
||||
sustained inappropriate behavior.
|
||||
|
||||
**Consequence**: A temporary ban from any sort of interaction or public
|
||||
communication with the community for a specified period of time. No public or
|
||||
private interaction with the people involved, including unsolicited interaction
|
||||
with those enforcing the Code of Conduct, is allowed during this period.
|
||||
Violating these terms may lead to a permanent ban.
|
||||
|
||||
### 4. Permanent Ban
|
||||
|
||||
**Community Impact**: Demonstrating a pattern of violation of community
|
||||
standards, including sustained inappropriate behavior, harassment of an
|
||||
individual, or aggression toward or disparagement of classes of individuals.
|
||||
|
||||
**Consequence**: A permanent ban from any sort of public interaction within the
|
||||
community.
|
||||
|
||||
## Attribution
|
||||
|
||||
This Code of Conduct is adapted from the [Contributor Covenant][homepage],
|
||||
version 2.1, available at
|
||||
[https://www.contributor-covenant.org/version/2/1/code_of_conduct.html][v2.1].
|
||||
|
||||
Community Impact Guidelines were inspired by
|
||||
[Mozilla's code of conduct enforcement ladder][Mozilla CoC].
|
||||
|
||||
For answers to common questions about this code of conduct, see the FAQ at
|
||||
[https://www.contributor-covenant.org/faq][FAQ]. Translations are available at
|
||||
[https://www.contributor-covenant.org/translations][translations].
|
||||
|
||||
[homepage]: https://www.contributor-covenant.org
|
||||
[v2.1]: https://www.contributor-covenant.org/version/2/1/code_of_conduct.html
|
||||
[Mozilla CoC]: https://github.com/mozilla/diversity
|
||||
[FAQ]: https://www.contributor-covenant.org/faq
|
||||
[translations]: https://www.contributor-covenant.org/translations
|
||||
22
docs/ISSUE_TEMPLATE.md
Normal file
22
docs/ISSUE_TEMPLATE.md
Normal file
@@ -0,0 +1,22 @@
|
||||
# Description
|
||||
Is this reporting a bug or feature request?
|
||||
|
||||
|
||||
If reporting a bug, please fill out the following:
|
||||
|
||||
### Environment
|
||||
- pipecat-ai version:
|
||||
- python version:
|
||||
- OS:
|
||||
|
||||
### Issue description
|
||||
Provide a clear description of the issue.
|
||||
|
||||
### Repro steps
|
||||
List the steps to reproduce the issue.
|
||||
|
||||
### Expected behavior
|
||||
|
||||
### Actual behavior
|
||||
|
||||
### Logs
|
||||
1
docs/PULL_REQUEST_TEMPLATE.md
Normal file
1
docs/PULL_REQUEST_TEMPLATE.md
Normal file
@@ -0,0 +1 @@
|
||||
#### Please describe the changes in your PR. If it is addressing an issue, please reference that as well.
|
||||
113
docs/frame.md
Normal file
113
docs/frame.md
Normal file
@@ -0,0 +1,113 @@
|
||||
# Understanding Different Frame Types in the Pipecat System
|
||||
|
||||
In the Pipecat system, frames are used to represent different types of data and control signals that flow through the pipeline. Understanding these frame types is crucial for working with the system effectively. This tutorial will cover the main categories of frames and their specific uses.
|
||||
|
||||
## 1. Base Frame Classes
|
||||
|
||||
### Frame
|
||||
The `Frame` class is the base class for all frames. It includes:
|
||||
- `id`: A unique identifier
|
||||
- `name`: A descriptive name
|
||||
- `pts`: Presentation timestamp (optional)
|
||||
|
||||
### DataFrame
|
||||
`DataFrame` is a subclass of `Frame` and serves as a base for most data-carrying frames.
|
||||
|
||||
## 2. Audio Frames
|
||||
|
||||
### AudioRawFrame
|
||||
Represents a chunk of audio with properties:
|
||||
- `audio`: Raw audio data
|
||||
- `sample_rate`: Audio sample rate
|
||||
- `num_channels`: Number of audio channels
|
||||
|
||||
Subclasses include:
|
||||
- `InputAudioRawFrame`: For audio from input sources
|
||||
- `OutputAudioRawFrame`: For audio to be played by output devices
|
||||
- `TTSAudioRawFrame`: For audio generated by Text-to-Speech services
|
||||
|
||||
## 3. Image Frames
|
||||
|
||||
### ImageRawFrame
|
||||
Represents an image with properties:
|
||||
- `image`: Raw image data
|
||||
- `size`: Image dimensions
|
||||
- `format`: Image format (e.g., JPEG, PNG)
|
||||
|
||||
Subclasses include:
|
||||
- `InputImageRawFrame`: For images from input sources
|
||||
- `OutputImageRawFrame`: For images to be displayed
|
||||
- `UserImageRawFrame`: For images associated with a specific user
|
||||
- `VisionImageRawFrame`: For images with associated text for description
|
||||
- `URLImageRawFrame`: For images with an associated URL
|
||||
|
||||
### SpriteFrame
|
||||
Represents an animated sprite, containing a list of `ImageRawFrame` objects.
|
||||
|
||||
## 4. Text and Transcription Frames
|
||||
|
||||
### TextFrame
|
||||
Represents a chunk of text, used for various purposes in the pipeline.
|
||||
|
||||
### TranscriptionFrame
|
||||
A specialized `TextFrame` for speech transcriptions, including:
|
||||
- `user_id`: ID of the speaking user
|
||||
- `timestamp`: When the transcription was generated
|
||||
- `language`: Detected language of the speech
|
||||
|
||||
### InterimTranscriptionFrame
|
||||
Similar to `TranscriptionFrame`, but for interim (not final) transcriptions.
|
||||
|
||||
## 5. LLM (Language Model) Frames
|
||||
|
||||
### LLMMessagesFrame
|
||||
Contains a list of messages for an LLM service to process.
|
||||
|
||||
### LLMMessagesAppendFrame and LLMMessagesUpdateFrame
|
||||
Used to modify the current context of LLM messages.
|
||||
|
||||
### LLMSetToolsFrame
|
||||
Specifies tools (functions) available for the LLM to use.
|
||||
|
||||
### LLMEnablePromptCachingFrame
|
||||
Controls prompt caching in certain LLMs.
|
||||
|
||||
## 6. System and Control Frames
|
||||
|
||||
### SystemFrame
|
||||
Base class for system-level frames.
|
||||
|
||||
Important system frames include:
|
||||
- `StartFrame`: Initiates a pipeline
|
||||
- `CancelFrame`: Stops a pipeline immediately
|
||||
- `ErrorFrame`: Notifies of errors (with `FatalErrorFrame` for unrecoverable errors)
|
||||
- `EndTaskFrame` and `CancelTaskFrame`: Control pipeline tasks
|
||||
- `StartInterruptionFrame` and `StopInterruptionFrame`: Indicate user speech for interruptions
|
||||
|
||||
### ControlFrame
|
||||
Base class for control-flow frames.
|
||||
|
||||
Notable control frames:
|
||||
- `EndFrame`: Signals the end of a pipeline
|
||||
- `LLMFullResponseStartFrame` and `LLMFullResponseEndFrame`: Bracket LLM responses
|
||||
- `UserStartedSpeakingFrame` and `UserStoppedSpeakingFrame`: Indicate user speech activity
|
||||
- `BotStartedSpeakingFrame` and `BotStoppedSpeakingFrame`: Indicate bot speech activity
|
||||
- `TTSStartedFrame` and `TTSStoppedFrame`: Bracket Text-to-Speech responses
|
||||
|
||||
## 7. Special Purpose Frames
|
||||
|
||||
### AppFrame
|
||||
Base class for application-specific custom frames.
|
||||
|
||||
### MetricsFrame
|
||||
Contains performance metrics data.
|
||||
|
||||
### FunctionCallInProgressFrame and FunctionCallResultFrame
|
||||
Used for handling LLM function (tool) calls.
|
||||
|
||||
### ServiceUpdateSettingsFrame
|
||||
Base class for updating service settings, with specific subclasses for LLM, TTS, and STT services.
|
||||
|
||||
## Conclusion
|
||||
|
||||
Understanding these frame types is essential for working with the Pipecat system. Each frame type serves a specific purpose in the pipeline, whether it's carrying data (like audio or images), controlling the flow of the pipeline, or managing system-level operations. By using the appropriate frame types, you can effectively process and transmit various kinds of information through your pipeline.
|
||||
@@ -46,5 +46,13 @@ PLAY_HT_API_KEY=...
|
||||
# OpenAI
|
||||
OPENAI_API_KEY=...
|
||||
|
||||
#OpenPipe
|
||||
# OpenPipe
|
||||
OPENPIPE_API_KEY=...
|
||||
|
||||
# Tavus
|
||||
TAVUS_API_KEY=...
|
||||
TAVUS_REPLICA_ID=...
|
||||
TAVUS_PERSONA_ID=...
|
||||
|
||||
#Krisp
|
||||
KRISP_MODEL_PATH=...
|
||||
@@ -1,12 +1,41 @@
|
||||
# Simple Chatbot
|
||||
# Chatbot with canonical-metrics
|
||||
|
||||
<img src="image.png" width="420px">
|
||||
This project implements a chatbot using a pipeline architecture that integrates audio processing, transcription, and a language model for conversational interactions. The chatbot operates within a daily communication environment, utilizing various services for text-to-speech and language model responses.
|
||||
|
||||
This app connects you to a chatbot powered by GPT-4, complete with animations generated by Stable Video Diffusion.
|
||||
## Features
|
||||
|
||||
See a video of it in action: https://x.com/kwindla/status/1778628911817183509
|
||||
- **Audio Input and Output**: Captures microphone input and plays back audio responses.
|
||||
- **Voice Activity Detection**: Utilizes Silero VAD to manage audio input intelligently.
|
||||
- **Text-to-Speech**: Integrates ElevenLabs TTS service to convert text responses into audio.
|
||||
- **Language Model Interaction**: Uses OpenAI's GPT-4 model to generate responses based on user input.
|
||||
- **Transcription Services**: Captures and transcribes participant speech for analytics.
|
||||
- **Metrics Collection**: Sends audio data for analysis via Canonical Metrics Service.
|
||||
|
||||
## Requirements
|
||||
|
||||
- Python 3.10+
|
||||
- `python-dotenv`
|
||||
- Additional libraries from the `pipecat` package.
|
||||
|
||||
## Setup
|
||||
|
||||
1. Clone the repository.
|
||||
2. Install the required packages.
|
||||
3. Set up environment variables for API keys:
|
||||
- `OPENAI_API_KEY`
|
||||
- `ELEVENLABS_API_KEY`
|
||||
- `CANONICAL_API_KEY`
|
||||
- `CANONICAL_API_URL`
|
||||
4. Run the script.
|
||||
|
||||
## Usage
|
||||
|
||||
The chatbot introduces itself and engages in conversations, providing brief and creative responses. Designed for flexibility, it can support multiple languages with appropriate configuration.
|
||||
|
||||
## Events
|
||||
|
||||
- Participants joining or leaving the call are handled dynamically, adjusting the chatbot's behavior accordingly.
|
||||
|
||||
And a quick video walkthrough of the code: https://www.loom.com/share/13df1967161f4d24ade054e7f8753416
|
||||
|
||||
ℹ️ The first time, things might take extra time to get started since VAD (Voice Activity Detection) model needs to be downloaded.
|
||||
|
||||
|
||||
@@ -124,7 +124,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@transport.event_handler("on_participant_left")
|
||||
|
||||
@@ -123,7 +123,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@transport.event_handler("on_participant_left")
|
||||
|
||||
@@ -75,7 +75,7 @@ async def main(room_url: str, token: str):
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@transport.event_handler("on_participant_left")
|
||||
|
||||
@@ -81,7 +81,7 @@ async def main(room_url: str, token: str, callId: str, callDomain: str):
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@transport.event_handler("on_participant_left")
|
||||
|
||||
@@ -84,7 +84,7 @@ async def main(room_url: str, token: str, callId: str, sipUri: str):
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@transport.event_handler("on_participant_left")
|
||||
|
||||
@@ -9,11 +9,11 @@ import aiohttp
|
||||
import os
|
||||
import sys
|
||||
|
||||
from pipecat.frames.frames import EndFrame, TextFrame
|
||||
from pipecat.frames.frames import EndFrame, TTSSpeakFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.task import PipelineTask
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.services.cartesia import CartesiaHttpTTSService
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
from runner import configure
|
||||
@@ -36,7 +36,7 @@ async def main():
|
||||
room_url, None, "Say One Thing", DailyParams(audio_out_enabled=True)
|
||||
)
|
||||
|
||||
tts = CartesiaHttpTTSService(
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
|
||||
)
|
||||
@@ -50,12 +50,9 @@ async def main():
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
participant_name = participant.get("info", {}).get("userName", "")
|
||||
await task.queue_frame(TextFrame(f"Hello there, {participant_name}!"))
|
||||
|
||||
# Register an event handler to exit the application when the user leaves.
|
||||
@transport.event_handler("on_participant_left")
|
||||
async def on_participant_left(transport, participant, reason):
|
||||
await task.queue_frame(EndFrame())
|
||||
await task.queue_frames(
|
||||
[TTSSpeakFrame(f"Hello there, {participant_name}!"), EndFrame()]
|
||||
)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ import aiohttp
|
||||
import os
|
||||
import sys
|
||||
|
||||
from pipecat.frames.frames import TextFrame
|
||||
from pipecat.frames.frames import EndFrame, TTSSpeakFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineTask
|
||||
@@ -28,25 +28,24 @@ logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def main():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
transport = LocalAudioTransport(TransportParams(audio_out_enabled=True))
|
||||
transport = LocalAudioTransport(TransportParams(audio_out_enabled=True))
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
|
||||
)
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
|
||||
)
|
||||
|
||||
pipeline = Pipeline([tts, transport.output()])
|
||||
pipeline = Pipeline([tts, transport.output()])
|
||||
|
||||
task = PipelineTask(pipeline)
|
||||
task = PipelineTask(pipeline)
|
||||
|
||||
async def say_something():
|
||||
await asyncio.sleep(1)
|
||||
await task.queue_frame(TextFrame("Hello there!"))
|
||||
async def say_something():
|
||||
await asyncio.sleep(1)
|
||||
await task.queue_frames([TTSSpeakFrame("Hello there, how is it going!"), EndFrame()])
|
||||
|
||||
runner = PipelineRunner()
|
||||
runner = PipelineRunner()
|
||||
|
||||
await asyncio.gather(runner.run(task), say_something())
|
||||
await asyncio.gather(runner.run(task), say_something())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -81,7 +81,7 @@ async def main():
|
||||
url=url,
|
||||
token=token,
|
||||
room_name=room_name,
|
||||
params=LiveKitParams(audio_out_enabled=True, audio_out_sample_rate=16000),
|
||||
params=LiveKitParams(audio_out_enabled=True),
|
||||
)
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
|
||||
@@ -13,7 +13,7 @@ from pipecat.frames.frames import EndFrame, LLMMessagesFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineTask
|
||||
from pipecat.services.cartesia import CartesiaHttpTTSService
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
@@ -37,7 +37,7 @@ async def main():
|
||||
room_url, None, "Say One Thing From an LLM", DailyParams(audio_out_enabled=True)
|
||||
)
|
||||
|
||||
tts = CartesiaHttpTTSService(
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
|
||||
)
|
||||
@@ -57,11 +57,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
await task.queue_frame(LLMMessagesFrame(messages))
|
||||
|
||||
@transport.event_handler("on_participant_left")
|
||||
async def on_participant_left(transport, participant, reason):
|
||||
await task.queue_frame(EndFrame())
|
||||
await task.queue_frames([LLMMessagesFrame(messages), EndFrame()])
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
@@ -5,33 +5,31 @@
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import os
|
||||
import sys
|
||||
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from runner import configure
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import Frame, LLMMessagesFrame, MetricsFrame
|
||||
from pipecat.metrics.metrics import (
|
||||
TTFBMetricsData,
|
||||
ProcessingMetricsData,
|
||||
LLMUsageMetricsData,
|
||||
ProcessingMetricsData,
|
||||
TTFBMetricsData,
|
||||
TTSUsageMetricsData,
|
||||
)
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineTask
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
from runner import configure
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
@@ -105,11 +103,14 @@ async def main():
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(pipeline)
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
PipelineParams(enable_metrics=True, enable_usage_metrics=True),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@@ -127,7 +127,7 @@ async def main():
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
participant_name = participant.get("info", {}).get("userName", "")
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
await task.queue_frames([TextFrame(f"Hi there {participant_name}!")])
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
@@ -89,7 +89,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@@ -87,7 +87,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@@ -82,7 +82,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
|
||||
@@ -109,7 +109,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
lc.set_participant_id(participant["id"])
|
||||
# Kick off the conversation.
|
||||
# the `LLMMessagesFrame` will be picked up by the LangchainProcessor using
|
||||
|
||||
@@ -31,11 +31,11 @@ logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
async def main():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
(room_url, token) = await configure(session)
|
||||
(room_url, _) = await configure(session)
|
||||
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
token,
|
||||
None,
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
|
||||
@@ -85,7 +85,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@@ -40,7 +40,6 @@ async def main():
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
audio_out_sample_rate=16000,
|
||||
transcription_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
@@ -89,7 +88,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@@ -41,7 +41,6 @@ async def main():
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
audio_out_sample_rate=16000,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
vad_audio_passthrough=True,
|
||||
@@ -90,7 +89,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@@ -74,7 +74,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@@ -86,7 +86,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@@ -81,7 +81,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@@ -5,12 +5,16 @@
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import os
|
||||
import sys
|
||||
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from runner import configure
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import LLMMessagesFrame
|
||||
from pipecat.frames.frames import EndFrame, LLMMessagesFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
@@ -20,12 +24,6 @@ from pipecat.services.gladia import GladiaSTTService
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
from runner import configure
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
@@ -85,11 +83,16 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
# Register an event handler to exit the application when the user leaves.
|
||||
@transport.event_handler("on_participant_left")
|
||||
async def on_participant_left(transport, participant, reason):
|
||||
await task.queue_frame(EndFrame())
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
@@ -77,7 +77,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@@ -96,7 +96,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
|
||||
@@ -32,15 +32,14 @@ logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
async def main():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
(room_url, token) = await configure(session)
|
||||
(room_url, _) = await configure(session)
|
||||
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
token,
|
||||
None,
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
audio_out_sample_rate=16000,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
vad_audio_passthrough=True,
|
||||
@@ -85,7 +84,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@@ -32,11 +32,11 @@ logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
async def main():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
(room_url, token) = await configure(session)
|
||||
(room_url, _) = await configure(session)
|
||||
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
token,
|
||||
None,
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
@@ -82,7 +82,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@@ -83,7 +83,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
95
examples/foundational/07p-interruptible-krisp.py
Normal file
95
examples/foundational/07p-interruptible-krisp.py
Normal file
@@ -0,0 +1,95 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from runner import configure
|
||||
|
||||
from pipecat.frames.frames import LLMMessagesFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_response import (
|
||||
LLMAssistantResponseAggregator,
|
||||
LLMUserResponseAggregator,
|
||||
)
|
||||
from pipecat.services.deepgram import DeepgramSTTService, DeepgramTTSService
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
from pipecat.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.audio.filters.krisp_filter import KrispFilter
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def main():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
(room_url, token) = await configure(session)
|
||||
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
token,
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
vad_audio_passthrough=True,
|
||||
audio_in_filter=KrispFilter(),
|
||||
),
|
||||
)
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-helios-en")
|
||||
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
]
|
||||
|
||||
tma_in = LLMUserResponseAggregator(messages)
|
||||
tma_out = LLMAssistantResponseAggregator(messages)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
stt, # STT
|
||||
tma_in, # User responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
transport.output(), # Transport bot output
|
||||
tma_out, # Assistant spoken responses
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
@@ -63,6 +63,7 @@ async def main():
|
||||
"Test",
|
||||
DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_in_sample_rate=24000,
|
||||
audio_out_enabled=True,
|
||||
camera_out_enabled=True,
|
||||
camera_out_is_live=True,
|
||||
@@ -73,7 +74,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_video(participant["id"])
|
||||
await transport.capture_participant_video(participant["id"])
|
||||
|
||||
pipeline = Pipeline([transport.input(), MirrorProcessor(), transport.output()])
|
||||
|
||||
|
||||
@@ -65,7 +65,7 @@ async def main():
|
||||
tk_root.title("Local Mirror")
|
||||
|
||||
daily_transport = DailyTransport(
|
||||
room_url, token, "Test", DailyParams(audio_in_enabled=True)
|
||||
room_url, token, "Test", DailyParams(audio_in_enabled=True, audio_in_sample_rate=24000)
|
||||
)
|
||||
|
||||
tk_transport = TkLocalTransport(
|
||||
@@ -81,7 +81,7 @@ async def main():
|
||||
|
||||
@daily_transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_video(participant["id"])
|
||||
await transport.capture_participant_video(participant["id"])
|
||||
|
||||
pipeline = Pipeline([daily_transport.input(), MirrorProcessor(), tk_transport.output()])
|
||||
|
||||
|
||||
@@ -82,7 +82,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
await tts.say("Hi! If you want to talk to me, just say 'Hey Robot'.")
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
@@ -134,7 +134,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
await tts.say("Hi, I'm listening!")
|
||||
await transport.send_audio(sounds["ding1.wav"])
|
||||
|
||||
|
||||
@@ -84,8 +84,8 @@ async def main():
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
await tts.say("Hi there! Feel free to ask me what I see.")
|
||||
transport.capture_participant_video(participant["id"], framerate=0)
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_video(participant["id"], framerate=0)
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
image_requester.set_participant_id(participant["id"])
|
||||
|
||||
pipeline = Pipeline(
|
||||
|
||||
@@ -86,8 +86,8 @@ async def main():
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
await tts.say("Hi there! Feel free to ask me what I see.")
|
||||
transport.capture_participant_video(participant["id"], framerate=0)
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_video(participant["id"], framerate=0)
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
image_requester.set_participant_id(participant["id"])
|
||||
|
||||
pipeline = Pipeline(
|
||||
|
||||
@@ -83,8 +83,8 @@ async def main():
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
await tts.say("Hi there! Feel free to ask me what I see.")
|
||||
transport.capture_participant_video(participant["id"], framerate=0)
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_video(participant["id"], framerate=0)
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
image_requester.set_participant_id(participant["id"])
|
||||
|
||||
pipeline = Pipeline(
|
||||
|
||||
@@ -78,16 +78,13 @@ async def main():
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
|
||||
params=CartesiaTTSService.InputParams(
|
||||
sample_rate=16000,
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
await tts.say("Hi there! Feel free to ask me what I see.")
|
||||
transport.capture_participant_video(participant["id"], framerate=0)
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_video(participant["id"], framerate=0)
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
image_requester.set_participant_id(participant["id"])
|
||||
|
||||
pipeline = Pipeline(
|
||||
|
||||
@@ -127,7 +127,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
|
||||
@@ -105,7 +105,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
|
||||
@@ -67,7 +67,8 @@ async def main():
|
||||
|
||||
llm = AnthropicLLMService(
|
||||
api_key=os.getenv("ANTHROPIC_API_KEY"),
|
||||
model="claude-3-5-sonnet-20240620",
|
||||
# model="claude-3-5-sonnet-20240620",
|
||||
model="claude-3-5-sonnet-latest",
|
||||
enable_prompt_caching_beta=True,
|
||||
)
|
||||
llm.register_function("get_weather", get_weather)
|
||||
@@ -160,8 +161,8 @@ If you need to use a tool, simply use the tool. Do not tell the user the tool yo
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
global video_participant_id
|
||||
video_participant_id = participant["id"]
|
||||
transport.capture_participant_transcription(video_participant_id)
|
||||
transport.capture_participant_video(video_participant_id, framerate=0)
|
||||
await transport.capture_participant_transcription(video_participant_id)
|
||||
await transport.capture_participant_video(video_participant_id, framerate=0)
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
|
||||
@@ -123,7 +123,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
# await tts.say("Hi! Ask me about the weather in San Francisco.")
|
||||
|
||||
|
||||
@@ -153,8 +153,8 @@ indicate you should use the get_image tool are:
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
global video_participant_id
|
||||
video_participant_id = participant["id"]
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
transport.capture_participant_video(video_participant_id, framerate=0)
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_video(video_participant_id, framerate=0)
|
||||
# Kick off the conversation.
|
||||
await tts.say("Hi! Ask me about the weather in San Francisco.")
|
||||
|
||||
|
||||
173
examples/foundational/14e-function-calling-gemini.py
Normal file
173
examples/foundational/14e-function-calling-gemini.py
Normal file
@@ -0,0 +1,173 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import os
|
||||
import sys
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.services.google import GoogleLLMService
|
||||
from pipecat.services.openai import OpenAILLMContext
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
from runner import configure
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
video_participant_id = None
|
||||
|
||||
|
||||
async def get_weather(function_name, tool_call_id, arguments, llm, context, result_callback):
|
||||
location = arguments["location"]
|
||||
await result_callback(f"The weather in {location} is currently 72 degrees and sunny.")
|
||||
|
||||
|
||||
async def get_image(function_name, tool_call_id, arguments, llm, context, result_callback):
|
||||
logger.debug(f"!!! IN get_image {video_participant_id}, {arguments}")
|
||||
question = arguments["question"]
|
||||
await llm.request_image_frame(user_id=video_participant_id, text_content=question)
|
||||
|
||||
|
||||
async def main():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
(room_url, token) = await configure(session)
|
||||
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
token,
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
transcription_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
)
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
|
||||
)
|
||||
|
||||
llm = GoogleLLMService(model="gemini-1.5-flash-latest", api_key=os.getenv("GOOGLE_API_KEY"))
|
||||
llm.register_function("get_weather", get_weather)
|
||||
llm.register_function("get_image", get_image)
|
||||
|
||||
tools = [
|
||||
{
|
||||
"function_declarations": [
|
||||
{
|
||||
"name": "get_weather",
|
||||
"description": "Get the current weather",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"location": {
|
||||
"type": "string",
|
||||
"description": "The city and state, e.g. San Francisco, CA",
|
||||
},
|
||||
"format": {
|
||||
"type": "string",
|
||||
"enum": ["celsius", "fahrenheit"],
|
||||
"description": "The temperature unit to use. Infer this from the users location.",
|
||||
},
|
||||
},
|
||||
"required": ["location", "format"],
|
||||
},
|
||||
},
|
||||
{
|
||||
"name": "get_image",
|
||||
"description": "Get and image from the camera or video stream.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"question": {
|
||||
"type": "string",
|
||||
"description": "The question to to use when running inference on the acquired image.",
|
||||
},
|
||||
},
|
||||
"required": ["question"],
|
||||
},
|
||||
},
|
||||
]
|
||||
}
|
||||
]
|
||||
|
||||
system_prompt = """\
|
||||
You are a helpful assistant who converses with a user and answers questions. Respond concisely to general questions.
|
||||
|
||||
Your response will be turned into speech so use only simple words and punctuation.
|
||||
|
||||
You have access to two tools: get_weather and get_image.
|
||||
|
||||
You can respond to questions about the weather using the get_weather tool.
|
||||
|
||||
You can answer questions about the user's video stream using the get_image tool. Some examples of phrases that \
|
||||
indicate you should use the get_image tool are:
|
||||
- What do you see?
|
||||
- What's in the video?
|
||||
- Can you describe the video?
|
||||
- Tell me about what you see.
|
||||
- Tell me something interesting about what you see.
|
||||
- What's happening in the video?
|
||||
"""
|
||||
messages = [
|
||||
{"role": "system", "content": system_prompt},
|
||||
{"role": "user", "content": "Say hello."},
|
||||
]
|
||||
|
||||
context = OpenAILLMContext(messages, tools)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(),
|
||||
context_aggregator.user(),
|
||||
llm,
|
||||
tts,
|
||||
transport.output(),
|
||||
context_aggregator.assistant(),
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
report_only_initial_ttfb=True,
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
global video_participant_id
|
||||
video_participant_id = participant["id"]
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_video(video_participant_id, framerate=0)
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
@@ -141,7 +141,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append(
|
||||
{
|
||||
|
||||
@@ -10,7 +10,7 @@ import os
|
||||
import sys
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import LLMMessagesFrame
|
||||
from pipecat.frames.frames import LLMMessagesFrame, TTSUpdateSettingsFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
@@ -19,7 +19,6 @@ from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.filters.function_filter import FunctionFilter
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.services.whisper import Model, WhisperSTTService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
from openai.types.chat import ChatCompletionToolParam
|
||||
@@ -61,16 +60,14 @@ async def main():
|
||||
token,
|
||||
"Pipecat",
|
||||
DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
transcription_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
vad_audio_passthrough=True,
|
||||
),
|
||||
)
|
||||
|
||||
stt = WhisperSTTService(model=Model.LARGE)
|
||||
|
||||
english_tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
|
||||
@@ -116,7 +113,6 @@ async def main():
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
stt, # STT
|
||||
context_aggregator.user(), # User responses
|
||||
llm, # LLM
|
||||
ParallelPipeline( # TTS (bot will speak the chosen language)
|
||||
@@ -132,7 +128,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append(
|
||||
{
|
||||
|
||||
@@ -92,7 +92,7 @@ async def main():
|
||||
# bot can "hear" and respond to them.
|
||||
@transport.event_handler("on_participant_joined")
|
||||
async def on_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
|
||||
# When the first participant joins, the bot should introduce itself.
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
|
||||
@@ -99,7 +99,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@@ -166,7 +166,7 @@ Remember, your responses should be short. Just one or two sentences, usually."""
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
|
||||
@@ -223,7 +223,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
|
||||
@@ -249,7 +249,7 @@ Remember, your responses should be short. Just one or two sentences, usually."""
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
|
||||
@@ -98,12 +98,13 @@ async def load_conversation(function_name, tool_call_id, args, llm, context, res
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a succinct, creative and helpful way. Prefer responses that are one sentence long unless you are asked for a longer or more detailed response.",
|
||||
},
|
||||
{"role": "user", "content": ""},
|
||||
{"role": "assistant", "content": []},
|
||||
{"role": "user", "content": "Tell me"},
|
||||
{"role": "user", "content": "a joke"},
|
||||
{"role": "user", "content": "Start the call by saying the word 'hello'. Say only that word."},
|
||||
# {"role": "user", "content": ""},
|
||||
# {"role": "assistant", "content": []},
|
||||
# {"role": "user", "content": "Tell me"},
|
||||
# {"role": "user", "content": "a joke"},
|
||||
]
|
||||
tools = [
|
||||
{
|
||||
@@ -183,7 +184,7 @@ async def main():
|
||||
)
|
||||
|
||||
llm = AnthropicLLMService(
|
||||
api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-5-sonnet-20240620"
|
||||
api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-5-sonnet-latest"
|
||||
)
|
||||
|
||||
# you can either register a single function for all function calls, or specific functions
|
||||
@@ -219,7 +220,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
|
||||
290
examples/foundational/20d-persistent-context-gemini.py
Normal file
290
examples/foundational/20d-persistent-context-gemini.py
Normal file
@@ -0,0 +1,290 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import glob
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from datetime import datetime
|
||||
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from runner import configure
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.audio.vad.vad_analyzer import VADParams
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import (
|
||||
OpenAILLMContext,
|
||||
)
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.services.google import GoogleLLMService
|
||||
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
video_participant_id = None
|
||||
|
||||
|
||||
BASE_FILENAME = "/tmp/pipecat_conversation_"
|
||||
tts = None
|
||||
|
||||
|
||||
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
|
||||
temperature = 75 if args["format"] == "fahrenheit" else 24
|
||||
await result_callback(
|
||||
{
|
||||
"conditions": "nice",
|
||||
"temperature": temperature,
|
||||
"format": args["format"],
|
||||
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
async def get_image(function_name, tool_call_id, arguments, llm, context, result_callback):
|
||||
question = arguments["question"]
|
||||
await llm.request_image_frame(user_id=video_participant_id, text_content=question)
|
||||
|
||||
|
||||
async def get_saved_conversation_filenames(
|
||||
function_name, tool_call_id, args, llm, context, result_callback
|
||||
):
|
||||
# Construct the full pattern including the BASE_FILENAME
|
||||
full_pattern = f"{BASE_FILENAME}*.json"
|
||||
|
||||
# Use glob to find all matching files
|
||||
matching_files = glob.glob(full_pattern)
|
||||
logger.debug(f"matching files: {matching_files}")
|
||||
|
||||
await result_callback({"filenames": matching_files})
|
||||
|
||||
|
||||
async def save_conversation(function_name, tool_call_id, args, llm, context, result_callback):
|
||||
timestamp = datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
|
||||
filename = f"{BASE_FILENAME}{timestamp}.json"
|
||||
logger.debug(
|
||||
f"writing conversation to {filename}\n{json.dumps(context.get_messages_for_logging(), indent=4)}"
|
||||
)
|
||||
try:
|
||||
with open(filename, "w") as file:
|
||||
# todo: extract 'system' into the first message in the list
|
||||
messages = context.get_messages_for_persistent_storage()
|
||||
# remove the last message (the instruction to save the context)
|
||||
messages.pop()
|
||||
json.dump(messages, file, indent=2)
|
||||
await result_callback({"success": True})
|
||||
except Exception as e:
|
||||
logger.debug(f"error saving conversation: {e}")
|
||||
await result_callback({"success": False, "error": str(e)})
|
||||
|
||||
|
||||
async def load_conversation(function_name, tool_call_id, args, llm, context, result_callback):
|
||||
global tts
|
||||
filename = args["filename"]
|
||||
logger.debug(f"loading conversation from {filename}")
|
||||
try:
|
||||
with open(filename, "r") as file:
|
||||
context.set_messages(json.load(file))
|
||||
await result_callback(
|
||||
{
|
||||
"success": True,
|
||||
"message": "The most recent conversation has been loaded. Awaiting further instructions.",
|
||||
}
|
||||
)
|
||||
except Exception as e:
|
||||
await result_callback({"success": False, "error": str(e)})
|
||||
|
||||
|
||||
# Test message munging ...
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": """You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your
|
||||
capabilities in a succinct way. Your output will be converted to audio so don't include special
|
||||
characters in your answers. Respond to what the user said in a creative and helpful way.
|
||||
|
||||
You have several tools you can use to help you.
|
||||
|
||||
You can respond to questions about the weather using the get_weather tool.
|
||||
|
||||
You can save the current conversation using the save_conversation tool. This tool allows you to save
|
||||
the current conversation to external storage. If the user asks you to save the conversation, use this
|
||||
save_conversation too.
|
||||
|
||||
You can load a saved conversation using the load_conversation tool. This tool allows you to load a
|
||||
conversation from external storage. You can get a list of conversations that have been saved using the
|
||||
get_saved_conversation_filenames tool.
|
||||
|
||||
You can answer questions about the user's video stream using the get_image tool. Some examples of phrases that \
|
||||
indicate you should use the get_image tool are:
|
||||
- What do you see?
|
||||
- What's in the video?
|
||||
- Can you describe the video?
|
||||
- Tell me about what you see.
|
||||
- Tell me something interesting about what you see.
|
||||
- What's happening in the video?
|
||||
""",
|
||||
},
|
||||
# {"role": "user", "content": ""},
|
||||
# {"role": "assistant", "content": []},
|
||||
# {"role": "user", "content": "Tell me"},
|
||||
# {"role": "user", "content": "a joke"},
|
||||
]
|
||||
tools = [
|
||||
{
|
||||
"function_declarations": [
|
||||
{
|
||||
"name": "get_current_weather",
|
||||
"description": "Get the current weather",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"location": {
|
||||
"type": "string",
|
||||
"description": "The city and state, e.g. San Francisco, CA",
|
||||
},
|
||||
"format": {
|
||||
"type": "string",
|
||||
"enum": ["celsius", "fahrenheit"],
|
||||
"description": "The temperature unit to use. Infer this from the users location.",
|
||||
},
|
||||
},
|
||||
"required": ["location", "format"],
|
||||
},
|
||||
},
|
||||
{
|
||||
"name": "save_conversation",
|
||||
"description": "Save the current conversation. Use this function to persist the current conversation to external storage.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"user_request_text": {
|
||||
"type": "string",
|
||||
"description": "The text of the user's request to save the conversation.",
|
||||
}
|
||||
},
|
||||
"required": ["user_request_text"],
|
||||
},
|
||||
},
|
||||
{
|
||||
"name": "get_saved_conversation_filenames",
|
||||
"description": "Get a list of saved conversation histories. Returns a list of filenames. Each filename includes a date and timestamp. Each file is conversation history that can be loaded into this session.",
|
||||
"parameters": None,
|
||||
},
|
||||
{
|
||||
"name": "load_conversation",
|
||||
"description": "Load a conversation history. Use this function to load a conversation history into the current session.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"filename": {
|
||||
"type": "string",
|
||||
"description": "The filename of the conversation history to load.",
|
||||
}
|
||||
},
|
||||
"required": ["filename"],
|
||||
},
|
||||
},
|
||||
{
|
||||
"name": "get_image",
|
||||
"description": "Get and image from the camera or video stream.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"question": {
|
||||
"type": "string",
|
||||
"description": "The question to to use when running inference on the acquired image.",
|
||||
},
|
||||
},
|
||||
"required": ["question"],
|
||||
},
|
||||
},
|
||||
]
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
async def main():
|
||||
global tts
|
||||
async with aiohttp.ClientSession() as session:
|
||||
(room_url, token) = await configure(session)
|
||||
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
token,
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
transcription_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.8)),
|
||||
),
|
||||
)
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
|
||||
)
|
||||
|
||||
llm = GoogleLLMService(model="gemini-1.5-flash-latest", api_key=os.getenv("GOOGLE_API_KEY"))
|
||||
|
||||
# you can either register a single function for all function calls, or specific functions
|
||||
# llm.register_function(None, fetch_weather_from_api)
|
||||
llm.register_function("get_current_weather", fetch_weather_from_api)
|
||||
llm.register_function("save_conversation", save_conversation)
|
||||
llm.register_function("get_saved_conversation_filenames", get_saved_conversation_filenames)
|
||||
llm.register_function("load_conversation", load_conversation)
|
||||
llm.register_function("get_image", get_image)
|
||||
|
||||
context = OpenAILLMContext(messages, tools)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
context_aggregator.user(),
|
||||
llm, # LLM
|
||||
tts,
|
||||
context_aggregator.assistant(),
|
||||
transport.output(), # Transport bot output
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
# report_only_initial_ttfb=True,
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
global video_participant_id
|
||||
video_participant_id = participant["id"]
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_video(video_participant_id, framerate=0)
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
133
examples/foundational/21-tavus-layer.py
Normal file
133
examples/foundational/21-tavus-layer.py
Normal file
@@ -0,0 +1,133 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import os
|
||||
import sys
|
||||
|
||||
from typing import Any, Mapping
|
||||
|
||||
from pipecat.frames.frames import LLMMessagesFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_response import (
|
||||
LLMAssistantResponseAggregator,
|
||||
LLMUserResponseAggregator,
|
||||
)
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.services.deepgram import DeepgramSTTService
|
||||
from pipecat.services.tavus import TavusVideoService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def main():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
tavus = TavusVideoService(
|
||||
api_key=os.getenv("TAVUS_API_KEY"),
|
||||
replica_id=os.getenv("TAVUS_REPLICA_ID"),
|
||||
persona_id=os.getenv("TAVUS_PERSONA_ID", "pipecat0"),
|
||||
session=session,
|
||||
)
|
||||
|
||||
# get persona, look up persona_name, set this as the bot name to ignore
|
||||
persona_name = await tavus.get_persona_name()
|
||||
room_url = await tavus.initialize()
|
||||
|
||||
transport = DailyTransport(
|
||||
room_url=room_url,
|
||||
token=None,
|
||||
bot_name="Pipecat bot",
|
||||
params=DailyParams(
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
vad_audio_passthrough=True,
|
||||
),
|
||||
)
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="a167e0f3-df7e-4d52-a9c3-f949145efdab",
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(model="gpt-4o-mini")
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
]
|
||||
|
||||
tma_in = LLMUserResponseAggregator(messages)
|
||||
tma_out = LLMAssistantResponseAggregator(messages)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
stt, # STT
|
||||
tma_in, # User responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
tavus, # Tavus output layer
|
||||
transport.output(), # Transport bot output
|
||||
tma_out, # Assistant spoken responses
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
report_only_initial_ttfb=True,
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_participant_joined")
|
||||
async def on_participant_joined(
|
||||
transport: DailyTransport, participant: Mapping[str, Any]
|
||||
) -> None:
|
||||
# Ignore the Tavus replica's microphone
|
||||
if participant.get("info", {}).get("userName", "") == persona_name:
|
||||
logger.debug(f"Ignoring {participant['id']}'s microphone")
|
||||
await transport.update_subscriptions(
|
||||
participant_settings={
|
||||
participant["id"]: {
|
||||
"media": {"microphone": "unsubscribed"},
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
if participant.get("info", {}).get("userName", "") != persona_name:
|
||||
# Kick off the conversation.
|
||||
messages.append(
|
||||
{"role": "system", "content": "Please introduce yourself to the user."}
|
||||
)
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
168
examples/foundational/22-natural-conversation.py
Normal file
168
examples/foundational/22-natural-conversation.py
Normal file
@@ -0,0 +1,168 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import os
|
||||
import sys
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import LLMMessagesFrame, TextFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.gated_openai_llm_context import GatedOpenAILLMContextAggregator
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.filters.null_filter import NullFilter
|
||||
from pipecat.processors.filters.wake_notifier_filter import WakeNotifierFilter
|
||||
from pipecat.processors.user_idle_processor import UserIdleProcessor
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.services.deepgram import DeepgramSTTService
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.sync.event_notifier import EventNotifier
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
from runner import configure
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def main():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
(room_url, _) = await configure(session)
|
||||
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
None,
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
vad_audio_passthrough=True,
|
||||
),
|
||||
)
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
|
||||
)
|
||||
|
||||
# This is the LLM that will be used to detect if the user has finished a
|
||||
# statement. This doesn't really need to be an LLM, we could use NLP
|
||||
# libraries for that, but it was easier as an example because we
|
||||
# leverage the context aggregators.
|
||||
statement_llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
|
||||
|
||||
statement_messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "Determine if the user's statement is a complete sentence or question, ending in a natural pause or punctuation. Return 'YES' if it is complete and 'NO' if it seems to leave a thought unfinished.",
|
||||
},
|
||||
]
|
||||
|
||||
statement_context = OpenAILLMContext(statement_messages)
|
||||
statement_context_aggregator = statement_llm.create_context_aggregator(statement_context)
|
||||
|
||||
# This is the regular LLM.
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
]
|
||||
|
||||
context = OpenAILLMContext(messages)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
# We have instructed the LLM to return 'YES' if it thinks the user
|
||||
# completed a sentence. So, if it's 'YES' we will return true in this
|
||||
# predicate which will wake up the notifier.
|
||||
async def wake_check_filter(frame):
|
||||
return frame.text == "YES"
|
||||
|
||||
# This is a notifier that we use to synchronize the two LLMs.
|
||||
notifier = EventNotifier()
|
||||
|
||||
# This a filter that will wake up the notifier if the given predicate
|
||||
# (wake_check_filter) returns true.
|
||||
completness_check = WakeNotifierFilter(
|
||||
notifier, types=(TextFrame,), filter=wake_check_filter
|
||||
)
|
||||
|
||||
# This processor keeps the last context and will let it through once the
|
||||
# notifier is woken up.
|
||||
gated_context_aggregator = GatedOpenAILLMContextAggregator(notifier)
|
||||
|
||||
# Notify if the user hasn't said anything.
|
||||
async def user_idle_notifier(frame):
|
||||
await notifier.notify()
|
||||
|
||||
# Sometimes the LLM will fail detecting if a user has completed a
|
||||
# sentence, this will wake up the notifier if that happens.
|
||||
user_idle = UserIdleProcessor(callback=user_idle_notifier, timeout=3.0)
|
||||
|
||||
# The ParallePipeline input are the user transcripts. We have two
|
||||
# contexts. The first one will be used to determine if the user finished
|
||||
# a statement and if so the notifier will be woken up. The second
|
||||
# context is simply the regular context but it's gated waiting for the
|
||||
# notifier to be woken up.
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
stt,
|
||||
ParallelPipeline(
|
||||
[
|
||||
statement_context_aggregator.user(),
|
||||
statement_llm,
|
||||
completness_check,
|
||||
NullFilter(),
|
||||
],
|
||||
[context_aggregator.user(), gated_context_aggregator, llm],
|
||||
),
|
||||
user_idle,
|
||||
tts, # TTS
|
||||
transport.output(), # Transport bot output
|
||||
context_aggregator.assistant(), # Assistant spoken responses
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
report_only_initial_ttfb=True,
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
121
examples/foundational/23-bot-background-sound.py
Normal file
121
examples/foundational/23-bot-background-sound.py
Normal file
@@ -0,0 +1,121 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import os
|
||||
import sys
|
||||
|
||||
from pipecat.audio.mixers.soundfile_mixer import SoundfileMixer
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import LLMMessagesFrame, MixerUpdateSettingsFrame, MixerEnableFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
from runner import configure_with_args
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def main():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
parser = argparse.ArgumentParser(description="Bot Background Sound")
|
||||
parser.add_argument("-i", "--input", type=str, required=True, help="Input audio file")
|
||||
|
||||
(room_url, token, args) = await configure_with_args(session, parser)
|
||||
|
||||
soundfile_mixer = SoundfileMixer(
|
||||
sound_files={"office": args.input},
|
||||
default_sound="office",
|
||||
volume=2.0,
|
||||
)
|
||||
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
token,
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
audio_out_mixer=soundfile_mixer,
|
||||
transcription_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
)
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
]
|
||||
|
||||
context = OpenAILLMContext(messages)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
context_aggregator.user(), # User responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
transport.output(), # Transport bot output
|
||||
context_aggregator.assistant(), # Assistant spoken responses
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
report_only_initial_ttfb=True,
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Show how to use mixer control frames.
|
||||
await asyncio.sleep(10.0)
|
||||
await task.queue_frame(MixerUpdateSettingsFrame({"volume": 0.5}))
|
||||
await asyncio.sleep(5.0)
|
||||
await task.queue_frame(MixerEnableFrame(False))
|
||||
await asyncio.sleep(5.0)
|
||||
await task.queue_frame(MixerEnableFrame(True))
|
||||
await asyncio.sleep(5.0)
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
@@ -203,8 +203,8 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
transport.capture_participant_video(participant["id"], framerate=0)
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_video(participant["id"], framerate=0)
|
||||
ir.set_participant_id(participant["id"])
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
|
||||
@@ -182,7 +182,7 @@ class IntakeProcessor:
|
||||
}
|
||||
)
|
||||
print(f"!!! about to await llm process frame in start prescrpitions")
|
||||
await llm.process_frame(OpenAILLMContextFrame(context), FrameDirection.DOWNSTREAM)
|
||||
await llm.queue_frame(OpenAILLMContextFrame(context), FrameDirection.DOWNSTREAM)
|
||||
print(f"!!! past await process frame in start prescriptions")
|
||||
|
||||
async def start_allergies(self, function_name, llm, context):
|
||||
@@ -222,7 +222,7 @@ class IntakeProcessor:
|
||||
"content": "Now ask the user if they have any medical conditions the doctor should know about. Once they've answered the question, call the list_conditions function.",
|
||||
}
|
||||
)
|
||||
await llm.process_frame(OpenAILLMContextFrame(context), FrameDirection.DOWNSTREAM)
|
||||
await llm.queue_frame(OpenAILLMContextFrame(context), FrameDirection.DOWNSTREAM)
|
||||
|
||||
async def start_conditions(self, function_name, llm, context):
|
||||
print("!!! doing start conditions")
|
||||
@@ -261,7 +261,7 @@ class IntakeProcessor:
|
||||
"content": "Finally, ask the user the reason for their doctor visit today. Once they answer, call the list_visit_reasons function.",
|
||||
}
|
||||
)
|
||||
await llm.process_frame(OpenAILLMContextFrame(context), FrameDirection.DOWNSTREAM)
|
||||
await llm.queue_frame(OpenAILLMContextFrame(context), FrameDirection.DOWNSTREAM)
|
||||
|
||||
async def start_visit_reasons(self, function_name, llm, context):
|
||||
print("!!! doing start visit reasons")
|
||||
@@ -270,7 +270,7 @@ class IntakeProcessor:
|
||||
context.add_message(
|
||||
{"role": "system", "content": "Now, thank the user and end the conversation."}
|
||||
)
|
||||
await llm.process_frame(OpenAILLMContextFrame(context), FrameDirection.DOWNSTREAM)
|
||||
await llm.queue_frame(OpenAILLMContextFrame(context), FrameDirection.DOWNSTREAM)
|
||||
|
||||
async def save_data(self, function_name, tool_call_id, args, llm, context, result_callback):
|
||||
logger.info(f"!!! Saving data: {args}")
|
||||
@@ -352,7 +352,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
print(f"Context is: {context}")
|
||||
await task.queue_frames([OpenAILLMContextFrame(context)])
|
||||
|
||||
|
||||
@@ -17,6 +17,10 @@ from fastapi.responses import JSONResponse, RedirectResponse
|
||||
|
||||
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomParams
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
MAX_BOTS_PER_ROOM = 1
|
||||
|
||||
# Bot sub-process dict for status reporting and concurrency control
|
||||
|
||||
@@ -102,7 +102,7 @@ async def main(room_url, token=None):
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
logger.debug("Participant joined, storytime commence!")
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
await intro_task.queue_frames(
|
||||
[
|
||||
images["book1"],
|
||||
|
||||
@@ -165,7 +165,7 @@ Your task is to help the user understand and learn from this article in 2 senten
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
messages.append(
|
||||
{
|
||||
"role": "system",
|
||||
|
||||
@@ -121,7 +121,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
|
||||
@@ -12,7 +12,7 @@ from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import LLMMessagesFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineTask
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.services.deepgram import DeepgramSTTService
|
||||
@@ -35,6 +35,7 @@ logger.add(sys.stderr, level="DEBUG")
|
||||
async def main():
|
||||
transport = WebsocketServerTransport(
|
||||
params=WebsocketServerParams(
|
||||
audio_out_sample_rate=16000,
|
||||
audio_out_enabled=True,
|
||||
add_wav_header=True,
|
||||
vad_enabled=True,
|
||||
@@ -50,6 +51,7 @@ async def main():
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
|
||||
sample_rate=16000,
|
||||
)
|
||||
|
||||
messages = [
|
||||
@@ -74,7 +76,7 @@ async def main():
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(pipeline)
|
||||
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
|
||||
@@ -21,14 +21,14 @@ classifiers = [
|
||||
]
|
||||
dependencies = [
|
||||
"aiohttp~=3.10.3",
|
||||
"loguru~=0.7.2",
|
||||
"Markdown~=3.7",
|
||||
"numpy~=1.26.4",
|
||||
"loguru~=0.7.2",
|
||||
"Pillow~=10.4.0",
|
||||
"protobuf~=4.25.4",
|
||||
"pydantic~=2.8.2",
|
||||
"pyloudnorm~=0.1.1",
|
||||
"scipy~=1.14.1",
|
||||
"resampy~=0.4.3",
|
||||
]
|
||||
|
||||
[project.urls]
|
||||
@@ -42,24 +42,27 @@ aws = [ "boto3~=1.35.27" ]
|
||||
azure = [ "azure-cognitiveservices-speech~=1.40.0" ]
|
||||
canonical = [ "aiofiles~=24.1.0" ]
|
||||
cartesia = [ "cartesia~=1.0.13", "websockets~=13.1" ]
|
||||
daily = [ "daily-python~=0.11.0" ]
|
||||
daily = [ "daily-python~=0.12.0" ]
|
||||
deepgram = [ "deepgram-sdk~=3.7.3" ]
|
||||
elevenlabs = [ "websockets~=13.1" ]
|
||||
examples = [ "python-dotenv~=1.0.1", "flask~=3.0.3", "flask_cors~=4.0.1" ]
|
||||
fal = [ "fal-client~=0.4.1" ]
|
||||
gladia = [ "websockets~=13.1" ]
|
||||
google = [ "google-generativeai~=0.7.2", "google-cloud-texttospeech~=2.17.2" ]
|
||||
google = [ "google-generativeai~=0.8.3", "google-cloud-texttospeech~=2.17.2" ]
|
||||
gstreamer = [ "pygobject~=3.48.2" ]
|
||||
fireworks = [ "openai~=1.37.2" ]
|
||||
krisp = [ "pipecat-ai-krisp~=0.2.0" ]
|
||||
langchain = [ "langchain~=0.2.14", "langchain-community~=0.2.12", "langchain-openai~=0.1.20" ]
|
||||
livekit = [ "livekit~=0.17.5", "livekit-api~=0.7.1", "tenacity~=8.5.0" ]
|
||||
lmnt = [ "lmnt~=1.1.4" ]
|
||||
local = [ "pyaudio~=0.2.14" ]
|
||||
moondream = [ "einops~=0.8.0", "timm~=1.0.8", "transformers~=4.44.0" ]
|
||||
noisereduce = [ "noisereduce~=3.0.3" ]
|
||||
openai = [ "openai~=1.50.2", "websockets~=13.1", "python-deepcompare~=1.0.1" ]
|
||||
openpipe = [ "openpipe~=4.24.0" ]
|
||||
playht = [ "pyht~=0.1.4", "websockets~=13.1" ]
|
||||
silero = [ "onnxruntime~=1.19.2" ]
|
||||
soundfile = [ "soundfile~=0.12.1" ]
|
||||
together = [ "openai~=1.50.2" ]
|
||||
websocket = [ "websockets~=13.1", "fastapi~=0.115.0" ]
|
||||
whisper = [ "faster-whisper~=1.0.3" ]
|
||||
@@ -74,3 +77,7 @@ pythonpath = ["src"]
|
||||
[tool.setuptools_scm]
|
||||
local_scheme = "no-local-version"
|
||||
fallback_version = "0.0.0-dev"
|
||||
|
||||
[tool.ruff]
|
||||
exclude = ["*_pb2.py"]
|
||||
line-length = 100
|
||||
|
||||
0
src/pipecat/audio/filters/__init__.py
Normal file
0
src/pipecat/audio/filters/__init__.py
Normal file
47
src/pipecat/audio/filters/base_audio_filter.py
Normal file
47
src/pipecat/audio/filters/base_audio_filter.py
Normal file
@@ -0,0 +1,47 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
from pipecat.frames.frames import FilterControlFrame
|
||||
|
||||
|
||||
class BaseAudioFilter(ABC):
|
||||
"""This is a base class for input transport audio filters. If an audio
|
||||
filter is provided to the input transport it will be used to process audio
|
||||
before VAD and before pushing it downstream. There are control frames to
|
||||
update filter settings or to enable or disable the filter at runtime.
|
||||
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
async def start(self, sample_rate: int):
|
||||
"""This will be called from the input transport when the transport is
|
||||
started. It can be used to initialize the filter. The input transport
|
||||
sample rate is provided so the filter can adjust to that sample rate.
|
||||
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def stop(self):
|
||||
"""This will be called from the input transport when the transport is
|
||||
stopping.
|
||||
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def process_frame(self, frame: FilterControlFrame):
|
||||
"""This will be called when the input transport receives a
|
||||
FilterControlFrame.
|
||||
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def filter(self, audio: bytes) -> bytes:
|
||||
pass
|
||||
78
src/pipecat/audio/filters/krisp_filter.py
Normal file
78
src/pipecat/audio/filters/krisp_filter.py
Normal file
@@ -0,0 +1,78 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import numpy as np
|
||||
import os
|
||||
|
||||
from pipecat.audio.filters.base_audio_filter import BaseAudioFilter
|
||||
from loguru import logger
|
||||
from pipecat.frames.frames import FilterControlFrame, FilterEnableFrame
|
||||
|
||||
try:
|
||||
from pipecat_ai_krisp.audio.krisp_processor import KrispAudioProcessor
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error("In order to use the Krisp filter, you need to `pip install pipecat-ai[krisp]`.")
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
class KrispFilter(BaseAudioFilter):
|
||||
def __init__(
|
||||
self, sample_type: str = "PCM_16", channels: int = 1, model_path: str = None
|
||||
) -> None:
|
||||
"""
|
||||
Initializes the KrispAudioProcessor with customizable audio processing settings.
|
||||
|
||||
:param sample_type: The type of audio sample, default is 'PCM_16'.
|
||||
:param channels: Number of audio channels, default is 1.
|
||||
:param model_path: Path to the Krisp model; defaults to environment variable KRISP_MODEL_PATH if not provided.
|
||||
"""
|
||||
super().__init__()
|
||||
|
||||
# Set model path, checking environment if not specified
|
||||
self._model_path = model_path or os.getenv("KRISP_MODEL_PATH")
|
||||
if not self._model_path:
|
||||
logger.error(
|
||||
"Model path for KrispAudioProcessor is not provided and KRISP_MODEL_PATH is not set."
|
||||
)
|
||||
raise ValueError("Model path for KrispAudioProcessor must be provided.")
|
||||
|
||||
self._sample_type = sample_type
|
||||
self._channels = channels
|
||||
self._sample_rate = 0
|
||||
self._filtering = True
|
||||
self._krisp_processor = None
|
||||
|
||||
async def start(self, sample_rate: int):
|
||||
self._sample_rate = sample_rate
|
||||
self._krisp_processor = KrispAudioProcessor(
|
||||
self._sample_rate, self._sample_type, self._channels, self._model_path
|
||||
)
|
||||
|
||||
async def stop(self):
|
||||
self._krisp_processor = None
|
||||
|
||||
async def process_frame(self, frame: FilterControlFrame):
|
||||
if isinstance(frame, FilterEnableFrame):
|
||||
self._filtering = frame.enable
|
||||
|
||||
async def filter(self, audio: bytes) -> bytes:
|
||||
if not self._filtering:
|
||||
return audio
|
||||
|
||||
data = np.frombuffer(audio, dtype=np.int16)
|
||||
|
||||
# Add a small epsilon to avoid division by zero.
|
||||
epsilon = 1e-10
|
||||
data = data.astype(np.float32) + epsilon
|
||||
|
||||
# Process the audio chunk to reduce noise
|
||||
reduced_noise = self._krisp_processor.process(data)
|
||||
|
||||
# Clip and set processed audio back to frame
|
||||
audio = np.clip(reduced_noise, -32768, 32767).astype(np.int16).tobytes()
|
||||
|
||||
return audio
|
||||
54
src/pipecat/audio/filters/noisereduce_filter.py
Normal file
54
src/pipecat/audio/filters/noisereduce_filter.py
Normal file
@@ -0,0 +1,54 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import numpy as np
|
||||
|
||||
from pipecat.audio.filters.base_audio_filter import BaseAudioFilter
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.frames.frames import FilterControlFrame, FilterEnableFrame
|
||||
|
||||
try:
|
||||
import noisereduce as nr
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error(
|
||||
"In order to use the noisereduce filter, you need to `pip install pipecat-ai[noisereduce]`."
|
||||
)
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
class NoisereduceFilter(BaseAudioFilter):
|
||||
def __init__(self) -> None:
|
||||
self._filtering = True
|
||||
self._sample_rate = 0
|
||||
|
||||
async def start(self, sample_rate: int):
|
||||
self._sample_rate = sample_rate
|
||||
|
||||
async def stop(self):
|
||||
pass
|
||||
|
||||
async def process_frame(self, frame: FilterControlFrame):
|
||||
if isinstance(frame, FilterEnableFrame):
|
||||
self._filtering = frame.enable
|
||||
|
||||
async def filter(self, audio: bytes) -> bytes:
|
||||
if not self._filtering:
|
||||
return audio
|
||||
|
||||
data = np.frombuffer(audio, dtype=np.int16)
|
||||
|
||||
# Add a small epsilon to avoid division by zero.
|
||||
epsilon = 1e-10
|
||||
data = data.astype(np.float32) + epsilon
|
||||
|
||||
# Noise reduction
|
||||
reduced_noise = nr.reduce_noise(y=data, sr=self._sample_rate)
|
||||
audio = np.clip(reduced_noise, -32768, 32767).astype(np.int16).tobytes()
|
||||
|
||||
return audio
|
||||
0
src/pipecat/audio/mixers/__init__.py
Normal file
0
src/pipecat/audio/mixers/__init__.py
Normal file
53
src/pipecat/audio/mixers/base_audio_mixer.py
Normal file
53
src/pipecat/audio/mixers/base_audio_mixer.py
Normal file
@@ -0,0 +1,53 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
from pipecat.frames.frames import MixerControlFrame
|
||||
|
||||
|
||||
class BaseAudioMixer(ABC):
|
||||
"""This is a base class for output transport audio mixers. If an audio mixer
|
||||
is provided to the output transport it will be used to mix the audio frames
|
||||
coming into to the transport with the audio generated from the mixer. There
|
||||
are control frames to update mixer settings or to enable or disable the
|
||||
mixer at runtime.
|
||||
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
async def start(self, sample_rate: int):
|
||||
"""This will be called from the output transport when the transport is
|
||||
started. It can be used to initialize the mixer. The output transport
|
||||
sample rate is provided so the mixer can adjust to that sample rate.
|
||||
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def stop(self):
|
||||
"""This will be called from the output transport when the transport is
|
||||
stopping.
|
||||
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def process_frame(self, frame: MixerControlFrame):
|
||||
"""This will be called when the output transport receives a
|
||||
MixerControlFrame.
|
||||
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def mix(self, audio: bytes) -> bytes:
|
||||
"""This is called with the audio that is about to be sent from the
|
||||
output transport and that should be mixed with the mixer audio if the
|
||||
mixer is enabled.
|
||||
|
||||
"""
|
||||
pass
|
||||
147
src/pipecat/audio/mixers/soundfile_mixer.py
Normal file
147
src/pipecat/audio/mixers/soundfile_mixer.py
Normal file
@@ -0,0 +1,147 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
from typing import Any, Dict, Mapping
|
||||
|
||||
import numpy as np
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.mixers.base_audio_mixer import BaseAudioMixer
|
||||
from pipecat.audio.utils import resample_audio
|
||||
from pipecat.frames.frames import MixerControlFrame, MixerEnableFrame, MixerUpdateSettingsFrame
|
||||
|
||||
try:
|
||||
import soundfile as sf
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error(
|
||||
"In order to use the soundfile mixer, you need to `pip install pipecat-ai[soundfile]`."
|
||||
)
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
class SoundfileMixer(BaseAudioMixer):
|
||||
"""This is an audio mixer that mixes incoming audio with audio from a
|
||||
file. It uses the soundfile library to load files so it supports multiple
|
||||
formats. The audio files need to only have one channel (mono) but they can
|
||||
have any sample rate that will be resampled to the output transport sample
|
||||
rate.
|
||||
|
||||
Multiple files can be loaded, each with a different name. The
|
||||
`MixerUpdateSettingsFrame` has the following settings available: `sound`
|
||||
(str) and `volume` (float) to be able to update to a different sound file or
|
||||
to change the volume at runtime.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
sound_files: Mapping[str, str],
|
||||
default_sound: str,
|
||||
volume: float = 0.4,
|
||||
loop: bool = True,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
self._sound_files = sound_files
|
||||
self._volume = volume
|
||||
self._sample_rate = 0
|
||||
|
||||
self._sound_pos = 0
|
||||
self._sounds: Dict[str, Any] = {}
|
||||
self._current_sound = default_sound
|
||||
self._mixing = True
|
||||
self._loop = loop
|
||||
|
||||
async def start(self, sample_rate: int):
|
||||
self._sample_rate = sample_rate
|
||||
for sound_name, file_name in self._sound_files.items():
|
||||
await asyncio.to_thread(self._load_sound_file, sound_name, file_name)
|
||||
|
||||
async def stop(self):
|
||||
pass
|
||||
|
||||
async def process_frame(self, frame: MixerControlFrame):
|
||||
if isinstance(frame, MixerUpdateSettingsFrame):
|
||||
await self._update_settings(frame)
|
||||
elif isinstance(frame, MixerEnableFrame):
|
||||
await self._enable_mixing(frame.enable)
|
||||
pass
|
||||
|
||||
async def mix(self, audio: bytes) -> bytes:
|
||||
return self._mix_with_sound(audio)
|
||||
|
||||
async def _enable_mixing(self, enable: bool):
|
||||
self._mixing = enable
|
||||
|
||||
async def _update_settings(self, frame: MixerUpdateSettingsFrame):
|
||||
for setting, value in frame.settings.items():
|
||||
match setting:
|
||||
case "sound":
|
||||
await self._change_sound(value)
|
||||
case "volume":
|
||||
await self._update_volume(value)
|
||||
case "loop":
|
||||
await self._update_loop(value)
|
||||
|
||||
async def _change_sound(self, sound: str):
|
||||
if sound in self._sound_files:
|
||||
self._current_sound = sound
|
||||
self._sound_pos = 0
|
||||
else:
|
||||
logger.error(f"Sound {sound} is not available")
|
||||
|
||||
async def _update_volume(self, volume: float):
|
||||
self._volume = volume
|
||||
|
||||
async def _update_loop(self, loop: bool):
|
||||
self._loop = loop
|
||||
|
||||
def _load_sound_file(self, sound_name: str, file_name: str):
|
||||
try:
|
||||
logger.debug(f"Loading background sound from {file_name}")
|
||||
sound, sample_rate = sf.read(file_name, dtype="int16")
|
||||
|
||||
audio = sound.tobytes()
|
||||
if sample_rate != self._sample_rate:
|
||||
logger.debug(f"Resampling background sound to {self._sample_rate}")
|
||||
audio = resample_audio(audio, sample_rate, self._sample_rate)
|
||||
|
||||
# Convert from np to bytes again.
|
||||
self._sounds[sound_name] = np.frombuffer(audio, dtype=np.int16)
|
||||
except Exception as e:
|
||||
logger.error(f"Unable to open file {file_name}: {e}")
|
||||
|
||||
def _mix_with_sound(self, audio: bytes):
|
||||
"""Mixes raw audio frames with chunks of the same length from the sound
|
||||
file.
|
||||
|
||||
"""
|
||||
if not self._mixing:
|
||||
return audio
|
||||
|
||||
audio_np = np.frombuffer(audio, dtype=np.int16)
|
||||
chunk_size = len(audio_np)
|
||||
|
||||
# Sound currently playing.
|
||||
sound = self._sounds[self._current_sound]
|
||||
|
||||
# Go back to the beginning if we don't have enough data.
|
||||
if self._sound_pos + chunk_size > len(sound):
|
||||
if not self._loop:
|
||||
return audio
|
||||
self._sound_pos = 0
|
||||
|
||||
start_pos = self._sound_pos
|
||||
end_pos = self._sound_pos + chunk_size
|
||||
self._sound_pos = end_pos
|
||||
|
||||
sound_np = sound[start_pos:end_pos]
|
||||
|
||||
mixed_audio = np.clip(audio_np + sound_np * self._volume, -32768, 32767).astype(np.int16)
|
||||
|
||||
return mixed_audio.astype(np.int16).tobytes()
|
||||
@@ -7,13 +7,14 @@
|
||||
import audioop
|
||||
import numpy as np
|
||||
import pyloudnorm as pyln
|
||||
from scipy import signal
|
||||
import resampy
|
||||
|
||||
|
||||
def resample_audio(audio: bytes, original_rate: int, target_rate: int) -> bytes:
|
||||
if original_rate == target_rate:
|
||||
return audio
|
||||
audio_data = np.frombuffer(audio, dtype=np.int16)
|
||||
num_samples = int(len(audio) * target_rate / original_rate)
|
||||
resampled_audio = signal.resample(audio_data, num_samples)
|
||||
resampled_audio = resampy.resample(audio_data, original_rate, target_rate)
|
||||
return resampled_audio.astype(np.int16).tobytes()
|
||||
|
||||
|
||||
|
||||
@@ -52,7 +52,7 @@ class SileroOnnxModel:
|
||||
|
||||
if sr not in self.sample_rates:
|
||||
raise ValueError(
|
||||
f"Supported sampling rates: {self.sample_rates} (or multiply of 16000)"
|
||||
f"Supported sampling rates: {self.sample_rates} (or multiple of 16000)"
|
||||
)
|
||||
if sr / np.shape(x)[1] > 31.25:
|
||||
raise ValueError("Input audio chunk is too short")
|
||||
|
||||
@@ -12,6 +12,11 @@ from pydantic.main import BaseModel
|
||||
|
||||
from pipecat.audio.utils import calculate_audio_volume, exp_smoothing
|
||||
|
||||
VAD_CONFIDENCE = 0.7
|
||||
VAD_START_SECS = 0.2
|
||||
VAD_STOP_SECS = 0.8
|
||||
VAD_MIN_VOLUME = 0.6
|
||||
|
||||
|
||||
class VADState(Enum):
|
||||
QUIET = 1
|
||||
@@ -21,10 +26,10 @@ class VADState(Enum):
|
||||
|
||||
|
||||
class VADParams(BaseModel):
|
||||
confidence: float = 0.7
|
||||
start_secs: float = 0.2
|
||||
stop_secs: float = 0.8
|
||||
min_volume: float = 0.6
|
||||
confidence: float = VAD_CONFIDENCE
|
||||
start_secs: float = VAD_START_SECS
|
||||
stop_secs: float = VAD_STOP_SECS
|
||||
min_volume: float = VAD_MIN_VOLUME
|
||||
|
||||
|
||||
class VADAnalyzer:
|
||||
@@ -41,13 +46,17 @@ class VADAnalyzer:
|
||||
self._prev_volume = 0
|
||||
|
||||
@property
|
||||
def sample_rate(self):
|
||||
def sample_rate(self) -> int:
|
||||
return self._sample_rate
|
||||
|
||||
@property
|
||||
def num_channels(self):
|
||||
def num_channels(self) -> int:
|
||||
return self._num_channels
|
||||
|
||||
@property
|
||||
def params(self) -> VADParams:
|
||||
return self._params
|
||||
|
||||
@abstractmethod
|
||||
def num_frames_required(self) -> int:
|
||||
pass
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
#
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
from typing import Any, List, Mapping, Optional, Tuple
|
||||
|
||||
from pipecat.audio.vad.vad_analyzer import VADParams
|
||||
from pipecat.clocks.base_clock import BaseClock
|
||||
@@ -557,7 +557,7 @@ class TTSStoppedFrame(ControlFrame):
|
||||
class ServiceUpdateSettingsFrame(ControlFrame):
|
||||
"""A control frame containing a request to update service settings."""
|
||||
|
||||
settings: Dict[str, Any]
|
||||
settings: Mapping[str, Any]
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -582,3 +582,45 @@ class VADParamsUpdateFrame(ControlFrame):
|
||||
"""
|
||||
|
||||
params: VADParams
|
||||
|
||||
|
||||
@dataclass
|
||||
class FilterControlFrame(ControlFrame):
|
||||
"""Base control frame for other audio filter frames."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class FilterUpdateSettingsFrame(FilterControlFrame):
|
||||
"""Control frame to update filter settings."""
|
||||
|
||||
settings: Mapping[str, Any]
|
||||
|
||||
|
||||
@dataclass
|
||||
class FilterEnableFrame(FilterControlFrame):
|
||||
"""Control frame to enable or disable the filter at runtime."""
|
||||
|
||||
enable: bool
|
||||
|
||||
|
||||
@dataclass
|
||||
class MixerControlFrame(ControlFrame):
|
||||
"""Base control frame for other audio mixer frames."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class MixerUpdateSettingsFrame(MixerControlFrame):
|
||||
"""Control frame to update mixer settings."""
|
||||
|
||||
settings: Mapping[str, Any]
|
||||
|
||||
|
||||
@dataclass
|
||||
class MixerEnableFrame(MixerControlFrame):
|
||||
"""Control frame to enable or disable the mixer at runtime."""
|
||||
|
||||
enable: bool
|
||||
|
||||
@@ -110,13 +110,13 @@ class ParallelPipeline(BasePipeline):
|
||||
|
||||
if direction == FrameDirection.UPSTREAM:
|
||||
# If we get an upstream frame we process it in each sink.
|
||||
await asyncio.gather(*[s.process_frame(frame, direction) for s in self._sinks])
|
||||
await asyncio.gather(*[s.queue_frame(frame, direction) for s in self._sinks])
|
||||
elif direction == FrameDirection.DOWNSTREAM:
|
||||
# If we get a downstream frame we process it in each source.
|
||||
# TODO(aleix): We are creating task for each frame. For real-time
|
||||
# video/audio this might be too slow. We should use an already
|
||||
# created task instead.
|
||||
await asyncio.gather(*[s.process_frame(frame, direction) for s in self._sources])
|
||||
await asyncio.gather(*[s.queue_frame(frame, direction) for s in self._sources])
|
||||
|
||||
# If we get an EndFrame we stop our queue processing tasks and wait on
|
||||
# all the pipelines to finish.
|
||||
|
||||
@@ -77,9 +77,9 @@ class Pipeline(BasePipeline):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if direction == FrameDirection.DOWNSTREAM:
|
||||
await self._source.process_frame(frame, FrameDirection.DOWNSTREAM)
|
||||
await self._source.queue_frame(frame, FrameDirection.DOWNSTREAM)
|
||||
elif direction == FrameDirection.UPSTREAM:
|
||||
await self._sink.process_frame(frame, FrameDirection.UPSTREAM)
|
||||
await self._sink.queue_frame(frame, FrameDirection.UPSTREAM)
|
||||
|
||||
async def _cleanup_processors(self):
|
||||
for p in self._processors:
|
||||
|
||||
@@ -160,19 +160,17 @@ class PipelineTask:
|
||||
report_only_initial_ttfb=self._params.report_only_initial_ttfb,
|
||||
clock=self._clock,
|
||||
)
|
||||
await self._source.process_frame(start_frame, FrameDirection.DOWNSTREAM)
|
||||
await self._source.queue_frame(start_frame, FrameDirection.DOWNSTREAM)
|
||||
|
||||
if self._params.enable_metrics and self._params.send_initial_empty_metrics:
|
||||
await self._source.process_frame(
|
||||
self._initial_metrics_frame(), FrameDirection.DOWNSTREAM
|
||||
)
|
||||
await self._source.queue_frame(self._initial_metrics_frame(), FrameDirection.DOWNSTREAM)
|
||||
|
||||
running = True
|
||||
should_cleanup = True
|
||||
while running:
|
||||
try:
|
||||
frame = await self._push_queue.get()
|
||||
await self._source.process_frame(frame, FrameDirection.DOWNSTREAM)
|
||||
await self._source.queue_frame(frame, FrameDirection.DOWNSTREAM)
|
||||
if isinstance(frame, EndFrame):
|
||||
await self._wait_for_endframe()
|
||||
running = not isinstance(frame, (StopTaskFrame, EndFrame))
|
||||
|
||||
@@ -0,0 +1,55 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
|
||||
from pipecat.frames.frames import CancelFrame, EndFrame, Frame, StartFrame
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.sync.base_notifier import BaseNotifier
|
||||
|
||||
|
||||
class GatedOpenAILLMContextAggregator(FrameProcessor):
|
||||
"""This aggregator keeps the last received OpenAI LLM context frame and it
|
||||
doesn't let it through until the notifier is notified.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, notifier: BaseNotifier, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._notifier = notifier
|
||||
self._last_context_frame = None
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, StartFrame):
|
||||
await self.push_frame(frame)
|
||||
await self._start()
|
||||
if isinstance(frame, (EndFrame, CancelFrame)):
|
||||
await self._stop()
|
||||
await self.push_frame(frame)
|
||||
elif isinstance(frame, OpenAILLMContextFrame):
|
||||
self._last_context_frame = frame
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
async def _start(self):
|
||||
self._gate_task = self.get_event_loop().create_task(self._gate_task_handler())
|
||||
|
||||
async def _stop(self):
|
||||
self._gate_task.cancel()
|
||||
await self._gate_task
|
||||
|
||||
async def _gate_task_handler(self):
|
||||
while True:
|
||||
try:
|
||||
await self._notifier.wait()
|
||||
if self._last_context_frame:
|
||||
await self.push_frame(self._last_context_frame)
|
||||
self._last_context_frame = None
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
@@ -70,6 +70,8 @@ class OpenAILLMContext:
|
||||
context.add_message(message)
|
||||
return context
|
||||
|
||||
# todo: deprecate from_image_frame. It's only used to create a single-use
|
||||
# context, which isn't useful for most real-world applications.
|
||||
@staticmethod
|
||||
def from_image_frame(frame: VisionImageRawFrame) -> "OpenAILLMContext":
|
||||
"""
|
||||
@@ -77,6 +79,10 @@ class OpenAILLMContext:
|
||||
expects images to be base64 encoded, but other vision models may not.
|
||||
So we'll store the image as bytes and do the base64 encoding as needed
|
||||
in the LLM service.
|
||||
|
||||
NOTE: the above only applies to the deprecated use of this method. The
|
||||
add_image_frame_message() below does the base64 encoding as expected
|
||||
in the OpenAI format.
|
||||
"""
|
||||
context = OpenAILLMContext()
|
||||
buffer = io.BytesIO()
|
||||
|
||||
@@ -4,14 +4,14 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
from typing import List
|
||||
from typing import Tuple, Type
|
||||
|
||||
from pipecat.frames.frames import AppFrame, ControlFrame, Frame, SystemFrame
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
|
||||
|
||||
class FrameFilter(FrameProcessor):
|
||||
def __init__(self, types: List[type]):
|
||||
def __init__(self, types: Tuple[Type[Frame]]):
|
||||
super().__init__()
|
||||
self._types = types
|
||||
|
||||
@@ -20,9 +20,8 @@ class FrameFilter(FrameProcessor):
|
||||
#
|
||||
|
||||
def _should_passthrough_frame(self, frame):
|
||||
for t in self._types:
|
||||
if isinstance(frame, t):
|
||||
return True
|
||||
if isinstance(frame, self._types):
|
||||
return True
|
||||
|
||||
return (
|
||||
isinstance(frame, AppFrame)
|
||||
|
||||
14
src/pipecat/processors/filters/null_filter.py
Normal file
14
src/pipecat/processors/filters/null_filter.py
Normal file
@@ -0,0 +1,14 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
from pipecat.processors.frame_processor import FrameProcessor
|
||||
|
||||
|
||||
class NullFilter(FrameProcessor):
|
||||
"""This filter doesn't allow passing any frames up or downstream."""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
40
src/pipecat/processors/filters/wake_notifier_filter.py
Normal file
40
src/pipecat/processors/filters/wake_notifier_filter.py
Normal file
@@ -0,0 +1,40 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
from typing import Awaitable, Callable, Tuple, Type
|
||||
|
||||
from pipecat.frames.frames import Frame
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.sync.base_notifier import BaseNotifier
|
||||
|
||||
|
||||
class WakeNotifierFilter(FrameProcessor):
|
||||
"""This processor expects a list of frame types and will execute a given
|
||||
callback predicate when a frame of any of those type is being processed. If
|
||||
the callback returns true the notifier will be notified.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
notifier: BaseNotifier,
|
||||
*,
|
||||
types: Tuple[Type[Frame]],
|
||||
filter: Callable[[Frame], Awaitable[bool]],
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
self._notifier = notifier
|
||||
self._types = types
|
||||
self._filter = filter
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, self._types) and await self._filter(frame):
|
||||
await self._notifier.notify()
|
||||
|
||||
await self.push_frame(frame, direction)
|
||||
@@ -8,6 +8,7 @@ import asyncio
|
||||
import inspect
|
||||
|
||||
from enum import Enum
|
||||
from typing import Awaitable, Callable, Optional
|
||||
|
||||
from pipecat.clocks.base_clock import BaseClock
|
||||
from pipecat.frames.frames import (
|
||||
@@ -62,6 +63,13 @@ class FrameProcessor:
|
||||
self._metrics = metrics or FrameProcessorMetrics()
|
||||
self._metrics.set_processor_name(self.name)
|
||||
|
||||
# Processors have an input queue. The input queue will be processed
|
||||
# immediately (default) or it will block if `pause_processing_frames()`
|
||||
# is called. To resume processing frames we need to call
|
||||
# `resume_processing_frames()`.
|
||||
self.__should_block_frames = False
|
||||
self.__create_input_task()
|
||||
|
||||
# Every processor in Pipecat should only output frames from a single
|
||||
# task. This avoid problems like audio overlapping. System frames are
|
||||
# the exception to this rule. This create this task.
|
||||
@@ -126,7 +134,8 @@ class FrameProcessor:
|
||||
await self.stop_processing_metrics()
|
||||
|
||||
async def cleanup(self):
|
||||
pass
|
||||
await self.__cancel_input_task()
|
||||
await self.__cancel_push_task()
|
||||
|
||||
def link(self, processor: "FrameProcessor"):
|
||||
self._next = processor
|
||||
@@ -145,6 +154,28 @@ class FrameProcessor:
|
||||
def get_clock(self) -> BaseClock:
|
||||
return self._clock
|
||||
|
||||
async def queue_frame(
|
||||
self,
|
||||
frame: Frame,
|
||||
direction: FrameDirection = FrameDirection.DOWNSTREAM,
|
||||
callback: Optional[
|
||||
Callable[["FrameProcessor", Frame, FrameDirection], Awaitable[None]]
|
||||
] = None,
|
||||
):
|
||||
if isinstance(frame, SystemFrame):
|
||||
# We don't want to queue system frames.
|
||||
await self.process_frame(frame, direction)
|
||||
else:
|
||||
# We queue everything else.
|
||||
await self.__input_queue.put((frame, direction, callback))
|
||||
|
||||
async def pause_processing_frames(self):
|
||||
self.__should_block_frames = True
|
||||
|
||||
async def resume_processing_frames(self):
|
||||
self.__input_event.set()
|
||||
self.__should_block_frames = False
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
if isinstance(frame, StartFrame):
|
||||
self._clock = frame.clock
|
||||
@@ -189,11 +220,16 @@ class FrameProcessor:
|
||||
#
|
||||
|
||||
async def _start_interruption(self):
|
||||
# Cancel the task. This will stop pushing frames downstream.
|
||||
self.__push_frame_task.cancel()
|
||||
await self.__push_frame_task
|
||||
# Cancel the push frame task. This will stop pushing frames downstream.
|
||||
await self.__cancel_push_task()
|
||||
|
||||
# Create a new queue and task.
|
||||
# Cancel the input task. This will stop processing queued frames.
|
||||
await self.__cancel_input_task()
|
||||
|
||||
# Create a new input queue and task.
|
||||
self.__create_input_task()
|
||||
|
||||
# Create a new output queue and task.
|
||||
self.__create_push_task()
|
||||
|
||||
async def _stop_interruption(self):
|
||||
@@ -204,17 +240,55 @@ class FrameProcessor:
|
||||
try:
|
||||
if direction == FrameDirection.DOWNSTREAM and self._next:
|
||||
logger.trace(f"Pushing {frame} from {self} to {self._next}")
|
||||
await self._next.process_frame(frame, direction)
|
||||
await self._next.queue_frame(frame, direction)
|
||||
elif direction == FrameDirection.UPSTREAM and self._prev:
|
||||
logger.trace(f"Pushing {frame} upstream from {self} to {self._prev}")
|
||||
await self._prev.process_frame(frame, direction)
|
||||
await self._prev.queue_frame(frame, direction)
|
||||
except Exception as e:
|
||||
logger.exception(f"Uncaught exception in {self}: {e}")
|
||||
|
||||
def __create_input_task(self):
|
||||
self.__input_queue = asyncio.Queue()
|
||||
self.__input_frame_task = self.get_event_loop().create_task(
|
||||
self.__input_frame_task_handler()
|
||||
)
|
||||
self.__input_event = asyncio.Event()
|
||||
|
||||
async def __cancel_input_task(self):
|
||||
self.__input_frame_task.cancel()
|
||||
await self.__input_frame_task
|
||||
|
||||
async def __input_frame_task_handler(self):
|
||||
running = True
|
||||
while running:
|
||||
try:
|
||||
if self.__should_block_frames:
|
||||
await self.__input_event.wait()
|
||||
self.__input_event.clear()
|
||||
|
||||
(frame, direction, callback) = await self.__input_queue.get()
|
||||
|
||||
# Process the frame.
|
||||
await self.process_frame(frame, direction)
|
||||
|
||||
# If this frame has an associated callback, call it now.
|
||||
if callback:
|
||||
await callback(self, frame, direction)
|
||||
|
||||
running = not isinstance(frame, EndFrame)
|
||||
|
||||
self.__input_queue.task_done()
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
|
||||
def __create_push_task(self):
|
||||
self.__push_queue = asyncio.Queue()
|
||||
self.__push_frame_task = self.get_event_loop().create_task(self.__push_frame_task_handler())
|
||||
|
||||
async def __cancel_push_task(self):
|
||||
self.__push_frame_task.cancel()
|
||||
await self.__push_frame_task
|
||||
|
||||
async def __push_frame_task_handler(self):
|
||||
running = True
|
||||
while running:
|
||||
|
||||
@@ -366,10 +366,6 @@ class RTVIMetricsMessage(BaseModel):
|
||||
data: Mapping[str, Any]
|
||||
|
||||
|
||||
class RTVIProcessorParams(BaseModel):
|
||||
send_bot_ready: bool = True
|
||||
|
||||
|
||||
class RTVIFrameProcessor(FrameProcessor):
|
||||
def __init__(self, direction: FrameDirection = FrameDirection.DOWNSTREAM, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
@@ -573,16 +569,14 @@ class RTVIProcessor(FrameProcessor):
|
||||
self,
|
||||
*,
|
||||
config: RTVIConfig = RTVIConfig(config=[]),
|
||||
params: RTVIProcessorParams = RTVIProcessorParams(),
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
self._config = config
|
||||
self._params = params
|
||||
|
||||
self._pipeline: FrameProcessor | None = None
|
||||
self._pipeline_started = False
|
||||
|
||||
self._bot_ready = False
|
||||
self._client_ready = False
|
||||
self._client_ready_id = ""
|
||||
|
||||
@@ -590,14 +584,14 @@ class RTVIProcessor(FrameProcessor):
|
||||
self._registered_services: Dict[str, RTVIService] = {}
|
||||
|
||||
# A task to process incoming action frames.
|
||||
self._action_task = self.get_event_loop().create_task(self._action_task_handler())
|
||||
self._action_queue = asyncio.Queue()
|
||||
self._action_task = self.get_event_loop().create_task(self._action_task_handler())
|
||||
|
||||
# A task to process incoming transport messages.
|
||||
self._message_task = self.get_event_loop().create_task(self._message_task_handler())
|
||||
self._message_queue = asyncio.Queue()
|
||||
self._message_task = self.get_event_loop().create_task(self._message_task_handler())
|
||||
|
||||
self._register_event_handler("on_bot_ready")
|
||||
self._register_event_handler("on_client_ready")
|
||||
|
||||
def register_action(self, action: RTVIAction):
|
||||
id = self._action_id(action.service, action.action)
|
||||
@@ -606,6 +600,15 @@ class RTVIProcessor(FrameProcessor):
|
||||
def register_service(self, service: RTVIService):
|
||||
self._registered_services[service.name] = service
|
||||
|
||||
async def set_client_ready(self):
|
||||
self._client_ready = True
|
||||
await self._call_event_handler("on_client_ready")
|
||||
|
||||
async def set_bot_ready(self):
|
||||
self._bot_ready = True
|
||||
await self._update_config(self._config, False)
|
||||
await self._send_bot_ready()
|
||||
|
||||
async def interrupt_bot(self):
|
||||
await self.push_frame(BotInterruptionFrame(), FrameDirection.UPSTREAM)
|
||||
|
||||
@@ -613,11 +616,6 @@ class RTVIProcessor(FrameProcessor):
|
||||
message = RTVIError(data=RTVIErrorData(error=error, fatal=False))
|
||||
await self._push_transport_message(message)
|
||||
|
||||
async def set_client_ready(self):
|
||||
if not self._client_ready:
|
||||
self._client_ready = True
|
||||
await self._maybe_send_bot_ready()
|
||||
|
||||
async def handle_message(self, message: RTVIMessage):
|
||||
await self._message_queue.put(message)
|
||||
|
||||
@@ -681,21 +679,15 @@ class RTVIProcessor(FrameProcessor):
|
||||
await self._pipeline.cleanup()
|
||||
|
||||
async def _start(self, frame: StartFrame):
|
||||
self._pipeline_started = True
|
||||
await self._maybe_send_bot_ready()
|
||||
pass
|
||||
|
||||
async def _stop(self, frame: EndFrame):
|
||||
if self._action_task:
|
||||
self._action_task.cancel()
|
||||
await self._action_task
|
||||
self._action_task = None
|
||||
|
||||
if self._message_task:
|
||||
self._message_task.cancel()
|
||||
await self._message_task
|
||||
self._message_task = None
|
||||
await self._cancel_tasks()
|
||||
|
||||
async def _cancel(self, frame: CancelFrame):
|
||||
await self._cancel_tasks()
|
||||
|
||||
async def _cancel_tasks(self):
|
||||
if self._action_task:
|
||||
self._action_task.cancel()
|
||||
await self._action_task
|
||||
@@ -769,9 +761,8 @@ class RTVIProcessor(FrameProcessor):
|
||||
logger.warning(f"Exception processing message: {e}")
|
||||
|
||||
async def _handle_client_ready(self, request_id: str):
|
||||
self._client_ready = True
|
||||
self._client_ready_id = request_id
|
||||
await self._maybe_send_bot_ready()
|
||||
await self.set_client_ready()
|
||||
|
||||
async def _handle_describe_config(self, request_id: str):
|
||||
services = list(self._registered_services.values())
|
||||
@@ -841,16 +832,7 @@ class RTVIProcessor(FrameProcessor):
|
||||
message = RTVIActionResponse(id=request_id, data=RTVIActionResponseData(result=result))
|
||||
await self._push_transport_message(message)
|
||||
|
||||
async def _maybe_send_bot_ready(self):
|
||||
if self._pipeline_started and self._client_ready:
|
||||
await self._update_config(self._config, False)
|
||||
await self._send_bot_ready()
|
||||
await self._call_event_handler("on_bot_ready")
|
||||
|
||||
async def _send_bot_ready(self):
|
||||
if not self._params.send_bot_ready:
|
||||
return
|
||||
|
||||
message = RTVIBotReady(
|
||||
id=self._client_ready_id,
|
||||
data=RTVIBotReadyData(version=RTVI_PROTOCOL_VERSION, config=self._config.config),
|
||||
|
||||
@@ -205,7 +205,7 @@ class TTSService(AIService):
|
||||
# if push_stop_frames is True, wait for this idle period before pushing TTSStoppedFrame
|
||||
stop_frame_timeout_s: float = 1.0,
|
||||
# TTS output sample rate
|
||||
sample_rate: int = 16000,
|
||||
sample_rate: int = 24000,
|
||||
text_filter: Optional[BaseTextFilter] = None,
|
||||
**kwargs,
|
||||
):
|
||||
@@ -284,11 +284,7 @@ class TTSService(AIService):
|
||||
logger.warning(f"Unknown setting for TTS service: {key}")
|
||||
|
||||
async def say(self, text: str):
|
||||
aggregate_sentences = self._aggregate_sentences
|
||||
self._aggregate_sentences = False
|
||||
await self.process_frame(TextFrame(text=text), FrameDirection.DOWNSTREAM)
|
||||
self._aggregate_sentences = aggregate_sentences
|
||||
await self.flush_audio()
|
||||
await self.queue_frame(TTSSpeakFrame(text))
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
@@ -395,7 +391,6 @@ class WordTTSService(TTSService):
|
||||
|
||||
def reset_word_timestamps(self):
|
||||
self._initial_word_timestamp = -1
|
||||
self._word_timestamps = []
|
||||
|
||||
async def add_word_timestamps(self, word_times: List[Tuple[str, float]]):
|
||||
for word, timestamp in word_times:
|
||||
@@ -430,7 +425,10 @@ class WordTTSService(TTSService):
|
||||
while True:
|
||||
try:
|
||||
(word, timestamp) = await self._words_queue.get()
|
||||
if word == "LLMFullResponseEndFrame" and timestamp == 0:
|
||||
if word == "Reset" and timestamp == 0:
|
||||
self.reset_word_timestamps()
|
||||
frame = None
|
||||
elif word == "LLMFullResponseEndFrame" and timestamp == 0:
|
||||
frame = LLMFullResponseEndFrame()
|
||||
frame.pts = last_pts
|
||||
elif word == "TTSStoppedFrame" and timestamp == 0:
|
||||
@@ -439,8 +437,9 @@ class WordTTSService(TTSService):
|
||||
else:
|
||||
frame = TextFrame(word)
|
||||
frame.pts = self._initial_word_timestamp + timestamp
|
||||
last_pts = frame.pts
|
||||
await self.push_frame(frame)
|
||||
if frame:
|
||||
last_pts = frame.pts
|
||||
await self.push_frame(frame)
|
||||
self._words_queue.task_done()
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
@@ -514,7 +513,7 @@ class SegmentedSTTService(STTService):
|
||||
min_volume: float = 0.6,
|
||||
max_silence_secs: float = 0.3,
|
||||
max_buffer_secs: float = 1.5,
|
||||
sample_rate: int = 16000,
|
||||
sample_rate: int = 24000,
|
||||
num_channels: int = 1,
|
||||
**kwargs,
|
||||
):
|
||||
|
||||
@@ -671,6 +671,7 @@ class AnthropicAssistantContextAggregator(LLMAssistantContextAggregator):
|
||||
):
|
||||
self._function_call_in_progress = None
|
||||
self._function_call_result = frame
|
||||
await self._push_aggregation()
|
||||
else:
|
||||
logger.warning(
|
||||
"FunctionCallResultFrame tool_call_id != InProgressFrame tool_call_id"
|
||||
@@ -679,9 +680,12 @@ class AnthropicAssistantContextAggregator(LLMAssistantContextAggregator):
|
||||
self._function_call_result = None
|
||||
elif isinstance(frame, AnthropicImageMessageFrame):
|
||||
self._pending_image_frame_message = frame
|
||||
await self._push_aggregation()
|
||||
|
||||
async def _push_aggregation(self):
|
||||
if not self._aggregation:
|
||||
if not (
|
||||
self._aggregation or self._function_call_result or self._pending_image_frame_message
|
||||
):
|
||||
return
|
||||
|
||||
run_llm = False
|
||||
@@ -694,20 +698,18 @@ class AnthropicAssistantContextAggregator(LLMAssistantContextAggregator):
|
||||
frame = self._function_call_result
|
||||
self._function_call_result = None
|
||||
if frame.result:
|
||||
self._context.add_message(
|
||||
assistant_message = {"role": "assistant", "content": []}
|
||||
if aggregation:
|
||||
assistant_message["content"].append({"type": "text", "text": aggregation})
|
||||
assistant_message["content"].append(
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": [
|
||||
{"type": "text", "text": aggregation},
|
||||
{
|
||||
"type": "tool_use",
|
||||
"id": frame.tool_call_id,
|
||||
"name": frame.function_name,
|
||||
"input": frame.arguments,
|
||||
},
|
||||
],
|
||||
"type": "tool_use",
|
||||
"id": frame.tool_call_id,
|
||||
"name": frame.function_name,
|
||||
"input": frame.arguments,
|
||||
}
|
||||
)
|
||||
self._context.add_message(assistant_message)
|
||||
self._context.add_message(
|
||||
{
|
||||
"role": "user",
|
||||
@@ -721,7 +723,7 @@ class AnthropicAssistantContextAggregator(LLMAssistantContextAggregator):
|
||||
}
|
||||
)
|
||||
run_llm = True
|
||||
else:
|
||||
elif aggregation:
|
||||
self._context.add_message({"role": "assistant", "content": aggregation})
|
||||
|
||||
if self._pending_image_frame_message:
|
||||
|
||||
@@ -53,8 +53,6 @@ class AssemblyAISTTService(STTService):
|
||||
async def set_language(self, language: Language):
|
||||
logger.info(f"Switching STT language to: [{language}]")
|
||||
self._settings["language"] = language
|
||||
await self._disconnect()
|
||||
await self._connect()
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
|
||||
@@ -4,11 +4,14 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
|
||||
from typing import AsyncGenerator, Optional
|
||||
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel
|
||||
|
||||
from pipecat.audio.utils import resample_audio
|
||||
from pipecat.frames.frames import (
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
@@ -45,7 +48,7 @@ class AWSTTSService(TTSService):
|
||||
aws_access_key_id: str,
|
||||
region: str,
|
||||
voice_id: str = "Joanna",
|
||||
sample_rate: int = 16000,
|
||||
sample_rate: int = 24000,
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
):
|
||||
@@ -164,6 +167,14 @@ class AWSTTSService(TTSService):
|
||||
return ssml
|
||||
|
||||
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
|
||||
def read_audio_data(**args):
|
||||
response = self._polly_client.synthesize_speech(**args)
|
||||
if "AudioStream" in response:
|
||||
audio_data = response["AudioStream"].read()
|
||||
resampled = resample_audio(audio_data, 16000, self._settings["sample_rate"])
|
||||
return resampled
|
||||
return None
|
||||
|
||||
logger.debug(f"Generating TTS: [{text}]")
|
||||
|
||||
try:
|
||||
@@ -178,28 +189,31 @@ class AWSTTSService(TTSService):
|
||||
"OutputFormat": "pcm",
|
||||
"VoiceId": self._voice_id,
|
||||
"Engine": self._settings["engine"],
|
||||
"SampleRate": str(self._settings["sample_rate"]),
|
||||
# AWS only supports 8000 and 16000 for PCM. We select 16000.
|
||||
"SampleRate": "16000",
|
||||
}
|
||||
|
||||
# Filter out None values
|
||||
filtered_params = {k: v for k, v in params.items() if v is not None}
|
||||
|
||||
response = self._polly_client.synthesize_speech(**filtered_params)
|
||||
audio_data = await asyncio.to_thread(read_audio_data, **filtered_params)
|
||||
|
||||
if not audio_data:
|
||||
logger.error(f"{self} No audio data returned")
|
||||
yield None
|
||||
return
|
||||
|
||||
await self.start_tts_usage_metrics(text)
|
||||
|
||||
yield TTSStartedFrame()
|
||||
|
||||
if "AudioStream" in response:
|
||||
with response["AudioStream"] as stream:
|
||||
audio_data = stream.read()
|
||||
chunk_size = 8192
|
||||
for i in range(0, len(audio_data), chunk_size):
|
||||
chunk = audio_data[i : i + chunk_size]
|
||||
if len(chunk) > 0:
|
||||
await self.stop_ttfb_metrics()
|
||||
frame = TTSAudioRawFrame(chunk, self._settings["sample_rate"], 1)
|
||||
yield frame
|
||||
chunk_size = 8192
|
||||
for i in range(0, len(audio_data), chunk_size):
|
||||
chunk = audio_data[i : i + chunk_size]
|
||||
if len(chunk) > 0:
|
||||
await self.stop_ttfb_metrics()
|
||||
frame = TTSAudioRawFrame(chunk, self._settings["sample_rate"], 1)
|
||||
yield frame
|
||||
|
||||
yield TTSStoppedFrame()
|
||||
|
||||
|
||||
@@ -25,8 +25,14 @@ from pipecat.frames.frames import (
|
||||
TTSStoppedFrame,
|
||||
URLImageRawFrame,
|
||||
)
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.services.ai_services import ImageGenService, STTService, TTSService
|
||||
from pipecat.services.openai import BaseOpenAILLMService
|
||||
from pipecat.services.openai import (
|
||||
BaseOpenAILLMService,
|
||||
OpenAIAssistantContextAggregator,
|
||||
OpenAIContextAggregatorPair,
|
||||
OpenAIUserContextAggregator,
|
||||
)
|
||||
from pipecat.transcriptions.language import Language
|
||||
from pipecat.utils.time import time_now_iso8601
|
||||
|
||||
@@ -35,8 +41,10 @@ try:
|
||||
from azure.cognitiveservices.speech import (
|
||||
CancellationReason,
|
||||
ResultReason,
|
||||
ServicePropertyChannel,
|
||||
SpeechConfig,
|
||||
SpeechRecognizer,
|
||||
SpeechSynthesisOutputFormat,
|
||||
SpeechSynthesizer,
|
||||
)
|
||||
from azure.cognitiveservices.speech.audio import (
|
||||
@@ -70,8 +78,35 @@ class AzureLLMService(BaseOpenAILLMService):
|
||||
api_version=self._api_version,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def create_context_aggregator(
|
||||
context: OpenAILLMContext, *, assistant_expect_stripped_words: bool = True
|
||||
) -> OpenAIContextAggregatorPair:
|
||||
user = OpenAIUserContextAggregator(context)
|
||||
assistant = OpenAIAssistantContextAggregator(
|
||||
user, expect_stripped_words=assistant_expect_stripped_words
|
||||
)
|
||||
return OpenAIContextAggregatorPair(_user=user, _assistant=assistant)
|
||||
|
||||
class AzureTTSService(TTSService):
|
||||
|
||||
def sample_rate_to_output_format(sample_rate: int) -> SpeechSynthesisOutputFormat:
|
||||
match sample_rate:
|
||||
case 8000:
|
||||
return SpeechSynthesisOutputFormat.Raw8Khz16BitMonoPcm
|
||||
case 16000:
|
||||
return SpeechSynthesisOutputFormat.Raw16Khz16BitMonoPcm
|
||||
case 22050:
|
||||
return SpeechSynthesisOutputFormat.Raw22050Hz16BitMonoPcm
|
||||
case 24000:
|
||||
return SpeechSynthesisOutputFormat.Raw24Khz16BitMonoPcm
|
||||
case 44100:
|
||||
return SpeechSynthesisOutputFormat.Raw44100Hz16BitMonoPcm
|
||||
case 48000:
|
||||
return SpeechSynthesisOutputFormat.Raw48Khz16BitMonoPcm
|
||||
return SpeechSynthesisOutputFormat.Raw16Khz16BitMonoPcm
|
||||
|
||||
|
||||
class AzureBaseTTSService(TTSService):
|
||||
class InputParams(BaseModel):
|
||||
emphasis: Optional[str] = None
|
||||
language: Optional[Language] = Language.EN_US
|
||||
@@ -88,15 +123,12 @@ class AzureTTSService(TTSService):
|
||||
api_key: str,
|
||||
region: str,
|
||||
voice="en-US-SaraNeural",
|
||||
sample_rate: int = 16000,
|
||||
sample_rate: int = 24000,
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(sample_rate=sample_rate, **kwargs)
|
||||
|
||||
speech_config = SpeechConfig(subscription=api_key, region=region)
|
||||
self._speech_synthesizer = SpeechSynthesizer(speech_config=speech_config, audio_config=None)
|
||||
|
||||
self._settings = {
|
||||
"sample_rate": sample_rate,
|
||||
"emphasis": params.emphasis,
|
||||
@@ -111,7 +143,10 @@ class AzureTTSService(TTSService):
|
||||
"volume": params.volume,
|
||||
}
|
||||
|
||||
self.set_voice(voice)
|
||||
self._api_key = api_key
|
||||
self._region = region
|
||||
self._voice_id = voice
|
||||
self._speech_synthesizer = None
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
return True
|
||||
@@ -249,6 +284,97 @@ class AzureTTSService(TTSService):
|
||||
|
||||
return ssml
|
||||
|
||||
|
||||
class AzureTTSService(AzureBaseTTSService):
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
speech_config = SpeechConfig(
|
||||
subscription=self._api_key,
|
||||
region=self._region,
|
||||
speech_recognition_language=self._settings["language"],
|
||||
)
|
||||
speech_config.set_speech_synthesis_output_format(
|
||||
sample_rate_to_output_format(self._settings["sample_rate"])
|
||||
)
|
||||
speech_config.set_service_property(
|
||||
"synthesizer.synthesis.connection.synthesisConnectionImpl",
|
||||
"websocket",
|
||||
ServicePropertyChannel.UriQueryParameter,
|
||||
)
|
||||
|
||||
self._speech_synthesizer = SpeechSynthesizer(speech_config=speech_config, audio_config=None)
|
||||
|
||||
# Set up event handlers
|
||||
self._audio_queue = asyncio.Queue()
|
||||
self._speech_synthesizer.synthesizing.connect(self._handle_synthesizing)
|
||||
self._speech_synthesizer.synthesis_completed.connect(self._handle_completed)
|
||||
self._speech_synthesizer.synthesis_canceled.connect(self._handle_canceled)
|
||||
|
||||
def _handle_synthesizing(self, evt):
|
||||
"""Handle audio chunks as they arrive"""
|
||||
if evt.result and evt.result.audio_data:
|
||||
self._audio_queue.put_nowait(evt.result.audio_data)
|
||||
|
||||
def _handle_completed(self, evt):
|
||||
"""Handle synthesis completion"""
|
||||
self._audio_queue.put_nowait(None) # Signal completion
|
||||
|
||||
def _handle_canceled(self, evt):
|
||||
"""Handle synthesis cancellation"""
|
||||
logger.error(f"Speech synthesis canceled: {evt.result.cancellation_details.reason}")
|
||||
self._audio_queue.put_nowait(None)
|
||||
|
||||
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
|
||||
logger.debug(f"Generating TTS: [{text}]")
|
||||
|
||||
try:
|
||||
await self.start_ttfb_metrics()
|
||||
yield TTSStartedFrame()
|
||||
|
||||
ssml = self._construct_ssml(text)
|
||||
|
||||
# Start synthesis
|
||||
self._speech_synthesizer.speak_ssml_async(ssml)
|
||||
|
||||
await self.start_tts_usage_metrics(text)
|
||||
|
||||
# Stream audio chunks as they arrive
|
||||
while True:
|
||||
chunk = await self._audio_queue.get()
|
||||
if chunk is None: # End of stream
|
||||
break
|
||||
|
||||
await self.stop_ttfb_metrics()
|
||||
|
||||
yield TTSAudioRawFrame(
|
||||
audio=chunk,
|
||||
sample_rate=self._settings["sample_rate"],
|
||||
num_channels=1,
|
||||
)
|
||||
|
||||
yield TTSStoppedFrame()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} error generating TTS: {e}")
|
||||
yield ErrorFrame(f"{self} error: {str(e)}")
|
||||
|
||||
|
||||
class AzureHttpTTSService(AzureBaseTTSService):
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
speech_config = SpeechConfig(
|
||||
subscription=self._api_key,
|
||||
region=self._region,
|
||||
speech_recognition_language=self._settings["language"],
|
||||
)
|
||||
speech_config.set_speech_synthesis_output_format(
|
||||
sample_rate_to_output_format(self._settings["sample_rate"])
|
||||
)
|
||||
|
||||
self._speech_synthesizer = SpeechSynthesizer(speech_config=speech_config, audio_config=None)
|
||||
|
||||
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
|
||||
logger.debug(f"Generating TTS: [{text}]")
|
||||
|
||||
@@ -256,7 +382,7 @@ class AzureTTSService(TTSService):
|
||||
|
||||
ssml = self._construct_ssml(text)
|
||||
|
||||
result = await asyncio.to_thread(self._speech_synthesizer.speak_ssml, (ssml))
|
||||
result = await asyncio.to_thread(self._speech_synthesizer.speak_ssml, ssml)
|
||||
|
||||
if result.reason == ResultReason.SynthesizingAudioCompleted:
|
||||
await self.start_tts_usage_metrics(text)
|
||||
@@ -283,7 +409,7 @@ class AzureSTTService(STTService):
|
||||
api_key: str,
|
||||
region: str,
|
||||
language=Language.EN_US,
|
||||
sample_rate=16000,
|
||||
sample_rate=24000,
|
||||
channels=1,
|
||||
**kwargs,
|
||||
):
|
||||
|
||||
@@ -14,13 +14,16 @@ from loguru import logger
|
||||
from pydantic.main import BaseModel
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
BotStoppedSpeakingFrame,
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
LLMFullResponseEndFrame,
|
||||
StartFrame,
|
||||
StartInterruptionFrame,
|
||||
TTSAudioRawFrame,
|
||||
TTSSpeakFrame,
|
||||
TTSStartedFrame,
|
||||
TTSStoppedFrame,
|
||||
)
|
||||
@@ -68,9 +71,6 @@ def language_to_cartesia_language(language: Language) -> str | None:
|
||||
|
||||
class CartesiaTTSService(WordTTSService):
|
||||
class InputParams(BaseModel):
|
||||
encoding: Optional[str] = "pcm_s16le"
|
||||
sample_rate: Optional[int] = 16000
|
||||
container: Optional[str] = "raw"
|
||||
language: Optional[Language] = Language.EN
|
||||
speed: Optional[Union[str, float]] = ""
|
||||
emotion: Optional[List[str]] = []
|
||||
@@ -83,6 +83,9 @@ class CartesiaTTSService(WordTTSService):
|
||||
cartesia_version: str = "2024-06-10",
|
||||
url: str = "wss://api.cartesia.ai/tts/websocket",
|
||||
model: str = "sonic-english",
|
||||
sample_rate: int = 24000,
|
||||
encoding: str = "pcm_s16le",
|
||||
container: str = "raw",
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
):
|
||||
@@ -99,7 +102,7 @@ class CartesiaTTSService(WordTTSService):
|
||||
super().__init__(
|
||||
aggregate_sentences=True,
|
||||
push_text_frames=False,
|
||||
sample_rate=params.sample_rate,
|
||||
sample_rate=sample_rate,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
@@ -108,9 +111,9 @@ class CartesiaTTSService(WordTTSService):
|
||||
self._url = url
|
||||
self._settings = {
|
||||
"output_format": {
|
||||
"container": params.container,
|
||||
"encoding": params.encoding,
|
||||
"sample_rate": params.sample_rate,
|
||||
"container": container,
|
||||
"encoding": encoding,
|
||||
"sample_rate": sample_rate,
|
||||
},
|
||||
"language": self.language_to_service_language(params.language)
|
||||
if params.language
|
||||
@@ -225,14 +228,13 @@ class CartesiaTTSService(WordTTSService):
|
||||
if not msg or msg["context_id"] != self._context_id:
|
||||
continue
|
||||
if msg["type"] == "done":
|
||||
await self.push_frame(TTSStoppedFrame())
|
||||
await self.stop_ttfb_metrics()
|
||||
# Unset _context_id but not the _context_id_start_timestamp
|
||||
# because we are likely still playing out audio and need the
|
||||
# timestamp to set send context frames.
|
||||
self._context_id = None
|
||||
await self.add_word_timestamps(
|
||||
[("TTSStoppedFrame", 0), ("LLMFullResponseEndFrame", 0)]
|
||||
)
|
||||
await self.add_word_timestamps([("LLMFullResponseEndFrame", 0), ("Reset", 0)])
|
||||
elif msg["type"] == "timestamps":
|
||||
await self.add_word_timestamps(
|
||||
list(zip(msg["word_timestamps"]["words"], msg["word_timestamps"]["start"]))
|
||||
@@ -258,6 +260,19 @@ class CartesiaTTSService(WordTTSService):
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
# If we received a TTSSpeakFrame and the LLM response included text (it
|
||||
# might be that it's only a function calling response) we pause
|
||||
# processing more frames until we receive a BotStoppedSpeakingFrame.
|
||||
if isinstance(frame, TTSSpeakFrame):
|
||||
await self.pause_processing_frames()
|
||||
elif isinstance(frame, LLMFullResponseEndFrame) and self._context_id:
|
||||
await self.pause_processing_frames()
|
||||
elif isinstance(frame, BotStoppedSpeakingFrame):
|
||||
await self.resume_processing_frames()
|
||||
|
||||
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
|
||||
logger.debug(f"Generating TTS: [{text}]")
|
||||
|
||||
@@ -288,9 +303,6 @@ class CartesiaTTSService(WordTTSService):
|
||||
|
||||
class CartesiaHttpTTSService(TTSService):
|
||||
class InputParams(BaseModel):
|
||||
encoding: Optional[str] = "pcm_s16le"
|
||||
sample_rate: Optional[int] = 16000
|
||||
container: Optional[str] = "raw"
|
||||
language: Optional[Language] = Language.EN
|
||||
speed: Optional[Union[str, float]] = ""
|
||||
emotion: Optional[List[str]] = []
|
||||
@@ -302,17 +314,20 @@ class CartesiaHttpTTSService(TTSService):
|
||||
voice_id: str,
|
||||
model: str = "sonic-english",
|
||||
base_url: str = "https://api.cartesia.ai",
|
||||
sample_rate: int = 24000,
|
||||
encoding: str = "pcm_s16le",
|
||||
container: str = "raw",
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
super().__init__(sample_rate=sample_rate, **kwargs)
|
||||
|
||||
self._api_key = api_key
|
||||
self._settings = {
|
||||
"output_format": {
|
||||
"container": params.container,
|
||||
"encoding": params.encoding,
|
||||
"sample_rate": params.sample_rate,
|
||||
"container": container,
|
||||
"encoding": encoding,
|
||||
"sample_rate": sample_rate,
|
||||
},
|
||||
"language": self.language_to_service_language(params.language)
|
||||
if params.language
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user