Compare commits
96 Commits
khk/load-j
...
khk/anthro
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0265c1d3ef | ||
|
|
ffa0e5a122 | ||
|
|
cdeab597b3 | ||
|
|
abd486025b | ||
|
|
c4cdb2d809 | ||
|
|
05ba10c969 | ||
|
|
2f80683dc4 | ||
|
|
151242d3a0 | ||
|
|
93c6e5098c | ||
|
|
84bd767312 | ||
|
|
802c29e9e1 | ||
|
|
f83381860c | ||
|
|
4dad1bfe49 | ||
|
|
9ee8896b64 | ||
|
|
5f7a2f66d4 | ||
|
|
76e5f1e847 | ||
|
|
6975340d6c | ||
|
|
0f4cf56418 | ||
|
|
018e51e8a3 | ||
|
|
b050143952 | ||
|
|
98ea1f0791 | ||
|
|
8272c35527 | ||
|
|
e973e82e05 | ||
|
|
d1396bf618 | ||
|
|
8186e423de | ||
|
|
3010addb8b | ||
|
|
029e0d391e | ||
|
|
bf31223577 | ||
|
|
42cc79154f | ||
|
|
05b857006a | ||
|
|
2e57d21b89 | ||
|
|
fa05ec46be | ||
|
|
e3ce619284 | ||
|
|
fb512dcd74 | ||
|
|
ca15d97383 | ||
|
|
b32448e967 | ||
|
|
7e30da6183 | ||
|
|
a6dd2600d2 | ||
|
|
b905b57dfc | ||
|
|
e1a7edfb58 | ||
|
|
1b30b1fc23 | ||
|
|
55026898f6 | ||
|
|
4283557894 | ||
|
|
5ab00e01aa | ||
|
|
fcfc729e83 | ||
|
|
4eacb34fd8 | ||
|
|
3a8aacccf7 | ||
|
|
54c0bf0c70 | ||
|
|
778b05a252 | ||
|
|
f16a416c2b | ||
|
|
1be63bccb8 | ||
|
|
37820ac0df | ||
|
|
8ea80d43f4 | ||
|
|
e117d70a00 | ||
|
|
2ba753272a | ||
|
|
60c8c2f6e9 | ||
|
|
cfb48200c2 | ||
|
|
6d317c6e8e | ||
|
|
158d52856f | ||
|
|
92a69e404f | ||
|
|
d24c6185d8 | ||
|
|
1fd21578a6 | ||
|
|
700db87127 | ||
|
|
6f1310569c | ||
|
|
14cedb0be8 | ||
|
|
fae97f9051 | ||
|
|
d930a46e64 | ||
|
|
2e6b5d1843 | ||
|
|
88362db034 | ||
|
|
f7f0c44c32 | ||
|
|
33553b71d4 | ||
|
|
be8ca505cd | ||
|
|
e957cce422 | ||
|
|
418a13a4ec | ||
|
|
fc445c0a1f | ||
|
|
f0c65468ed | ||
|
|
ce6a2bdcf7 | ||
|
|
673542e235 | ||
|
|
e032b0b70a | ||
|
|
e39f7e965b | ||
|
|
d26751e968 | ||
|
|
e0ca4a9c23 | ||
|
|
801e52c095 | ||
|
|
a46eaa838b | ||
|
|
7c432499db | ||
|
|
8d75fcc9f0 | ||
|
|
61d73f81ae | ||
|
|
951255def9 | ||
|
|
bf5a7c3562 | ||
|
|
e556f34094 | ||
|
|
ccc3691620 | ||
|
|
5321affda7 | ||
|
|
e5ad8dc67b | ||
|
|
46927805bc | ||
|
|
07712cdb16 | ||
|
|
b999b76f70 |
2
.github/workflows/format.yaml
vendored
2
.github/workflows/format.yaml
vendored
@@ -38,4 +38,4 @@ jobs:
|
||||
id: ruff
|
||||
run: |
|
||||
source .venv/bin/activate
|
||||
ruff format --config line-length=100 --diff --exclude "*_pb2.py"
|
||||
ruff format --diff
|
||||
|
||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -4,6 +4,7 @@ __pycache__/
|
||||
*~
|
||||
venv
|
||||
.venv
|
||||
/.idea
|
||||
#*#
|
||||
|
||||
# Distribution / packaging
|
||||
|
||||
67
CHANGELOG.md
67
CHANGELOG.md
@@ -9,14 +9,79 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Added
|
||||
|
||||
- Added `GatedOpenAILLMContextAggregator`. This aggregator keeps the last
|
||||
received OpenAI LLM context frame and it doesn't let it through until the
|
||||
notifier is notified.
|
||||
|
||||
- Added `WakeNotifierFilter`. This processor expects a list of frame types and
|
||||
will execute a given callback predicate when a frame of any of those type is
|
||||
being processed. If the callback returns true the notifier will be notified.
|
||||
|
||||
- Added `NullFilter`. A null filter doesn't push any frames upstream or
|
||||
downstream. This is usually used to disable one of the pipelines in
|
||||
`ParallelPipeline`.
|
||||
|
||||
- Added `EventNotifier`. This can be used as a very simple synchronization
|
||||
feature between processors.
|
||||
|
||||
- Added `TavusVideoService`. This is an integration for Tavus digital twins.
|
||||
(see https://www.tavus.io/)
|
||||
|
||||
- Added `DailyTransport.update_subscriptions()`. This allows you to have fine
|
||||
grained control of what media subscriptions you want for each participant in a
|
||||
room.
|
||||
|
||||
### Changed
|
||||
|
||||
- The following `DailyTransport` functions are now `async` which means they need
|
||||
to be awaited: `start_dialout`, `stop_dialout`, `start_recording`,
|
||||
`stop_recording`, `capture_participant_transcription` and
|
||||
`capture_participant_video`.
|
||||
|
||||
- Changed default output sample rate to 24000. This changes all TTS service to
|
||||
output to 24000 and also the default output transport sample rate. This
|
||||
improves audio quality at the cost of some extra bandwidth.
|
||||
|
||||
### Fixed
|
||||
|
||||
- Improved bot speaking detection for all TTS services by using actual bot
|
||||
audio.
|
||||
|
||||
- Fixed an issue that was generating constant bot started/stopped speaking
|
||||
frames for HTTP TTS services.
|
||||
|
||||
- Fixed an issue that was causing stuttering with AWS TTS service.
|
||||
|
||||
- Fixed an issue with PlayHTTTSService, where the TTFB metrics were reporting
|
||||
very small time values.
|
||||
|
||||
### Other
|
||||
|
||||
- Added a new foundational example 22-natural-conversation.py. This examples
|
||||
shows how to achieve a more natural conversation detecting when the user ends
|
||||
statement.
|
||||
|
||||
## [0.0.47] - 2024-10-22
|
||||
|
||||
### Added
|
||||
|
||||
- Added `AssemblyAISTTService` and corresponding foundational examples
|
||||
`07o-interruptible-assemblyai.py` and `13d-assemblyai-transcription.py`.
|
||||
|
||||
- Added a foundational example for Gladia transcription:
|
||||
`13c-gladia-transcription.py`
|
||||
|
||||
### Changed
|
||||
|
||||
- Updated `GladiaSTTService` to use the V2 API.
|
||||
|
||||
- Changed `DailyTransport` transcription model to `nova-2-general`.
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed an issue that would cause an import error when importing
|
||||
`SileroVADAnalyzer` from the old package `pipecat.vad.silero`.
|
||||
|
||||
- Fixed `enable_usage_metrics` to control LLM/TTS usage metrics separately
|
||||
from `enable_metrics`.
|
||||
|
||||
@@ -32,6 +97,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Changed
|
||||
|
||||
- Changed `DeepgramSTTService` model to `nova-2-general`.
|
||||
|
||||
- Moved `SileroVAD` audio processor to `processors.audio.vad`.
|
||||
|
||||
- Module `utils.audio` is now `audio.utils`. A new `resample_audio` function has
|
||||
|
||||
@@ -64,7 +64,7 @@ async def main():
|
||||
# Use Daily as a real-time media transport (WebRTC)
|
||||
transport = DailyTransport(
|
||||
room_url=...,
|
||||
token=...,
|
||||
token="", # leave empty. Note: token is _not_ your api key
|
||||
bot_name="Bot Name",
|
||||
params=DailyParams(audio_out_enabled=True))
|
||||
|
||||
@@ -178,7 +178,7 @@ You can use [use-package](https://github.com/jwiegley/use-package) to install [e
|
||||
:ensure t
|
||||
:hook ((python-mode . lazy-ruff-mode))
|
||||
:config
|
||||
(setq lazy-ruff-format-command "ruff format --config line-length=100")
|
||||
(setq lazy-ruff-format-command "ruff format")
|
||||
(setq lazy-ruff-only-format-block t)
|
||||
(setq lazy-ruff-only-format-region t)
|
||||
(setq lazy-ruff-only-format-buffer t))
|
||||
@@ -197,14 +197,13 @@ You can use [use-package](https://github.com/jwiegley/use-package) to install [e
|
||||
### Visual Studio Code
|
||||
|
||||
Install the
|
||||
[Ruff](https://marketplace.visualstudio.com/items?itemName=charliermarsh.ruff) extension. Then edit the user settings (_Ctrl-Shift-P_ `Open User Settings (JSON)`) and set it as the default Python formatter, enable formatting on save and configure `ruff` arguments:
|
||||
[Ruff](https://marketplace.visualstudio.com/items?itemName=charliermarsh.ruff) extension. Then edit the user settings (_Ctrl-Shift-P_ `Open User Settings (JSON)`) and set it as the default Python formatter, and enable formatting on save:
|
||||
|
||||
```json
|
||||
"[python]": {
|
||||
"editor.defaultFormatter": "charliermarsh.ruff",
|
||||
"editor.formatOnSave": true
|
||||
},
|
||||
"ruff.format.args": ["--config", "line-length=100"]
|
||||
}
|
||||
```
|
||||
|
||||
## Getting help
|
||||
|
||||
165
docs/CONTRIBUTING.md
Normal file
165
docs/CONTRIBUTING.md
Normal file
@@ -0,0 +1,165 @@
|
||||
## Contributing to Pipecat
|
||||
|
||||
We welcome contributions of all kinds! Your help is appreciated. Follow these steps to get involved:
|
||||
|
||||
1. **Fork this repository**: Start by forking the Pipecat Documentation repository to your GitHub account.
|
||||
|
||||
2. **Clone the repository**: Clone your forked repository to your local machine.
|
||||
```bash
|
||||
git clone https://github.com/your-username/pipecat
|
||||
```
|
||||
3. **Create a branch**: For your contribution, create a new branch.
|
||||
```bash
|
||||
git checkout -b your-branch-name
|
||||
```
|
||||
4. **Make your changes**: Edit or add files as necessary.
|
||||
5. **Test your changes**: Ensure that your changes look correct and follow the style set in the codebase.
|
||||
6. **Commit your changes**: Once you're satisfied with your changes, commit them with a meaningful message.
|
||||
|
||||
```bash
|
||||
git commit -m "Description of your changes"
|
||||
```
|
||||
|
||||
7. **Push your changes**: Push your branch to your forked repository.
|
||||
|
||||
```bash
|
||||
git push origin your-branch-name
|
||||
```
|
||||
|
||||
9. **Submit a Pull Request (PR)**: Open a PR from your forked repository to the main branch of this repo.
|
||||
> Important: Describe the changes you've made clearly!
|
||||
|
||||
Our maintainers will review your PR, and once everything is good, your contributions will be merged!
|
||||
|
||||
|
||||
# Contributor Covenant Code of Conduct
|
||||
|
||||
## Our Pledge
|
||||
|
||||
We as members, contributors, and leaders pledge to make participation in our
|
||||
community a harassment-free experience for everyone, regardless of age, body
|
||||
size, visible or invisible disability, ethnicity, sex characteristics, gender
|
||||
identity and expression, level of experience, education, socio-economic status,
|
||||
nationality, personal appearance, race, caste, color, religion, or sexual
|
||||
identity and orientation.
|
||||
|
||||
We pledge to act and interact in ways that contribute to an open, welcoming,
|
||||
diverse, inclusive, and healthy community.
|
||||
|
||||
## Our Standards
|
||||
|
||||
Examples of behavior that contributes to a positive environment for our
|
||||
community include:
|
||||
|
||||
* Demonstrating empathy and kindness toward other people
|
||||
* Being respectful of differing opinions, viewpoints, and experiences
|
||||
* Giving and gracefully accepting constructive feedback
|
||||
* Accepting responsibility and apologizing to those affected by our mistakes,
|
||||
and learning from the experience
|
||||
* Focusing on what is best not just for us as individuals, but for the overall
|
||||
community
|
||||
|
||||
Examples of unacceptable behavior include:
|
||||
|
||||
* The use of sexualized language or imagery, and sexual attention or advances of
|
||||
any kind
|
||||
* Trolling, insulting or derogatory comments, and personal or political attacks
|
||||
* Public or private harassment
|
||||
* Publishing others' private information, such as a physical or email address,
|
||||
without their explicit permission
|
||||
* Other conduct which could reasonably be considered inappropriate in a
|
||||
professional setting
|
||||
|
||||
## Enforcement Responsibilities
|
||||
|
||||
Community leaders are responsible for clarifying and enforcing our standards of
|
||||
acceptable behavior and will take appropriate and fair corrective action in
|
||||
response to any behavior that they deem inappropriate, threatening, offensive,
|
||||
or harmful.
|
||||
|
||||
Community leaders have the right and responsibility to remove, edit, or reject
|
||||
comments, commits, code, wiki edits, issues, and other contributions that are
|
||||
not aligned to this Code of Conduct, and will communicate reasons for moderation
|
||||
decisions when appropriate.
|
||||
|
||||
## Scope
|
||||
|
||||
This Code of Conduct applies within all community spaces, and also applies when
|
||||
an individual is officially representing the community in public spaces.
|
||||
Examples of representing our community include using an official email address,
|
||||
posting via an official social media account, or acting as an appointed
|
||||
representative at an online or offline event.
|
||||
|
||||
## Enforcement
|
||||
|
||||
Instances of abusive, harassing, or otherwise unacceptable behavior may be
|
||||
reported to the community leaders responsible for enforcement at pipecat-ai@daily.co.
|
||||
All complaints will be reviewed and investigated promptly and fairly.
|
||||
|
||||
All community leaders are obligated to respect the privacy and security of the
|
||||
reporter of any incident.
|
||||
|
||||
## Enforcement Guidelines
|
||||
|
||||
Community leaders will follow these Community Impact Guidelines in determining
|
||||
the consequences for any action they deem in violation of this Code of Conduct:
|
||||
|
||||
### 1. Correction
|
||||
|
||||
**Community Impact**: Use of inappropriate language or other behavior deemed
|
||||
unprofessional or unwelcome in the community.
|
||||
|
||||
**Consequence**: A private, written warning from community leaders, providing
|
||||
clarity around the nature of the violation and an explanation of why the
|
||||
behavior was inappropriate. A public apology may be requested.
|
||||
|
||||
### 2. Warning
|
||||
|
||||
**Community Impact**: A violation through a single incident or series of
|
||||
actions.
|
||||
|
||||
**Consequence**: A warning with consequences for continued behavior. No
|
||||
interaction with the people involved, including unsolicited interaction with
|
||||
those enforcing the Code of Conduct, for a specified period of time. This
|
||||
includes avoiding interactions in community spaces as well as external channels
|
||||
like social media. Violating these terms may lead to a temporary or permanent
|
||||
ban.
|
||||
|
||||
### 3. Temporary Ban
|
||||
|
||||
**Community Impact**: A serious violation of community standards, including
|
||||
sustained inappropriate behavior.
|
||||
|
||||
**Consequence**: A temporary ban from any sort of interaction or public
|
||||
communication with the community for a specified period of time. No public or
|
||||
private interaction with the people involved, including unsolicited interaction
|
||||
with those enforcing the Code of Conduct, is allowed during this period.
|
||||
Violating these terms may lead to a permanent ban.
|
||||
|
||||
### 4. Permanent Ban
|
||||
|
||||
**Community Impact**: Demonstrating a pattern of violation of community
|
||||
standards, including sustained inappropriate behavior, harassment of an
|
||||
individual, or aggression toward or disparagement of classes of individuals.
|
||||
|
||||
**Consequence**: A permanent ban from any sort of public interaction within the
|
||||
community.
|
||||
|
||||
## Attribution
|
||||
|
||||
This Code of Conduct is adapted from the [Contributor Covenant][homepage],
|
||||
version 2.1, available at
|
||||
[https://www.contributor-covenant.org/version/2/1/code_of_conduct.html][v2.1].
|
||||
|
||||
Community Impact Guidelines were inspired by
|
||||
[Mozilla's code of conduct enforcement ladder][Mozilla CoC].
|
||||
|
||||
For answers to common questions about this code of conduct, see the FAQ at
|
||||
[https://www.contributor-covenant.org/faq][FAQ]. Translations are available at
|
||||
[https://www.contributor-covenant.org/translations][translations].
|
||||
|
||||
[homepage]: https://www.contributor-covenant.org
|
||||
[v2.1]: https://www.contributor-covenant.org/version/2/1/code_of_conduct.html
|
||||
[Mozilla CoC]: https://github.com/mozilla/diversity
|
||||
[FAQ]: https://www.contributor-covenant.org/faq
|
||||
[translations]: https://www.contributor-covenant.org/translations
|
||||
22
docs/ISSUE_TEMPLATE.md
Normal file
22
docs/ISSUE_TEMPLATE.md
Normal file
@@ -0,0 +1,22 @@
|
||||
# Description
|
||||
Is this reporting a bug or feature request?
|
||||
|
||||
|
||||
If reporting a bug, please fill out the following:
|
||||
|
||||
### Environment
|
||||
- pipecat-ai version:
|
||||
- python version:
|
||||
- OS:
|
||||
|
||||
### Issue description
|
||||
Provide a clear description of the issue.
|
||||
|
||||
### Repro steps
|
||||
List the steps to reproduce the issue.
|
||||
|
||||
### Expected behavior
|
||||
|
||||
### Actual behavior
|
||||
|
||||
### Logs
|
||||
1
docs/PULL_REQUEST_TEMPLATE.md
Normal file
1
docs/PULL_REQUEST_TEMPLATE.md
Normal file
@@ -0,0 +1 @@
|
||||
#### Please describe the changes in your PR. If it is addressing an issue, please reference that as well.
|
||||
@@ -46,5 +46,10 @@ PLAY_HT_API_KEY=...
|
||||
# OpenAI
|
||||
OPENAI_API_KEY=...
|
||||
|
||||
#OpenPipe
|
||||
# OpenPipe
|
||||
OPENPIPE_API_KEY=...
|
||||
|
||||
# Tavus
|
||||
TAVUS_API_KEY=...
|
||||
TAVUS_REPLICA_ID=...
|
||||
TAVUS_PERSONA_ID=...
|
||||
@@ -1,12 +1,41 @@
|
||||
# Simple Chatbot
|
||||
# Chatbot with canonical-metrics
|
||||
|
||||
<img src="image.png" width="420px">
|
||||
This project implements a chatbot using a pipeline architecture that integrates audio processing, transcription, and a language model for conversational interactions. The chatbot operates within a daily communication environment, utilizing various services for text-to-speech and language model responses.
|
||||
|
||||
This app connects you to a chatbot powered by GPT-4, complete with animations generated by Stable Video Diffusion.
|
||||
## Features
|
||||
|
||||
See a video of it in action: https://x.com/kwindla/status/1778628911817183509
|
||||
- **Audio Input and Output**: Captures microphone input and plays back audio responses.
|
||||
- **Voice Activity Detection**: Utilizes Silero VAD to manage audio input intelligently.
|
||||
- **Text-to-Speech**: Integrates ElevenLabs TTS service to convert text responses into audio.
|
||||
- **Language Model Interaction**: Uses OpenAI's GPT-4 model to generate responses based on user input.
|
||||
- **Transcription Services**: Captures and transcribes participant speech for analytics.
|
||||
- **Metrics Collection**: Sends audio data for analysis via Canonical Metrics Service.
|
||||
|
||||
## Requirements
|
||||
|
||||
- Python 3.10+
|
||||
- `python-dotenv`
|
||||
- Additional libraries from the `pipecat` package.
|
||||
|
||||
## Setup
|
||||
|
||||
1. Clone the repository.
|
||||
2. Install the required packages.
|
||||
3. Set up environment variables for API keys:
|
||||
- `OPENAI_API_KEY`
|
||||
- `ELEVENLABS_API_KEY`
|
||||
- `CANONICAL_API_KEY`
|
||||
- `CANONICAL_API_URL`
|
||||
4. Run the script.
|
||||
|
||||
## Usage
|
||||
|
||||
The chatbot introduces itself and engages in conversations, providing brief and creative responses. Designed for flexibility, it can support multiple languages with appropriate configuration.
|
||||
|
||||
## Events
|
||||
|
||||
- Participants joining or leaving the call are handled dynamically, adjusting the chatbot's behavior accordingly.
|
||||
|
||||
And a quick video walkthrough of the code: https://www.loom.com/share/13df1967161f4d24ade054e7f8753416
|
||||
|
||||
ℹ️ The first time, things might take extra time to get started since VAD (Voice Activity Detection) model needs to be downloaded.
|
||||
|
||||
|
||||
@@ -124,7 +124,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@transport.event_handler("on_participant_left")
|
||||
|
||||
@@ -123,7 +123,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@transport.event_handler("on_participant_left")
|
||||
|
||||
@@ -75,7 +75,7 @@ async def main(room_url: str, token: str):
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@transport.event_handler("on_participant_left")
|
||||
|
||||
@@ -81,7 +81,7 @@ async def main(room_url: str, token: str, callId: str, callDomain: str):
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@transport.event_handler("on_participant_left")
|
||||
|
||||
@@ -84,7 +84,7 @@ async def main(room_url: str, token: str, callId: str, sipUri: str):
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@transport.event_handler("on_participant_left")
|
||||
|
||||
@@ -81,7 +81,7 @@ async def main():
|
||||
url=url,
|
||||
token=token,
|
||||
room_name=room_name,
|
||||
params=LiveKitParams(audio_out_enabled=True, audio_out_sample_rate=16000),
|
||||
params=LiveKitParams(audio_out_enabled=True),
|
||||
)
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
|
||||
@@ -5,33 +5,31 @@
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import os
|
||||
import sys
|
||||
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from runner import configure
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import Frame, LLMMessagesFrame, MetricsFrame
|
||||
from pipecat.metrics.metrics import (
|
||||
TTFBMetricsData,
|
||||
ProcessingMetricsData,
|
||||
LLMUsageMetricsData,
|
||||
ProcessingMetricsData,
|
||||
TTFBMetricsData,
|
||||
TTSUsageMetricsData,
|
||||
)
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineTask
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
from runner import configure
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
@@ -105,11 +103,14 @@ async def main():
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(pipeline)
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
PipelineParams(enable_metrics=True, enable_usage_metrics=True),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@@ -127,7 +127,7 @@ async def main():
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
participant_name = participant.get("info", {}).get("userName", "")
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
await task.queue_frames([TextFrame(f"Hi there {participant_name}!")])
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
@@ -89,7 +89,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@@ -87,7 +87,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@@ -82,7 +82,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
|
||||
@@ -109,7 +109,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
lc.set_participant_id(participant["id"])
|
||||
# Kick off the conversation.
|
||||
# the `LLMMessagesFrame` will be picked up by the LangchainProcessor using
|
||||
|
||||
@@ -85,7 +85,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@@ -40,7 +40,6 @@ async def main():
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
audio_out_sample_rate=16000,
|
||||
transcription_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
@@ -89,7 +88,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@@ -41,7 +41,6 @@ async def main():
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
audio_out_sample_rate=16000,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
vad_audio_passthrough=True,
|
||||
@@ -90,7 +89,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@@ -74,7 +74,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@@ -86,7 +86,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@@ -81,7 +81,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@@ -5,12 +5,16 @@
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import os
|
||||
import sys
|
||||
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from runner import configure
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import LLMMessagesFrame
|
||||
from pipecat.frames.frames import EndFrame, LLMMessagesFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
@@ -20,12 +24,6 @@ from pipecat.services.gladia import GladiaSTTService
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
from runner import configure
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
@@ -85,11 +83,16 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
# Register an event handler to exit the application when the user leaves.
|
||||
@transport.event_handler("on_participant_left")
|
||||
async def on_participant_left(transport, participant, reason):
|
||||
await task.queue_frame(EndFrame())
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
@@ -77,7 +77,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@@ -96,7 +96,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
|
||||
@@ -40,7 +40,6 @@ async def main():
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
audio_out_sample_rate=16000,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
vad_audio_passthrough=True,
|
||||
@@ -85,7 +84,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@@ -82,7 +82,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@@ -83,7 +83,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@@ -63,6 +63,7 @@ async def main():
|
||||
"Test",
|
||||
DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_in_sample_rate=24000,
|
||||
audio_out_enabled=True,
|
||||
camera_out_enabled=True,
|
||||
camera_out_is_live=True,
|
||||
@@ -73,7 +74,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_video(participant["id"])
|
||||
await transport.capture_participant_video(participant["id"])
|
||||
|
||||
pipeline = Pipeline([transport.input(), MirrorProcessor(), transport.output()])
|
||||
|
||||
|
||||
@@ -65,7 +65,7 @@ async def main():
|
||||
tk_root.title("Local Mirror")
|
||||
|
||||
daily_transport = DailyTransport(
|
||||
room_url, token, "Test", DailyParams(audio_in_enabled=True)
|
||||
room_url, token, "Test", DailyParams(audio_in_enabled=True, audio_in_sample_rate=24000)
|
||||
)
|
||||
|
||||
tk_transport = TkLocalTransport(
|
||||
@@ -81,7 +81,7 @@ async def main():
|
||||
|
||||
@daily_transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_video(participant["id"])
|
||||
await transport.capture_participant_video(participant["id"])
|
||||
|
||||
pipeline = Pipeline([daily_transport.input(), MirrorProcessor(), tk_transport.output()])
|
||||
|
||||
|
||||
@@ -82,7 +82,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
await tts.say("Hi! If you want to talk to me, just say 'Hey Robot'.")
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
@@ -134,7 +134,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
await tts.say("Hi, I'm listening!")
|
||||
await transport.send_audio(sounds["ding1.wav"])
|
||||
|
||||
|
||||
@@ -84,8 +84,8 @@ async def main():
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
await tts.say("Hi there! Feel free to ask me what I see.")
|
||||
transport.capture_participant_video(participant["id"], framerate=0)
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_video(participant["id"], framerate=0)
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
image_requester.set_participant_id(participant["id"])
|
||||
|
||||
pipeline = Pipeline(
|
||||
|
||||
@@ -86,8 +86,8 @@ async def main():
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
await tts.say("Hi there! Feel free to ask me what I see.")
|
||||
transport.capture_participant_video(participant["id"], framerate=0)
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_video(participant["id"], framerate=0)
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
image_requester.set_participant_id(participant["id"])
|
||||
|
||||
pipeline = Pipeline(
|
||||
|
||||
@@ -83,8 +83,8 @@ async def main():
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
await tts.say("Hi there! Feel free to ask me what I see.")
|
||||
transport.capture_participant_video(participant["id"], framerate=0)
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_video(participant["id"], framerate=0)
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
image_requester.set_participant_id(participant["id"])
|
||||
|
||||
pipeline = Pipeline(
|
||||
|
||||
@@ -78,16 +78,13 @@ async def main():
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
|
||||
params=CartesiaTTSService.InputParams(
|
||||
sample_rate=16000,
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
await tts.say("Hi there! Feel free to ask me what I see.")
|
||||
transport.capture_participant_video(participant["id"], framerate=0)
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_video(participant["id"], framerate=0)
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
image_requester.set_participant_id(participant["id"])
|
||||
|
||||
pipeline = Pipeline(
|
||||
|
||||
@@ -127,7 +127,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
|
||||
@@ -105,7 +105,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
|
||||
@@ -160,8 +160,8 @@ If you need to use a tool, simply use the tool. Do not tell the user the tool yo
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
global video_participant_id
|
||||
video_participant_id = participant["id"]
|
||||
transport.capture_participant_transcription(video_participant_id)
|
||||
transport.capture_participant_video(video_participant_id, framerate=0)
|
||||
await transport.capture_participant_transcription(video_participant_id)
|
||||
await transport.capture_participant_video(video_participant_id, framerate=0)
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
|
||||
@@ -123,7 +123,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
# await tts.say("Hi! Ask me about the weather in San Francisco.")
|
||||
|
||||
|
||||
@@ -153,8 +153,8 @@ indicate you should use the get_image tool are:
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
global video_participant_id
|
||||
video_participant_id = participant["id"]
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
transport.capture_participant_video(video_participant_id, framerate=0)
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_video(video_participant_id, framerate=0)
|
||||
# Kick off the conversation.
|
||||
await tts.say("Hi! Ask me about the weather in San Francisco.")
|
||||
|
||||
|
||||
173
examples/foundational/14e-function-calling-gemini.py
Normal file
173
examples/foundational/14e-function-calling-gemini.py
Normal file
@@ -0,0 +1,173 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import os
|
||||
import sys
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.services.google import GoogleLLMService
|
||||
from pipecat.services.openai import OpenAILLMContext
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
from runner import configure
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
video_participant_id = None
|
||||
|
||||
|
||||
async def get_weather(function_name, tool_call_id, arguments, llm, context, result_callback):
|
||||
location = arguments["location"]
|
||||
await result_callback(f"The weather in {location} is currently 72 degrees and sunny.")
|
||||
|
||||
|
||||
async def get_image(function_name, tool_call_id, arguments, llm, context, result_callback):
|
||||
logger.debug(f"!!! IN get_image {video_participant_id}, {arguments}")
|
||||
question = arguments["question"]
|
||||
await llm.request_image_frame(user_id=video_participant_id, text_content=question)
|
||||
|
||||
|
||||
async def main():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
(room_url, token) = await configure(session)
|
||||
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
token,
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
transcription_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
)
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
|
||||
)
|
||||
|
||||
llm = GoogleLLMService(model="gemini-1.5-flash-latest", api_key=os.getenv("GOOGLE_API_KEY"))
|
||||
llm.register_function("get_weather", get_weather)
|
||||
llm.register_function("get_image", get_image)
|
||||
|
||||
tools = [
|
||||
{
|
||||
"function_declarations": [
|
||||
{
|
||||
"name": "get_weather",
|
||||
"description": "Get the current weather",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"location": {
|
||||
"type": "string",
|
||||
"description": "The city and state, e.g. San Francisco, CA",
|
||||
},
|
||||
"format": {
|
||||
"type": "string",
|
||||
"enum": ["celsius", "fahrenheit"],
|
||||
"description": "The temperature unit to use. Infer this from the users location.",
|
||||
},
|
||||
},
|
||||
"required": ["location", "format"],
|
||||
},
|
||||
},
|
||||
{
|
||||
"name": "get_image",
|
||||
"description": "Get and image from the camera or video stream.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"question": {
|
||||
"type": "string",
|
||||
"description": "The question to to use when running inference on the acquired image.",
|
||||
},
|
||||
},
|
||||
"required": ["question"],
|
||||
},
|
||||
},
|
||||
]
|
||||
}
|
||||
]
|
||||
|
||||
system_prompt = """\
|
||||
You are a helpful assistant who converses with a user and answers questions. Respond concisely to general questions.
|
||||
|
||||
Your response will be turned into speech so use only simple words and punctuation.
|
||||
|
||||
You have access to two tools: get_weather and get_image.
|
||||
|
||||
You can respond to questions about the weather using the get_weather tool.
|
||||
|
||||
You can answer questions about the user's video stream using the get_image tool. Some examples of phrases that \
|
||||
indicate you should use the get_image tool are:
|
||||
- What do you see?
|
||||
- What's in the video?
|
||||
- Can you describe the video?
|
||||
- Tell me about what you see.
|
||||
- Tell me something interesting about what you see.
|
||||
- What's happening in the video?
|
||||
"""
|
||||
messages = [
|
||||
{"role": "system", "content": system_prompt},
|
||||
{"role": "user", "content": "Say hello."},
|
||||
]
|
||||
|
||||
context = OpenAILLMContext(messages, tools)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(),
|
||||
context_aggregator.user(),
|
||||
llm,
|
||||
tts,
|
||||
transport.output(),
|
||||
context_aggregator.assistant(),
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
report_only_initial_ttfb=True,
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
global video_participant_id
|
||||
video_participant_id = participant["id"]
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_video(video_participant_id, framerate=0)
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
@@ -141,7 +141,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append(
|
||||
{
|
||||
|
||||
@@ -10,7 +10,7 @@ import os
|
||||
import sys
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import LLMMessagesFrame
|
||||
from pipecat.frames.frames import LLMMessagesFrame, TTSUpdateSettingsFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
@@ -19,7 +19,6 @@ from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.filters.function_filter import FunctionFilter
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.services.whisper import Model, WhisperSTTService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
from openai.types.chat import ChatCompletionToolParam
|
||||
@@ -61,16 +60,14 @@ async def main():
|
||||
token,
|
||||
"Pipecat",
|
||||
DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
transcription_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
vad_audio_passthrough=True,
|
||||
),
|
||||
)
|
||||
|
||||
stt = WhisperSTTService(model=Model.LARGE)
|
||||
|
||||
english_tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
|
||||
@@ -116,7 +113,6 @@ async def main():
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
stt, # STT
|
||||
context_aggregator.user(), # User responses
|
||||
llm, # LLM
|
||||
ParallelPipeline( # TTS (bot will speak the chosen language)
|
||||
@@ -132,7 +128,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append(
|
||||
{
|
||||
|
||||
@@ -92,7 +92,7 @@ async def main():
|
||||
# bot can "hear" and respond to them.
|
||||
@transport.event_handler("on_participant_joined")
|
||||
async def on_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
|
||||
# When the first participant joins, the bot should introduce itself.
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
|
||||
@@ -99,7 +99,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@@ -166,7 +166,7 @@ Remember, your responses should be short. Just one or two sentences, usually."""
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
|
||||
@@ -223,7 +223,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
|
||||
@@ -249,7 +249,7 @@ Remember, your responses should be short. Just one or two sentences, usually."""
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
|
||||
@@ -219,7 +219,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
|
||||
290
examples/foundational/20d-persistent-context-gemini.py
Normal file
290
examples/foundational/20d-persistent-context-gemini.py
Normal file
@@ -0,0 +1,290 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import glob
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from datetime import datetime
|
||||
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from runner import configure
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.audio.vad.vad_analyzer import VADParams
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import (
|
||||
OpenAILLMContext,
|
||||
)
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.services.google import GoogleLLMService
|
||||
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
video_participant_id = None
|
||||
|
||||
|
||||
BASE_FILENAME = "/tmp/pipecat_conversation_"
|
||||
tts = None
|
||||
|
||||
|
||||
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
|
||||
temperature = 75 if args["format"] == "fahrenheit" else 24
|
||||
await result_callback(
|
||||
{
|
||||
"conditions": "nice",
|
||||
"temperature": temperature,
|
||||
"format": args["format"],
|
||||
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
async def get_image(function_name, tool_call_id, arguments, llm, context, result_callback):
|
||||
question = arguments["question"]
|
||||
await llm.request_image_frame(user_id=video_participant_id, text_content=question)
|
||||
|
||||
|
||||
async def get_saved_conversation_filenames(
|
||||
function_name, tool_call_id, args, llm, context, result_callback
|
||||
):
|
||||
# Construct the full pattern including the BASE_FILENAME
|
||||
full_pattern = f"{BASE_FILENAME}*.json"
|
||||
|
||||
# Use glob to find all matching files
|
||||
matching_files = glob.glob(full_pattern)
|
||||
logger.debug(f"matching files: {matching_files}")
|
||||
|
||||
await result_callback({"filenames": matching_files})
|
||||
|
||||
|
||||
async def save_conversation(function_name, tool_call_id, args, llm, context, result_callback):
|
||||
timestamp = datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
|
||||
filename = f"{BASE_FILENAME}{timestamp}.json"
|
||||
logger.debug(
|
||||
f"writing conversation to {filename}\n{json.dumps(context.get_messages_for_logging(), indent=4)}"
|
||||
)
|
||||
try:
|
||||
with open(filename, "w") as file:
|
||||
# todo: extract 'system' into the first message in the list
|
||||
messages = context.get_messages_for_persistent_storage()
|
||||
# remove the last message (the instruction to save the context)
|
||||
messages.pop()
|
||||
json.dump(messages, file, indent=2)
|
||||
await result_callback({"success": True})
|
||||
except Exception as e:
|
||||
logger.debug(f"error saving conversation: {e}")
|
||||
await result_callback({"success": False, "error": str(e)})
|
||||
|
||||
|
||||
async def load_conversation(function_name, tool_call_id, args, llm, context, result_callback):
|
||||
global tts
|
||||
filename = args["filename"]
|
||||
logger.debug(f"loading conversation from {filename}")
|
||||
try:
|
||||
with open(filename, "r") as file:
|
||||
context.set_messages(json.load(file))
|
||||
await result_callback(
|
||||
{
|
||||
"success": True,
|
||||
"message": "The most recent conversation has been loaded. Awaiting further instructions.",
|
||||
}
|
||||
)
|
||||
except Exception as e:
|
||||
await result_callback({"success": False, "error": str(e)})
|
||||
|
||||
|
||||
# Test message munging ...
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": """You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your
|
||||
capabilities in a succinct way. Your output will be converted to audio so don't include special
|
||||
characters in your answers. Respond to what the user said in a creative and helpful way.
|
||||
|
||||
You have several tools you can use to help you.
|
||||
|
||||
You can respond to questions about the weather using the get_weather tool.
|
||||
|
||||
You can save the current conversation using the save_conversation tool. This tool allows you to save
|
||||
the current conversation to external storage. If the user asks you to save the conversation, use this
|
||||
save_conversation too.
|
||||
|
||||
You can load a saved conversation using the load_conversation tool. This tool allows you to load a
|
||||
conversation from external storage. You can get a list of conversations that have been saved using the
|
||||
get_saved_conversation_filenames tool.
|
||||
|
||||
You can answer questions about the user's video stream using the get_image tool. Some examples of phrases that \
|
||||
indicate you should use the get_image tool are:
|
||||
- What do you see?
|
||||
- What's in the video?
|
||||
- Can you describe the video?
|
||||
- Tell me about what you see.
|
||||
- Tell me something interesting about what you see.
|
||||
- What's happening in the video?
|
||||
""",
|
||||
},
|
||||
# {"role": "user", "content": ""},
|
||||
# {"role": "assistant", "content": []},
|
||||
# {"role": "user", "content": "Tell me"},
|
||||
# {"role": "user", "content": "a joke"},
|
||||
]
|
||||
tools = [
|
||||
{
|
||||
"function_declarations": [
|
||||
{
|
||||
"name": "get_current_weather",
|
||||
"description": "Get the current weather",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"location": {
|
||||
"type": "string",
|
||||
"description": "The city and state, e.g. San Francisco, CA",
|
||||
},
|
||||
"format": {
|
||||
"type": "string",
|
||||
"enum": ["celsius", "fahrenheit"],
|
||||
"description": "The temperature unit to use. Infer this from the users location.",
|
||||
},
|
||||
},
|
||||
"required": ["location", "format"],
|
||||
},
|
||||
},
|
||||
{
|
||||
"name": "save_conversation",
|
||||
"description": "Save the current conversation. Use this function to persist the current conversation to external storage.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"user_request_text": {
|
||||
"type": "string",
|
||||
"description": "The text of the user's request to save the conversation.",
|
||||
}
|
||||
},
|
||||
"required": ["user_request_text"],
|
||||
},
|
||||
},
|
||||
{
|
||||
"name": "get_saved_conversation_filenames",
|
||||
"description": "Get a list of saved conversation histories. Returns a list of filenames. Each filename includes a date and timestamp. Each file is conversation history that can be loaded into this session.",
|
||||
"parameters": None,
|
||||
},
|
||||
{
|
||||
"name": "load_conversation",
|
||||
"description": "Load a conversation history. Use this function to load a conversation history into the current session.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"filename": {
|
||||
"type": "string",
|
||||
"description": "The filename of the conversation history to load.",
|
||||
}
|
||||
},
|
||||
"required": ["filename"],
|
||||
},
|
||||
},
|
||||
{
|
||||
"name": "get_image",
|
||||
"description": "Get and image from the camera or video stream.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"question": {
|
||||
"type": "string",
|
||||
"description": "The question to to use when running inference on the acquired image.",
|
||||
},
|
||||
},
|
||||
"required": ["question"],
|
||||
},
|
||||
},
|
||||
]
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
async def main():
|
||||
global tts
|
||||
async with aiohttp.ClientSession() as session:
|
||||
(room_url, token) = await configure(session)
|
||||
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
token,
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
transcription_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.8)),
|
||||
),
|
||||
)
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
|
||||
)
|
||||
|
||||
llm = GoogleLLMService(model="gemini-1.5-flash-latest", api_key=os.getenv("GOOGLE_API_KEY"))
|
||||
|
||||
# you can either register a single function for all function calls, or specific functions
|
||||
# llm.register_function(None, fetch_weather_from_api)
|
||||
llm.register_function("get_current_weather", fetch_weather_from_api)
|
||||
llm.register_function("save_conversation", save_conversation)
|
||||
llm.register_function("get_saved_conversation_filenames", get_saved_conversation_filenames)
|
||||
llm.register_function("load_conversation", load_conversation)
|
||||
llm.register_function("get_image", get_image)
|
||||
|
||||
context = OpenAILLMContext(messages, tools)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
context_aggregator.user(),
|
||||
llm, # LLM
|
||||
tts,
|
||||
context_aggregator.assistant(),
|
||||
transport.output(), # Transport bot output
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
# report_only_initial_ttfb=True,
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
global video_participant_id
|
||||
video_participant_id = participant["id"]
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_video(video_participant_id, framerate=0)
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
133
examples/foundational/21-tavus-layer.py
Normal file
133
examples/foundational/21-tavus-layer.py
Normal file
@@ -0,0 +1,133 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import os
|
||||
import sys
|
||||
|
||||
from typing import Any, Mapping
|
||||
|
||||
from pipecat.frames.frames import LLMMessagesFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_response import (
|
||||
LLMAssistantResponseAggregator,
|
||||
LLMUserResponseAggregator,
|
||||
)
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.services.deepgram import DeepgramSTTService
|
||||
from pipecat.services.tavus import TavusVideoService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def main():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
tavus = TavusVideoService(
|
||||
api_key=os.getenv("TAVUS_API_KEY"),
|
||||
replica_id=os.getenv("TAVUS_REPLICA_ID"),
|
||||
persona_id=os.getenv("TAVUS_PERSONA_ID", "pipecat0"),
|
||||
session=session,
|
||||
)
|
||||
|
||||
# get persona, look up persona_name, set this as the bot name to ignore
|
||||
persona_name = await tavus.get_persona_name()
|
||||
room_url = await tavus.initialize()
|
||||
|
||||
transport = DailyTransport(
|
||||
room_url=room_url,
|
||||
token=None,
|
||||
bot_name="Pipecat bot",
|
||||
params=DailyParams(
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
vad_audio_passthrough=True,
|
||||
),
|
||||
)
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="a167e0f3-df7e-4d52-a9c3-f949145efdab",
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(model="gpt-4o-mini")
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
]
|
||||
|
||||
tma_in = LLMUserResponseAggregator(messages)
|
||||
tma_out = LLMAssistantResponseAggregator(messages)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
stt, # STT
|
||||
tma_in, # User responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
tavus, # Tavus output layer
|
||||
transport.output(), # Transport bot output
|
||||
tma_out, # Assistant spoken responses
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
report_only_initial_ttfb=True,
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_participant_joined")
|
||||
async def on_participant_joined(
|
||||
transport: DailyTransport, participant: Mapping[str, Any]
|
||||
) -> None:
|
||||
# Ignore the Tavus replica's microphone
|
||||
if participant.get("info", {}).get("userName", "") == persona_name:
|
||||
logger.debug(f"Ignoring {participant['id']}'s microphone")
|
||||
await transport.update_subscriptions(
|
||||
participant_settings={
|
||||
participant["id"]: {
|
||||
"media": {"microphone": "unsubscribed"},
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
if participant.get("info", {}).get("userName", "") != persona_name:
|
||||
# Kick off the conversation.
|
||||
messages.append(
|
||||
{"role": "system", "content": "Please introduce yourself to the user."}
|
||||
)
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
168
examples/foundational/22-natural-conversation.py
Normal file
168
examples/foundational/22-natural-conversation.py
Normal file
@@ -0,0 +1,168 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import os
|
||||
import sys
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import LLMMessagesFrame, TextFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.gated_openai_llm_context import GatedOpenAILLMContextAggregator
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.filters.null_filter import NullFilter
|
||||
from pipecat.processors.filters.wake_notifier_filter import WakeNotifierFilter
|
||||
from pipecat.processors.user_idle_processor import UserIdleProcessor
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.services.deepgram import DeepgramSTTService
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.sync.event_notifier import EventNotifier
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
from runner import configure
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def main():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
(room_url, _) = await configure(session)
|
||||
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
None,
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
vad_audio_passthrough=True,
|
||||
),
|
||||
)
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
|
||||
)
|
||||
|
||||
# This is the LLM that will be used to detect if the user has finished a
|
||||
# statement. This doesn't really need to be an LLM, we could use NLP
|
||||
# libraries for that, but it was easier as an example because we
|
||||
# leverage the context aggregators.
|
||||
statement_llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
|
||||
|
||||
statement_messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "Determine if the user's statement is a complete sentence or question, ending in a natural pause or punctuation. Return 'YES' if it is complete and 'NO' if it seems to leave a thought unfinished.",
|
||||
},
|
||||
]
|
||||
|
||||
statement_context = OpenAILLMContext(statement_messages)
|
||||
statement_context_aggregator = statement_llm.create_context_aggregator(statement_context)
|
||||
|
||||
# This is the regular LLM.
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
]
|
||||
|
||||
context = OpenAILLMContext(messages)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
# We have instructed the LLM to return 'YES' if it thinks the user
|
||||
# completed a sentence. So, if it's 'YES' we will return true in this
|
||||
# predicate which will wake up the notifier.
|
||||
async def wake_check_filter(frame):
|
||||
return frame.text == "YES"
|
||||
|
||||
# This is a notifier that we use to synchronize the two LLMs.
|
||||
notifier = EventNotifier()
|
||||
|
||||
# This a filter that will wake up the notifier if the given predicate
|
||||
# (wake_check_filter) returns true.
|
||||
completness_check = WakeNotifierFilter(
|
||||
notifier, types=(TextFrame,), filter=wake_check_filter
|
||||
)
|
||||
|
||||
# This processor keeps the last context and will let it through once the
|
||||
# notifier is woken up.
|
||||
gated_context_aggregator = GatedOpenAILLMContextAggregator(notifier)
|
||||
|
||||
# Notify if the user hasn't said anything.
|
||||
async def user_idle_notifier(frame):
|
||||
await notifier.notify()
|
||||
|
||||
# Sometimes the LLM will fail detecting if a user has completed a
|
||||
# sentence, this will wake up the notifier if that happens.
|
||||
user_idle = UserIdleProcessor(callback=user_idle_notifier, timeout=3.0)
|
||||
|
||||
# The ParallePipeline input are the user transcripts. We have two
|
||||
# contexts. The first one will be used to determine if the user finished
|
||||
# a statement and if so the notifier will be woken up. The second
|
||||
# context is simply the regular context but it's gated waiting for the
|
||||
# notifier to be woken up.
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
stt,
|
||||
ParallelPipeline(
|
||||
[
|
||||
statement_context_aggregator.user(),
|
||||
statement_llm,
|
||||
completness_check,
|
||||
NullFilter(),
|
||||
],
|
||||
[context_aggregator.user(), gated_context_aggregator, llm],
|
||||
),
|
||||
user_idle,
|
||||
tts, # TTS
|
||||
transport.output(), # Transport bot output
|
||||
context_aggregator.assistant(), # Assistant spoken responses
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
report_only_initial_ttfb=True,
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
298
examples/foundational/99-anthropic-hackathon.py
Normal file
298
examples/foundational/99-anthropic-hackathon.py
Normal file
@@ -0,0 +1,298 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import base64
|
||||
import io
|
||||
import os
|
||||
import sys
|
||||
from collections import deque
|
||||
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from PIL import Image
|
||||
from runner import configure
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import (
|
||||
BotInterruptionFrame,
|
||||
Frame,
|
||||
ImageRawFrame,
|
||||
LLMFullResponseEndFrame,
|
||||
LLMMessagesFrame,
|
||||
TextFrame,
|
||||
TranscriptionFrame,
|
||||
)
|
||||
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import (
|
||||
OpenAILLMContext,
|
||||
OpenAILLMContextFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.processors.frameworks.rtvi import (
|
||||
RTVIBotTranscriptionProcessor,
|
||||
RTVIUserTranscriptionProcessor,
|
||||
)
|
||||
from pipecat.services.anthropic import AnthropicLLMContext, AnthropicLLMService
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
MAX_FRAMES = 5
|
||||
FRAMES_PER_SECOND = 0.2
|
||||
|
||||
|
||||
video_participant_id = None
|
||||
anthropic_context = None
|
||||
recent_image_frames = deque(maxlen=MAX_FRAMES)
|
||||
most_recent_image_summary = ""
|
||||
|
||||
|
||||
class ImageFrameCatcher(FrameProcessor):
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
global recent_image_frames
|
||||
|
||||
await super().process_frame(frame, direction)
|
||||
if isinstance(frame, ImageRawFrame):
|
||||
recent_image_frames.append(frame)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
class TranscriptFrameCatcher(FrameProcessor):
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
if isinstance(frame, TranscriptionFrame):
|
||||
logger.debug(
|
||||
f"TranscriptLogger: {frame}, num frames: {len(recent_image_frames)}, anthropic context: {anthropic_context}"
|
||||
)
|
||||
if anthropic_context:
|
||||
add_message_with_images(
|
||||
anthropic_context, frame.text, frames=list(recent_image_frames)
|
||||
)
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
class MessageFrameCatcher(FrameProcessor):
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
if isinstance(frame, OpenAILLMContextFrame):
|
||||
last_message = frame.context.messages[-1]
|
||||
|
||||
system_message = """
|
||||
Give me a concise summary of the images supplied.
|
||||
"""
|
||||
frame = LLMMessagesFrame(
|
||||
messages=[
|
||||
{
|
||||
"role": "system",
|
||||
"content": system_message,
|
||||
},
|
||||
last_message,
|
||||
],
|
||||
)
|
||||
await self.push_frame(frame, direction)
|
||||
return
|
||||
|
||||
|
||||
class MessageFrameCatcher2(FrameProcessor):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.text_blob = ""
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
global most_recent_image_summary
|
||||
await super().process_frame(frame, direction)
|
||||
if isinstance(frame, TextFrame):
|
||||
self.text_blob += f" {frame.text}"
|
||||
|
||||
if isinstance(frame, LLMFullResponseEndFrame):
|
||||
logger.debug(f"MessageFrameCatcher2: {self.text_blob}")
|
||||
most_recent_image_summary = self.text_blob
|
||||
self.text_blob = ""
|
||||
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
async def main():
|
||||
global llm
|
||||
global anthropic_context
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
(room_url, token) = await configure(session)
|
||||
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
token,
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
transcription_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
)
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
|
||||
)
|
||||
|
||||
llm = AnthropicLLMService(
|
||||
api_key=os.getenv("ANTHROPIC_API_KEY"),
|
||||
model="claude-3-5-sonnet-20240620",
|
||||
enable_prompt_caching_beta=True,
|
||||
)
|
||||
|
||||
vision_llm = AnthropicLLMService(
|
||||
api_key=os.getenv("ANTHROPIC_API_KEY"),
|
||||
model="claude-3-5-sonnet-20240620",
|
||||
enable_prompt_caching_beta=True,
|
||||
)
|
||||
|
||||
# todo: test with very short initial user message
|
||||
|
||||
system_prompt = """\
|
||||
You are a helpful assistant who converses with a user and answers questions. Respond concisely to general questions. Keep
|
||||
your answers brief unless explicitly asked for more information.
|
||||
|
||||
Your response will be turned into speech so use only simple words and punctuation.
|
||||
"""
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": [
|
||||
{
|
||||
"type": "text",
|
||||
"text": system_prompt,
|
||||
}
|
||||
],
|
||||
},
|
||||
{"role": "user", "content": "Start the conversation by saying 'hello'."},
|
||||
]
|
||||
|
||||
context = OpenAILLMContext(messages)
|
||||
anthropic_context = AnthropicLLMContext.upgrade_to_anthropic(context)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
rtvi_user_transcription = RTVIUserTranscriptionProcessor()
|
||||
rtvi_bot_transcription = RTVIBotTranscriptionProcessor()
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
ImageFrameCatcher(),
|
||||
TranscriptFrameCatcher(),
|
||||
rtvi_user_transcription,
|
||||
context_aggregator.user(), # User speech to text
|
||||
ParallelPipeline(
|
||||
[
|
||||
llm, # LLM
|
||||
rtvi_bot_transcription,
|
||||
tts, # TTS
|
||||
transport.output(), # Transport bot output
|
||||
context_aggregator.assistant(), # Assistant spoken responses and tool context
|
||||
],
|
||||
[MessageFrameCatcher(), vision_llm, MessageFrameCatcher2()],
|
||||
),
|
||||
],
|
||||
)
|
||||
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
global video_participant_id
|
||||
video_participant_id = participant["id"]
|
||||
await transport.capture_participant_transcription(video_participant_id)
|
||||
await transport.capture_participant_video(
|
||||
video_participant_id, framerate=FRAMES_PER_SECOND, video_source="screenVideo"
|
||||
)
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
@transport.event_handler("on_app_message")
|
||||
async def on_app_message(transport, message, sender):
|
||||
logger.debug(f"Received app message: {message} - {context}")
|
||||
|
||||
if not recent_image_frames:
|
||||
logger.debug("No image frames to send")
|
||||
return
|
||||
|
||||
add_message_with_images(
|
||||
anthropic_context, message["message"], frames=list(recent_image_frames)
|
||||
)
|
||||
|
||||
interrupt_message = "STOP"
|
||||
|
||||
if interrupt_message == message["message"]:
|
||||
logger.debug("Interrupting")
|
||||
await task.queue_frames([BotInterruptionFrame()])
|
||||
else:
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
runner = PipelineRunner()
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
def add_message_with_images(c, message, frames=None):
|
||||
if frames is None:
|
||||
frames = list(recent_image_frames)
|
||||
|
||||
if not frames:
|
||||
logger.debug("No image frames to send")
|
||||
return
|
||||
|
||||
# Create content list starting with all images
|
||||
content = []
|
||||
for frame in frames:
|
||||
buffer = io.BytesIO()
|
||||
Image.frombytes(frame.format, frame.size, frame.image).save(buffer, format="JPEG")
|
||||
encoded_image = base64.b64encode(buffer.getvalue()).decode("utf-8")
|
||||
|
||||
content.append(
|
||||
{
|
||||
"type": "image",
|
||||
"source": {
|
||||
"type": "base64",
|
||||
"media_type": "image/jpeg",
|
||||
"data": encoded_image,
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
# Add text message at the end if provided
|
||||
if message:
|
||||
content.append({"type": "text", "text": message})
|
||||
|
||||
# Go through all messages and replace user messages containing images
|
||||
if c.messages:
|
||||
for i, msg in enumerate(c.messages):
|
||||
if (
|
||||
msg["role"] == "user"
|
||||
and isinstance(msg["content"], list)
|
||||
and len(msg["content"]) > 0
|
||||
):
|
||||
if msg["content"][0].get("type") == "image":
|
||||
logger.debug(
|
||||
f"Replacing user message {i} containing images with summary: {most_recent_image_summary}"
|
||||
)
|
||||
c.messages[i] = {"role": "user", "content": most_recent_image_summary}
|
||||
|
||||
c.add_message({"role": "user", "content": content})
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
@@ -203,8 +203,8 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
transport.capture_participant_video(participant["id"], framerate=0)
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_video(participant["id"], framerate=0)
|
||||
ir.set_participant_id(participant["id"])
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
|
||||
@@ -352,7 +352,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
print(f"Context is: {context}")
|
||||
await task.queue_frames([OpenAILLMContextFrame(context)])
|
||||
|
||||
|
||||
@@ -17,6 +17,10 @@ from fastapi.responses import JSONResponse, RedirectResponse
|
||||
|
||||
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomParams
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
MAX_BOTS_PER_ROOM = 1
|
||||
|
||||
# Bot sub-process dict for status reporting and concurrency control
|
||||
|
||||
@@ -102,7 +102,7 @@ async def main(room_url, token=None):
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
logger.debug("Participant joined, storytime commence!")
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
await intro_task.queue_frames(
|
||||
[
|
||||
images["book1"],
|
||||
|
||||
@@ -165,7 +165,7 @@ Your task is to help the user understand and learn from this article in 2 senten
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
messages.append(
|
||||
{
|
||||
"role": "system",
|
||||
|
||||
@@ -121,7 +121,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
|
||||
@@ -21,14 +21,14 @@ classifiers = [
|
||||
]
|
||||
dependencies = [
|
||||
"aiohttp~=3.10.3",
|
||||
"loguru~=0.7.2",
|
||||
"Markdown~=3.7",
|
||||
"numpy~=1.26.4",
|
||||
"loguru~=0.7.2",
|
||||
"Pillow~=10.4.0",
|
||||
"protobuf~=4.25.4",
|
||||
"pydantic~=2.8.2",
|
||||
"pyloudnorm~=0.1.1",
|
||||
"scipy~=1.14.1",
|
||||
"resampy~=0.4.3",
|
||||
]
|
||||
|
||||
[project.urls]
|
||||
@@ -42,13 +42,13 @@ aws = [ "boto3~=1.35.27" ]
|
||||
azure = [ "azure-cognitiveservices-speech~=1.40.0" ]
|
||||
canonical = [ "aiofiles~=24.1.0" ]
|
||||
cartesia = [ "cartesia~=1.0.13", "websockets~=13.1" ]
|
||||
daily = [ "daily-python~=0.11.0" ]
|
||||
daily = [ "daily-python~=0.12.0" ]
|
||||
deepgram = [ "deepgram-sdk~=3.7.3" ]
|
||||
elevenlabs = [ "websockets~=13.1" ]
|
||||
examples = [ "python-dotenv~=1.0.1", "flask~=3.0.3", "flask_cors~=4.0.1" ]
|
||||
fal = [ "fal-client~=0.4.1" ]
|
||||
gladia = [ "websockets~=13.1" ]
|
||||
google = [ "google-generativeai~=0.7.2", "google-cloud-texttospeech~=2.17.2" ]
|
||||
google = [ "google-generativeai~=0.8.3", "google-cloud-texttospeech~=2.17.2" ]
|
||||
gstreamer = [ "pygobject~=3.48.2" ]
|
||||
fireworks = [ "openai~=1.37.2" ]
|
||||
langchain = [ "langchain~=0.2.14", "langchain-community~=0.2.12", "langchain-openai~=0.1.20" ]
|
||||
@@ -74,3 +74,7 @@ pythonpath = ["src"]
|
||||
[tool.setuptools_scm]
|
||||
local_scheme = "no-local-version"
|
||||
fallback_version = "0.0.0-dev"
|
||||
|
||||
[tool.ruff]
|
||||
exclude = ["*_pb2.py"]
|
||||
line-length = 100
|
||||
|
||||
@@ -7,13 +7,14 @@
|
||||
import audioop
|
||||
import numpy as np
|
||||
import pyloudnorm as pyln
|
||||
from scipy import signal
|
||||
import resampy
|
||||
|
||||
|
||||
def resample_audio(audio: bytes, original_rate: int, target_rate: int) -> bytes:
|
||||
if original_rate == target_rate:
|
||||
return audio
|
||||
audio_data = np.frombuffer(audio, dtype=np.int16)
|
||||
num_samples = int(len(audio) * target_rate / original_rate)
|
||||
resampled_audio = signal.resample(audio_data, num_samples)
|
||||
resampled_audio = resampy.resample(audio_data, original_rate, target_rate)
|
||||
return resampled_audio.astype(np.int16).tobytes()
|
||||
|
||||
|
||||
|
||||
@@ -52,7 +52,7 @@ class SileroOnnxModel:
|
||||
|
||||
if sr not in self.sample_rates:
|
||||
raise ValueError(
|
||||
f"Supported sampling rates: {self.sample_rates} (or multiply of 16000)"
|
||||
f"Supported sampling rates: {self.sample_rates} (or multiple of 16000)"
|
||||
)
|
||||
if sr / np.shape(x)[1] > 31.25:
|
||||
raise ValueError("Input audio chunk is too short")
|
||||
|
||||
@@ -12,6 +12,11 @@ from pydantic.main import BaseModel
|
||||
|
||||
from pipecat.audio.utils import calculate_audio_volume, exp_smoothing
|
||||
|
||||
VAD_CONFIDENCE = 0.7
|
||||
VAD_START_SECS = 0.2
|
||||
VAD_STOP_SECS = 0.8
|
||||
VAD_MIN_VOLUME = 0.6
|
||||
|
||||
|
||||
class VADState(Enum):
|
||||
QUIET = 1
|
||||
@@ -21,10 +26,10 @@ class VADState(Enum):
|
||||
|
||||
|
||||
class VADParams(BaseModel):
|
||||
confidence: float = 0.7
|
||||
start_secs: float = 0.2
|
||||
stop_secs: float = 0.8
|
||||
min_volume: float = 0.6
|
||||
confidence: float = VAD_CONFIDENCE
|
||||
start_secs: float = VAD_START_SECS
|
||||
stop_secs: float = VAD_STOP_SECS
|
||||
min_volume: float = VAD_MIN_VOLUME
|
||||
|
||||
|
||||
class VADAnalyzer:
|
||||
@@ -41,13 +46,17 @@ class VADAnalyzer:
|
||||
self._prev_volume = 0
|
||||
|
||||
@property
|
||||
def sample_rate(self):
|
||||
def sample_rate(self) -> int:
|
||||
return self._sample_rate
|
||||
|
||||
@property
|
||||
def num_channels(self):
|
||||
def num_channels(self) -> int:
|
||||
return self._num_channels
|
||||
|
||||
@property
|
||||
def params(self) -> VADParams:
|
||||
return self._params
|
||||
|
||||
@abstractmethod
|
||||
def num_frames_required(self) -> int:
|
||||
pass
|
||||
|
||||
@@ -0,0 +1,55 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
|
||||
from pipecat.frames.frames import CancelFrame, EndFrame, Frame, StartFrame
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.sync.base_notifier import BaseNotifier
|
||||
|
||||
|
||||
class GatedOpenAILLMContextAggregator(FrameProcessor):
|
||||
"""This aggregator keeps the last received OpenAI LLM context frame and it
|
||||
doesn't let it through until the notifier is notified.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, notifier: BaseNotifier, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._notifier = notifier
|
||||
self._last_context_frame = None
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, StartFrame):
|
||||
await self.push_frame(frame)
|
||||
await self._start()
|
||||
if isinstance(frame, (EndFrame, CancelFrame)):
|
||||
await self._stop()
|
||||
await self.push_frame(frame)
|
||||
elif isinstance(frame, OpenAILLMContextFrame):
|
||||
self._last_context_frame = frame
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
async def _start(self):
|
||||
self._gate_task = self.get_event_loop().create_task(self._gate_task_handler())
|
||||
|
||||
async def _stop(self):
|
||||
self._gate_task.cancel()
|
||||
await self._gate_task
|
||||
|
||||
async def _gate_task_handler(self):
|
||||
while True:
|
||||
try:
|
||||
await self._notifier.wait()
|
||||
if self._last_context_frame:
|
||||
await self.push_frame(self._last_context_frame)
|
||||
self._last_context_frame = None
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
@@ -70,6 +70,8 @@ class OpenAILLMContext:
|
||||
context.add_message(message)
|
||||
return context
|
||||
|
||||
# todo: deprecate from_image_frame. It's only used to create a single-use
|
||||
# context, which isn't useful for most real-world applications.
|
||||
@staticmethod
|
||||
def from_image_frame(frame: VisionImageRawFrame) -> "OpenAILLMContext":
|
||||
"""
|
||||
@@ -77,6 +79,10 @@ class OpenAILLMContext:
|
||||
expects images to be base64 encoded, but other vision models may not.
|
||||
So we'll store the image as bytes and do the base64 encoding as needed
|
||||
in the LLM service.
|
||||
|
||||
NOTE: the above only applies to the deprecated use of this method. The
|
||||
add_image_frame_message() below does the base64 encoding as expected
|
||||
in the OpenAI format.
|
||||
"""
|
||||
context = OpenAILLMContext()
|
||||
buffer = io.BytesIO()
|
||||
|
||||
@@ -4,14 +4,14 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
from typing import List
|
||||
from typing import Tuple, Type
|
||||
|
||||
from pipecat.frames.frames import AppFrame, ControlFrame, Frame, SystemFrame
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
|
||||
|
||||
class FrameFilter(FrameProcessor):
|
||||
def __init__(self, types: List[type]):
|
||||
def __init__(self, types: Tuple[Type[Frame]]):
|
||||
super().__init__()
|
||||
self._types = types
|
||||
|
||||
@@ -20,9 +20,8 @@ class FrameFilter(FrameProcessor):
|
||||
#
|
||||
|
||||
def _should_passthrough_frame(self, frame):
|
||||
for t in self._types:
|
||||
if isinstance(frame, t):
|
||||
return True
|
||||
if isinstance(frame, self._types):
|
||||
return True
|
||||
|
||||
return (
|
||||
isinstance(frame, AppFrame)
|
||||
|
||||
14
src/pipecat/processors/filters/null_filter.py
Normal file
14
src/pipecat/processors/filters/null_filter.py
Normal file
@@ -0,0 +1,14 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
from pipecat.processors.frame_processor import FrameProcessor
|
||||
|
||||
|
||||
class NullFilter(FrameProcessor):
|
||||
"""This filter doesn't allow passing any frames up or downstream."""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
40
src/pipecat/processors/filters/wake_notifier_filter.py
Normal file
40
src/pipecat/processors/filters/wake_notifier_filter.py
Normal file
@@ -0,0 +1,40 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
from typing import Awaitable, Callable, Tuple, Type
|
||||
|
||||
from pipecat.frames.frames import Frame
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.sync.base_notifier import BaseNotifier
|
||||
|
||||
|
||||
class WakeNotifierFilter(FrameProcessor):
|
||||
"""This processor expects a list of frame types and will execute a given
|
||||
callback predicate when a frame of any of those type is being processed. If
|
||||
the callback returns true the notifier will be notified.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
notifier: BaseNotifier,
|
||||
*,
|
||||
types: Tuple[Type[Frame]],
|
||||
filter: Callable[[Frame], Awaitable[bool]],
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
self._notifier = notifier
|
||||
self._types = types
|
||||
self._filter = filter
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, self._types) and await self._filter(frame):
|
||||
await self._notifier.notify()
|
||||
|
||||
await self.push_frame(frame, direction)
|
||||
@@ -205,7 +205,7 @@ class TTSService(AIService):
|
||||
# if push_stop_frames is True, wait for this idle period before pushing TTSStoppedFrame
|
||||
stop_frame_timeout_s: float = 1.0,
|
||||
# TTS output sample rate
|
||||
sample_rate: int = 16000,
|
||||
sample_rate: int = 24000,
|
||||
text_filter: Optional[BaseTextFilter] = None,
|
||||
**kwargs,
|
||||
):
|
||||
@@ -514,7 +514,7 @@ class SegmentedSTTService(STTService):
|
||||
min_volume: float = 0.6,
|
||||
max_silence_secs: float = 0.3,
|
||||
max_buffer_secs: float = 1.5,
|
||||
sample_rate: int = 16000,
|
||||
sample_rate: int = 24000,
|
||||
num_channels: int = 1,
|
||||
**kwargs,
|
||||
):
|
||||
|
||||
@@ -53,8 +53,6 @@ class AssemblyAISTTService(STTService):
|
||||
async def set_language(self, language: Language):
|
||||
logger.info(f"Switching STT language to: [{language}]")
|
||||
self._settings["language"] = language
|
||||
await self._disconnect()
|
||||
await self._connect()
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
|
||||
@@ -4,11 +4,14 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
|
||||
from typing import AsyncGenerator, Optional
|
||||
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel
|
||||
|
||||
from pipecat.audio.utils import resample_audio
|
||||
from pipecat.frames.frames import (
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
@@ -45,7 +48,7 @@ class AWSTTSService(TTSService):
|
||||
aws_access_key_id: str,
|
||||
region: str,
|
||||
voice_id: str = "Joanna",
|
||||
sample_rate: int = 16000,
|
||||
sample_rate: int = 24000,
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
):
|
||||
@@ -164,6 +167,14 @@ class AWSTTSService(TTSService):
|
||||
return ssml
|
||||
|
||||
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
|
||||
def read_audio_data(**args):
|
||||
response = self._polly_client.synthesize_speech(**args)
|
||||
if "AudioStream" in response:
|
||||
audio_data = response["AudioStream"].read()
|
||||
resampled = resample_audio(audio_data, 16000, self._settings["sample_rate"])
|
||||
return resampled
|
||||
return None
|
||||
|
||||
logger.debug(f"Generating TTS: [{text}]")
|
||||
|
||||
try:
|
||||
@@ -178,28 +189,31 @@ class AWSTTSService(TTSService):
|
||||
"OutputFormat": "pcm",
|
||||
"VoiceId": self._voice_id,
|
||||
"Engine": self._settings["engine"],
|
||||
"SampleRate": str(self._settings["sample_rate"]),
|
||||
# AWS only supports 8000 and 16000 for PCM. We select 16000.
|
||||
"SampleRate": "16000",
|
||||
}
|
||||
|
||||
# Filter out None values
|
||||
filtered_params = {k: v for k, v in params.items() if v is not None}
|
||||
|
||||
response = self._polly_client.synthesize_speech(**filtered_params)
|
||||
audio_data = await asyncio.to_thread(read_audio_data, **filtered_params)
|
||||
|
||||
if not audio_data:
|
||||
logger.error(f"{self} No audio data returned")
|
||||
yield None
|
||||
return
|
||||
|
||||
await self.start_tts_usage_metrics(text)
|
||||
|
||||
yield TTSStartedFrame()
|
||||
|
||||
if "AudioStream" in response:
|
||||
with response["AudioStream"] as stream:
|
||||
audio_data = stream.read()
|
||||
chunk_size = 8192
|
||||
for i in range(0, len(audio_data), chunk_size):
|
||||
chunk = audio_data[i : i + chunk_size]
|
||||
if len(chunk) > 0:
|
||||
await self.stop_ttfb_metrics()
|
||||
frame = TTSAudioRawFrame(chunk, self._settings["sample_rate"], 1)
|
||||
yield frame
|
||||
chunk_size = 8192
|
||||
for i in range(0, len(audio_data), chunk_size):
|
||||
chunk = audio_data[i : i + chunk_size]
|
||||
if len(chunk) > 0:
|
||||
await self.stop_ttfb_metrics()
|
||||
frame = TTSAudioRawFrame(chunk, self._settings["sample_rate"], 1)
|
||||
yield frame
|
||||
|
||||
yield TTSStoppedFrame()
|
||||
|
||||
|
||||
@@ -25,8 +25,14 @@ from pipecat.frames.frames import (
|
||||
TTSStoppedFrame,
|
||||
URLImageRawFrame,
|
||||
)
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.services.ai_services import ImageGenService, STTService, TTSService
|
||||
from pipecat.services.openai import BaseOpenAILLMService
|
||||
from pipecat.services.openai import (
|
||||
BaseOpenAILLMService,
|
||||
OpenAIAssistantContextAggregator,
|
||||
OpenAIContextAggregatorPair,
|
||||
OpenAIUserContextAggregator,
|
||||
)
|
||||
from pipecat.transcriptions.language import Language
|
||||
from pipecat.utils.time import time_now_iso8601
|
||||
|
||||
@@ -38,6 +44,7 @@ try:
|
||||
SpeechConfig,
|
||||
SpeechRecognizer,
|
||||
SpeechSynthesizer,
|
||||
SpeechSynthesisOutputFormat,
|
||||
)
|
||||
from azure.cognitiveservices.speech.audio import (
|
||||
AudioStreamFormat,
|
||||
@@ -70,6 +77,33 @@ class AzureLLMService(BaseOpenAILLMService):
|
||||
api_version=self._api_version,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def create_context_aggregator(
|
||||
context: OpenAILLMContext, *, assistant_expect_stripped_words: bool = True
|
||||
) -> OpenAIContextAggregatorPair:
|
||||
user = OpenAIUserContextAggregator(context)
|
||||
assistant = OpenAIAssistantContextAggregator(
|
||||
user, expect_stripped_words=assistant_expect_stripped_words
|
||||
)
|
||||
return OpenAIContextAggregatorPair(_user=user, _assistant=assistant)
|
||||
|
||||
|
||||
def sample_rate_to_output_format(sample_rate: int) -> SpeechSynthesisOutputFormat:
|
||||
match sample_rate:
|
||||
case 8000:
|
||||
return SpeechSynthesisOutputFormat.Raw8Khz16BitMonoPcm
|
||||
case 16000:
|
||||
return SpeechSynthesisOutputFormat.Raw16Khz16BitMonoPcm
|
||||
case 22050:
|
||||
return SpeechSynthesisOutputFormat.Raw22050Hz16BitMonoPcm
|
||||
case 24000:
|
||||
return SpeechSynthesisOutputFormat.Raw24Khz16BitMonoPcm
|
||||
case 44100:
|
||||
return SpeechSynthesisOutputFormat.Raw44100Hz16BitMonoPcm
|
||||
case 48000:
|
||||
return SpeechSynthesisOutputFormat.Raw48Khz16BitMonoPcm
|
||||
return SpeechSynthesisOutputFormat.Raw16Khz16BitMonoPcm
|
||||
|
||||
|
||||
class AzureTTSService(TTSService):
|
||||
class InputParams(BaseModel):
|
||||
@@ -88,13 +122,15 @@ class AzureTTSService(TTSService):
|
||||
api_key: str,
|
||||
region: str,
|
||||
voice="en-US-SaraNeural",
|
||||
sample_rate: int = 16000,
|
||||
sample_rate: int = 24000,
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(sample_rate=sample_rate, **kwargs)
|
||||
|
||||
speech_config = SpeechConfig(subscription=api_key, region=region)
|
||||
speech_config.set_speech_synthesis_output_format(sample_rate_to_output_format(sample_rate))
|
||||
|
||||
self._speech_synthesizer = SpeechSynthesizer(speech_config=speech_config, audio_config=None)
|
||||
|
||||
self._settings = {
|
||||
@@ -283,7 +319,7 @@ class AzureSTTService(STTService):
|
||||
api_key: str,
|
||||
region: str,
|
||||
language=Language.EN_US,
|
||||
sample_rate=16000,
|
||||
sample_rate=24000,
|
||||
channels=1,
|
||||
**kwargs,
|
||||
):
|
||||
|
||||
@@ -68,9 +68,6 @@ def language_to_cartesia_language(language: Language) -> str | None:
|
||||
|
||||
class CartesiaTTSService(WordTTSService):
|
||||
class InputParams(BaseModel):
|
||||
encoding: Optional[str] = "pcm_s16le"
|
||||
sample_rate: Optional[int] = 16000
|
||||
container: Optional[str] = "raw"
|
||||
language: Optional[Language] = Language.EN
|
||||
speed: Optional[Union[str, float]] = ""
|
||||
emotion: Optional[List[str]] = []
|
||||
@@ -83,6 +80,9 @@ class CartesiaTTSService(WordTTSService):
|
||||
cartesia_version: str = "2024-06-10",
|
||||
url: str = "wss://api.cartesia.ai/tts/websocket",
|
||||
model: str = "sonic-english",
|
||||
sample_rate: int = 24000,
|
||||
encoding: str = "pcm_s16le",
|
||||
container: str = "raw",
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
):
|
||||
@@ -99,7 +99,7 @@ class CartesiaTTSService(WordTTSService):
|
||||
super().__init__(
|
||||
aggregate_sentences=True,
|
||||
push_text_frames=False,
|
||||
sample_rate=params.sample_rate,
|
||||
sample_rate=sample_rate,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
@@ -108,9 +108,9 @@ class CartesiaTTSService(WordTTSService):
|
||||
self._url = url
|
||||
self._settings = {
|
||||
"output_format": {
|
||||
"container": params.container,
|
||||
"encoding": params.encoding,
|
||||
"sample_rate": params.sample_rate,
|
||||
"container": container,
|
||||
"encoding": encoding,
|
||||
"sample_rate": sample_rate,
|
||||
},
|
||||
"language": self.language_to_service_language(params.language)
|
||||
if params.language
|
||||
@@ -288,9 +288,6 @@ class CartesiaTTSService(WordTTSService):
|
||||
|
||||
class CartesiaHttpTTSService(TTSService):
|
||||
class InputParams(BaseModel):
|
||||
encoding: Optional[str] = "pcm_s16le"
|
||||
sample_rate: Optional[int] = 16000
|
||||
container: Optional[str] = "raw"
|
||||
language: Optional[Language] = Language.EN
|
||||
speed: Optional[Union[str, float]] = ""
|
||||
emotion: Optional[List[str]] = []
|
||||
@@ -302,17 +299,20 @@ class CartesiaHttpTTSService(TTSService):
|
||||
voice_id: str,
|
||||
model: str = "sonic-english",
|
||||
base_url: str = "https://api.cartesia.ai",
|
||||
sample_rate: int = 24000,
|
||||
encoding: str = "pcm_s16le",
|
||||
container: str = "raw",
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
super().__init__(sample_rate=sample_rate, **kwargs)
|
||||
|
||||
self._api_key = api_key
|
||||
self._settings = {
|
||||
"output_format": {
|
||||
"container": params.container,
|
||||
"encoding": params.encoding,
|
||||
"sample_rate": params.sample_rate,
|
||||
"container": container,
|
||||
"encoding": encoding,
|
||||
"sample_rate": sample_rate,
|
||||
},
|
||||
"language": self.language_to_service_language(params.language)
|
||||
if params.language
|
||||
|
||||
@@ -51,11 +51,11 @@ class DeepgramTTSService(TTSService):
|
||||
*,
|
||||
api_key: str,
|
||||
voice: str = "aura-helios-en",
|
||||
sample_rate: int = 16000,
|
||||
sample_rate: int = 24000,
|
||||
encoding: str = "linear16",
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
super().__init__(sample_rate=sample_rate, **kwargs)
|
||||
|
||||
self._settings = {
|
||||
"sample_rate": sample_rate,
|
||||
|
||||
@@ -36,6 +36,8 @@ except ModuleNotFoundError as e:
|
||||
)
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
ElevenLabsOutputFormat = Literal["pcm_16000", "pcm_22050", "pcm_24000", "pcm_44100"]
|
||||
|
||||
|
||||
def sample_rate_from_output_format(output_format: str) -> int:
|
||||
match output_format:
|
||||
@@ -74,7 +76,6 @@ def calculate_word_times(
|
||||
class ElevenLabsTTSService(WordTTSService):
|
||||
class InputParams(BaseModel):
|
||||
language: Optional[Language] = Language.EN
|
||||
output_format: Literal["pcm_16000", "pcm_22050", "pcm_24000", "pcm_44100"] = "pcm_16000"
|
||||
optimize_streaming_latency: Optional[str] = None
|
||||
stability: Optional[float] = None
|
||||
similarity_boost: Optional[float] = None
|
||||
@@ -98,6 +99,7 @@ class ElevenLabsTTSService(WordTTSService):
|
||||
voice_id: str,
|
||||
model: str = "eleven_turbo_v2_5",
|
||||
url: str = "wss://api.elevenlabs.io",
|
||||
output_format: ElevenLabsOutputFormat = "pcm_24000",
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
):
|
||||
@@ -120,18 +122,18 @@ class ElevenLabsTTSService(WordTTSService):
|
||||
push_text_frames=False,
|
||||
push_stop_frames=True,
|
||||
stop_frame_timeout_s=2.0,
|
||||
sample_rate=sample_rate_from_output_format(params.output_format),
|
||||
sample_rate=sample_rate_from_output_format(output_format),
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
self._api_key = api_key
|
||||
self._url = url
|
||||
self._settings = {
|
||||
"sample_rate": sample_rate_from_output_format(params.output_format),
|
||||
"sample_rate": sample_rate_from_output_format(output_format),
|
||||
"language": self.language_to_service_language(params.language)
|
||||
if params.language
|
||||
else Language.EN,
|
||||
"output_format": params.output_format,
|
||||
"output_format": output_format,
|
||||
"optimize_streaming_latency": params.optimize_streaming_latency,
|
||||
"stability": params.stability,
|
||||
"similarity_boost": params.similarity_boost,
|
||||
|
||||
@@ -22,7 +22,8 @@ class FireworksLLMService(BaseOpenAILLMService):
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
api_key: str,
|
||||
model: str = "accounts/fireworks/models/firefunction-v1",
|
||||
base_url: str = "https://api.fireworks.ai/inference/v1",
|
||||
):
|
||||
super().__init__(model=model, base_url=base_url)
|
||||
super().__init__(api_key=api_key, model=model, base_url=base_url)
|
||||
|
||||
@@ -8,6 +8,7 @@ import base64
|
||||
import json
|
||||
from typing import AsyncGenerator, Optional
|
||||
|
||||
import aiohttp
|
||||
from loguru import logger
|
||||
from pydantic.main import BaseModel
|
||||
|
||||
@@ -23,7 +24,6 @@ from pipecat.services.ai_services import STTService
|
||||
from pipecat.transcriptions.language import Language
|
||||
from pipecat.utils.time import time_now_iso8601
|
||||
|
||||
# See .env.example for Gladia configuration needed
|
||||
try:
|
||||
import websockets
|
||||
except ModuleNotFoundError as e:
|
||||
@@ -38,15 +38,16 @@ class GladiaSTTService(STTService):
|
||||
class InputParams(BaseModel):
|
||||
sample_rate: Optional[int] = 16000
|
||||
language: Optional[Language] = Language.EN
|
||||
transcription_hint: Optional[str] = None
|
||||
endpointing: Optional[int] = 200
|
||||
prosody: Optional[bool] = None
|
||||
endpointing: Optional[float] = 0.2
|
||||
maximum_duration_without_endpointing: Optional[int] = 10
|
||||
audio_enhancer: Optional[bool] = None
|
||||
words_accurate_timestamps: Optional[bool] = None
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
api_key: str,
|
||||
url: str = "wss://api.gladia.io/audio/text/audio-transcription",
|
||||
url: str = "https://api.gladia.io/v2/live",
|
||||
confidence: float = 0.5,
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
@@ -56,101 +57,82 @@ class GladiaSTTService(STTService):
|
||||
self._api_key = api_key
|
||||
self._url = url
|
||||
self._settings = {
|
||||
"encoding": "wav/pcm",
|
||||
"bit_depth": 16,
|
||||
"sample_rate": params.sample_rate,
|
||||
"language": self.language_to_service_language(params.language)
|
||||
if params.language
|
||||
else Language.EN,
|
||||
"transcription_hint": params.transcription_hint,
|
||||
"channels": 1,
|
||||
"language_config": {
|
||||
"languages": [self.language_to_service_language(params.language)]
|
||||
if params.language
|
||||
else [],
|
||||
"code_switching": False,
|
||||
},
|
||||
"endpointing": params.endpointing,
|
||||
"prosody": params.prosody,
|
||||
"maximum_duration_without_endpointing": params.maximum_duration_without_endpointing,
|
||||
"pre_processing": {
|
||||
"audio_enhancer": params.audio_enhancer,
|
||||
},
|
||||
"realtime_processing": {
|
||||
"words_accurate_timestamps": params.words_accurate_timestamps,
|
||||
},
|
||||
}
|
||||
self._confidence = confidence
|
||||
|
||||
def language_to_service_language(self, language: Language) -> str | None:
|
||||
match language:
|
||||
case Language.BG:
|
||||
return "bulgarian"
|
||||
case Language.CA:
|
||||
return "catalan"
|
||||
case Language.ZH:
|
||||
return "chinese"
|
||||
case Language.CS:
|
||||
return "czech"
|
||||
case Language.DA:
|
||||
return "danish"
|
||||
case Language.NL:
|
||||
return "dutch"
|
||||
case (
|
||||
Language.EN
|
||||
| Language.EN_US
|
||||
| Language.EN_AU
|
||||
| Language.EN_GB
|
||||
| Language.EN_NZ
|
||||
| Language.EN_IN
|
||||
):
|
||||
return "english"
|
||||
case Language.ET:
|
||||
return "estonian"
|
||||
case Language.FI:
|
||||
return "finnish"
|
||||
case Language.FR | Language.FR_CA:
|
||||
return "french"
|
||||
case Language.DE | Language.DE_CH:
|
||||
return "german"
|
||||
case Language.EL:
|
||||
return "greek"
|
||||
case Language.HI:
|
||||
return "hindi"
|
||||
case Language.HU:
|
||||
return "hungarian"
|
||||
case Language.ID:
|
||||
return "indonesian"
|
||||
case Language.IT:
|
||||
return "italian"
|
||||
case Language.JA:
|
||||
return "japanese"
|
||||
case Language.KO:
|
||||
return "korean"
|
||||
case Language.LV:
|
||||
return "latvian"
|
||||
case Language.LT:
|
||||
return "lithuanian"
|
||||
case Language.MS:
|
||||
return "malay"
|
||||
case Language.NO:
|
||||
return "norwegian"
|
||||
case Language.PL:
|
||||
return "polish"
|
||||
case Language.PT | Language.PT_BR:
|
||||
return "portuguese"
|
||||
case Language.RO:
|
||||
return "romanian"
|
||||
case Language.RU:
|
||||
return "russian"
|
||||
case Language.SK:
|
||||
return "slovak"
|
||||
case Language.ES:
|
||||
return "spanish"
|
||||
case Language.SV:
|
||||
return "slovenian"
|
||||
case Language.TH:
|
||||
return "thai"
|
||||
case Language.TR:
|
||||
return "turkish"
|
||||
case Language.UK:
|
||||
return "ukrainian"
|
||||
case Language.VI:
|
||||
return "vietnamese"
|
||||
return None
|
||||
language_map = {
|
||||
Language.BG: "bg",
|
||||
Language.CA: "ca",
|
||||
Language.ZH: "zh",
|
||||
Language.CS: "cs",
|
||||
Language.DA: "da",
|
||||
Language.NL: "nl",
|
||||
Language.EN: "en",
|
||||
Language.EN_US: "en",
|
||||
Language.EN_AU: "en",
|
||||
Language.EN_GB: "en",
|
||||
Language.EN_NZ: "en",
|
||||
Language.EN_IN: "en",
|
||||
Language.ET: "et",
|
||||
Language.FI: "fi",
|
||||
Language.FR: "fr",
|
||||
Language.FR_CA: "fr",
|
||||
Language.DE: "de",
|
||||
Language.DE_CH: "de",
|
||||
Language.EL: "el",
|
||||
Language.HI: "hi",
|
||||
Language.HU: "hu",
|
||||
Language.ID: "id",
|
||||
Language.IT: "it",
|
||||
Language.JA: "ja",
|
||||
Language.KO: "ko",
|
||||
Language.LV: "lv",
|
||||
Language.LT: "lt",
|
||||
Language.MS: "ms",
|
||||
Language.NO: "no",
|
||||
Language.PL: "pl",
|
||||
Language.PT: "pt",
|
||||
Language.PT_BR: "pt",
|
||||
Language.RO: "ro",
|
||||
Language.RU: "ru",
|
||||
Language.SK: "sk",
|
||||
Language.ES: "es",
|
||||
Language.SV: "sv",
|
||||
Language.TH: "th",
|
||||
Language.TR: "tr",
|
||||
Language.UK: "uk",
|
||||
Language.VI: "vi",
|
||||
}
|
||||
return language_map.get(language)
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
self._websocket = await websockets.connect(self._url)
|
||||
response = await self._setup_gladia()
|
||||
self._websocket = await websockets.connect(response["url"])
|
||||
self._receive_task = self.get_event_loop().create_task(self._receive_task_handler())
|
||||
await self._setup_gladia()
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
await super().stop(frame)
|
||||
await self._send_stop_recording()
|
||||
await self._websocket.close()
|
||||
|
||||
async def cancel(self, frame: CancelFrame):
|
||||
@@ -164,39 +146,37 @@ class GladiaSTTService(STTService):
|
||||
yield None
|
||||
|
||||
async def _setup_gladia(self):
|
||||
configuration = {
|
||||
"x_gladia_key": self._api_key,
|
||||
"encoding": "WAV/PCM",
|
||||
"model_type": "fast",
|
||||
"language_behaviour": "manual",
|
||||
"sample_rate": self._settings["sample_rate"],
|
||||
"language": self._settings["language"],
|
||||
"transcription_hint": self._settings["transcription_hint"],
|
||||
"endpointing": self._settings["endpointing"],
|
||||
"prosody": self._settings["prosody"],
|
||||
}
|
||||
|
||||
await self._websocket.send(json.dumps(configuration))
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(
|
||||
self._url,
|
||||
headers={"X-Gladia-Key": self._api_key, "Content-Type": "application/json"},
|
||||
json=self._settings,
|
||||
) as response:
|
||||
if response.ok:
|
||||
return await response.json()
|
||||
else:
|
||||
logger.error(
|
||||
f"Gladia error: {response.status}: {response.text or response.reason}"
|
||||
)
|
||||
raise Exception(f"Failed to initialize Gladia session: {response.status}")
|
||||
|
||||
async def _send_audio(self, audio: bytes):
|
||||
message = {"frames": base64.b64encode(audio).decode("utf-8")}
|
||||
data = base64.b64encode(audio).decode("utf-8")
|
||||
message = {"type": "audio_chunk", "data": {"chunk": data}}
|
||||
await self._websocket.send(json.dumps(message))
|
||||
|
||||
async def _send_stop_recording(self):
|
||||
await self._websocket.send(json.dumps({"type": "stop_recording"}))
|
||||
|
||||
async def _receive_task_handler(self):
|
||||
async for message in self._websocket:
|
||||
utterance = json.loads(message)
|
||||
if not utterance:
|
||||
continue
|
||||
|
||||
if "error" in utterance:
|
||||
message = utterance["message"]
|
||||
logger.error(f"Gladia error: {message}")
|
||||
elif "confidence" in utterance:
|
||||
type = utterance["type"]
|
||||
confidence = utterance["confidence"]
|
||||
transcript = utterance["transcription"]
|
||||
content = json.loads(message)
|
||||
if content["type"] == "transcript":
|
||||
utterance = content["data"]["utterance"]
|
||||
confidence = utterance.get("confidence", 0)
|
||||
transcript = utterance["text"]
|
||||
if confidence >= self._confidence:
|
||||
if type == "final":
|
||||
if content["data"]["is_final"]:
|
||||
await self.push_frame(
|
||||
TranscriptionFrame(transcript, "", time_now_iso8601())
|
||||
)
|
||||
|
||||
@@ -5,11 +5,15 @@
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import base64
|
||||
import io
|
||||
import json
|
||||
from typing import AsyncGenerator, List, Literal, Optional
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, AsyncGenerator, Dict, List, Literal, Optional
|
||||
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel
|
||||
from PIL import Image
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
ErrorFrame,
|
||||
@@ -24,18 +28,24 @@ from pipecat.frames.frames import (
|
||||
TTSStoppedFrame,
|
||||
VisionImageRawFrame,
|
||||
)
|
||||
from pipecat.metrics.metrics import LLMTokenUsage
|
||||
from pipecat.processors.aggregators.openai_llm_context import (
|
||||
OpenAILLMContext,
|
||||
OpenAILLMContextFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.ai_services import LLMService, TTSService
|
||||
from pipecat.services.openai import (
|
||||
OpenAIAssistantContextAggregator,
|
||||
OpenAIUserContextAggregator,
|
||||
)
|
||||
from pipecat.transcriptions.language import Language
|
||||
|
||||
try:
|
||||
import google.ai.generativelanguage as glm
|
||||
import google.generativeai as gai
|
||||
from google.cloud import texttospeech_v1
|
||||
from google.generativeai.types import GenerationConfig
|
||||
from google.oauth2 import service_account
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
@@ -45,6 +55,249 @@ except ModuleNotFoundError as e:
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
class GoogleUserContextAggregator(OpenAIUserContextAggregator):
|
||||
async def _push_aggregation(self):
|
||||
if len(self._aggregation) > 0:
|
||||
self._context.add_message(
|
||||
glm.Content(role="user", parts=[glm.Part(text=self._aggregation)])
|
||||
)
|
||||
|
||||
# Reset the aggregation. Reset it before pushing it down, otherwise
|
||||
# if the tasks gets cancelled we won't be able to clear things up.
|
||||
self._aggregation = ""
|
||||
|
||||
frame = OpenAILLMContextFrame(self._context)
|
||||
await self.push_frame(frame)
|
||||
|
||||
# Reset our accumulator state.
|
||||
self._reset()
|
||||
|
||||
|
||||
class GoogleAssistantContextAggregator(OpenAIAssistantContextAggregator):
|
||||
async def _push_aggregation(self):
|
||||
if not (
|
||||
self._aggregation or self._function_call_result or self._pending_image_frame_message
|
||||
):
|
||||
return
|
||||
|
||||
run_llm = False
|
||||
|
||||
aggregation = self._aggregation
|
||||
self._reset()
|
||||
|
||||
try:
|
||||
if self._function_call_result:
|
||||
frame = self._function_call_result
|
||||
self._function_call_result = None
|
||||
if frame.result:
|
||||
logger.debug(f"FunctionCallResultFrame result: {frame.arguments}")
|
||||
self._context.add_message(
|
||||
glm.Content(
|
||||
role="model",
|
||||
parts=[
|
||||
glm.Part(
|
||||
function_call=glm.FunctionCall(
|
||||
name=frame.function_name, args=frame.arguments
|
||||
)
|
||||
)
|
||||
],
|
||||
)
|
||||
)
|
||||
response = frame.result
|
||||
if isinstance(response, str):
|
||||
response = {"response": response}
|
||||
self._context.add_message(
|
||||
glm.Content(
|
||||
role="user",
|
||||
parts=[
|
||||
glm.Part(
|
||||
function_response=glm.FunctionResponse(
|
||||
name=frame.function_name, response=response
|
||||
)
|
||||
)
|
||||
],
|
||||
)
|
||||
)
|
||||
run_llm = not bool(self._function_calls_in_progress)
|
||||
else:
|
||||
self._context.add_message(
|
||||
glm.Content(role="model", parts=[glm.Part(text=aggregation)])
|
||||
)
|
||||
|
||||
if self._pending_image_frame_message:
|
||||
frame = self._pending_image_frame_message
|
||||
self._pending_image_frame_message = None
|
||||
self._context.add_image_frame_message(
|
||||
format=frame.user_image_raw_frame.format,
|
||||
size=frame.user_image_raw_frame.size,
|
||||
image=frame.user_image_raw_frame.image,
|
||||
text=frame.text,
|
||||
)
|
||||
run_llm = True
|
||||
|
||||
if run_llm:
|
||||
await self._user_context_aggregator.push_context_frame()
|
||||
|
||||
frame = OpenAILLMContextFrame(self._context)
|
||||
await self.push_frame(frame)
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Error processing frame: {e}")
|
||||
|
||||
|
||||
@dataclass
|
||||
class GoogleContextAggregatorPair:
|
||||
_user: "GoogleUserContextAggregator"
|
||||
_assistant: "GoogleAssistantContextAggregator"
|
||||
|
||||
def user(self) -> "GoogleUserContextAggregator":
|
||||
return self._user
|
||||
|
||||
def assistant(self) -> "GoogleAssistantContextAggregator":
|
||||
return self._assistant
|
||||
|
||||
|
||||
class GoogleLLMContext(OpenAILLMContext):
|
||||
@staticmethod
|
||||
def upgrade_to_google(obj: OpenAILLMContext) -> "GoogleLLMContext":
|
||||
if isinstance(obj, OpenAILLMContext) and not isinstance(obj, GoogleLLMContext):
|
||||
logger.debug(f"Upgrading to Google: {obj}")
|
||||
obj.__class__ = GoogleLLMContext
|
||||
obj._restructure_from_openai_messages()
|
||||
return obj
|
||||
|
||||
def set_messages(self, messages: List):
|
||||
self._messages[:] = messages
|
||||
self._restructure_from_openai_messages()
|
||||
|
||||
def get_messages_for_logging(self):
|
||||
msgs = []
|
||||
for message in self.messages:
|
||||
obj = glm.Content.to_dict(message)
|
||||
try:
|
||||
if "parts" in obj:
|
||||
for part in obj["parts"]:
|
||||
if "inline_data" in part:
|
||||
part["inline_data"]["data"] = "..."
|
||||
except Exception as e:
|
||||
logger.debug(f"Error: {e}")
|
||||
msgs.append(obj)
|
||||
return msgs
|
||||
|
||||
def from_standard_message(self, message):
|
||||
role = message["role"]
|
||||
content = message.get("content", [])
|
||||
if role == "system":
|
||||
role = "user"
|
||||
elif role == "assistant":
|
||||
role = "model"
|
||||
|
||||
parts = []
|
||||
if message.get("tool_calls"):
|
||||
for tc in message["tool_calls"]:
|
||||
parts.append(
|
||||
glm.Part(
|
||||
function_call=glm.FunctionCall(
|
||||
name=tc["function"]["name"],
|
||||
args=json.loads(tc["function"]["arguments"]),
|
||||
)
|
||||
)
|
||||
)
|
||||
elif role == "tool":
|
||||
role = "model"
|
||||
parts.append(
|
||||
glm.Part(
|
||||
function_response=glm.FunctionResponse(
|
||||
name="tool_call_result", # seems to work to hard-code the same name every time
|
||||
response=json.loads(message["content"]),
|
||||
)
|
||||
)
|
||||
)
|
||||
elif isinstance(content, str):
|
||||
parts.append(glm.Part(text=content))
|
||||
elif isinstance(content, list):
|
||||
for c in content:
|
||||
if c["type"] == "text":
|
||||
parts.append(glm.Part(text=c["text"]))
|
||||
elif c["type"] == "image_url":
|
||||
parts.append(
|
||||
glm.Part(
|
||||
inline_data=glm.Blob(
|
||||
mime_type="image/jpeg",
|
||||
data=base64.b64decode(c["image_url"]["url"].split(",")[1]),
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
message = glm.Content(role=role, parts=parts)
|
||||
return message
|
||||
|
||||
def add_image_frame_message(
|
||||
self, *, format: str, size: tuple[int, int], image: bytes, text: str = None
|
||||
):
|
||||
buffer = io.BytesIO()
|
||||
Image.frombytes(format, size, image).save(buffer, format="JPEG")
|
||||
|
||||
parts = []
|
||||
if text:
|
||||
parts.append(glm.Part(text=text))
|
||||
parts.append(
|
||||
glm.Part(inline_data=glm.Blob(mime_type="image/jpeg", data=buffer.getvalue())),
|
||||
)
|
||||
self.add_message(glm.Content(role="user", parts=parts))
|
||||
|
||||
def to_standard_messages(self, obj) -> list:
|
||||
msg = {"role": obj.role, "content": []}
|
||||
if msg["role"] == "model":
|
||||
msg["role"] = "assistant"
|
||||
|
||||
for part in obj.parts:
|
||||
if part.text:
|
||||
msg["content"].append({"type": "text", "text": part.text})
|
||||
elif part.inline_data:
|
||||
encoded = base64.b64encode(part.inline_data.data).decode("utf-8")
|
||||
msg["content"].append(
|
||||
{
|
||||
"type": "image_url",
|
||||
"image_url": {"url": f"data:{part.inline_data.mime_type};base64,{encoded}"},
|
||||
}
|
||||
)
|
||||
elif part.function_call:
|
||||
args = type(part.function_call).to_dict(part.function_call).get("args", {})
|
||||
msg["tool_calls"] = [
|
||||
{
|
||||
"id": part.function_call.name,
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": part.function_call.name,
|
||||
"arguments": json.dumps(args),
|
||||
},
|
||||
}
|
||||
]
|
||||
|
||||
elif part.function_response:
|
||||
msg["role"] = "tool"
|
||||
resp = (
|
||||
type(part.function_response).to_dict(part.function_response).get("response", {})
|
||||
)
|
||||
msg["tool_call_id"] = part.function_response.name
|
||||
msg["content"] = json.dumps(resp)
|
||||
|
||||
# there might be no content parts for tool_calls messages
|
||||
if not msg["content"]:
|
||||
del msg["content"]
|
||||
return [msg]
|
||||
|
||||
def _restructure_from_openai_messages(self):
|
||||
# first, map across self._messages calling self.from_standard_message(m) to modify messages in place
|
||||
try:
|
||||
self._messages[:] = [self.from_standard_message(m) for m in self._messages]
|
||||
except Exception as e:
|
||||
logger.error(f"Error mapping messages: {e}")
|
||||
# iterate over messages and remove any messages that have an empty content list
|
||||
self._messages = [m for m in self._messages if m.parts]
|
||||
|
||||
|
||||
class GoogleLLMService(LLMService):
|
||||
"""This class implements inference with Google's AI models
|
||||
|
||||
@@ -53,10 +306,31 @@ class GoogleLLMService(LLMService):
|
||||
franca for all LLM services, so that it is easy to switch between different LLMs.
|
||||
"""
|
||||
|
||||
def __init__(self, *, api_key: str, model: str = "gemini-1.5-flash-latest", **kwargs):
|
||||
class InputParams(BaseModel):
|
||||
max_tokens: Optional[int] = Field(default=4096, ge=1)
|
||||
temperature: Optional[float] = Field(default=None, ge=0.0, le=2.0)
|
||||
top_k: Optional[int] = Field(default=None, ge=0)
|
||||
top_p: Optional[float] = Field(default=None, ge=0.0, le=1.0)
|
||||
extra: Optional[Dict[str, Any]] = Field(default_factory=dict)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
api_key: str,
|
||||
model: str = "gemini-1.5-flash-latest",
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
gai.configure(api_key=api_key)
|
||||
self._create_client(model)
|
||||
self._settings = {
|
||||
"max_tokens": params.max_tokens,
|
||||
"temperature": params.temperature,
|
||||
"top_k": params.top_k,
|
||||
"top_p": params.top_p,
|
||||
"extra": params.extra if isinstance(params.extra, dict) else {},
|
||||
}
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
return True
|
||||
@@ -98,20 +372,58 @@ class GoogleLLMService(LLMService):
|
||||
async def _process_context(self, context: OpenAILLMContext):
|
||||
await self.push_frame(LLMFullResponseStartFrame())
|
||||
try:
|
||||
logger.debug(f"Generating chat: {context.get_messages_json()}")
|
||||
logger.debug(f"Generating chat: {context.get_messages_for_logging()}")
|
||||
|
||||
messages = self._get_messages_from_openai_context(context)
|
||||
# todo: move this into the new context code structure, convert from openai context one time
|
||||
# todo: add system instructions
|
||||
# messages = self._get_messages_from_openai_context(context)
|
||||
messages = context.messages
|
||||
|
||||
# Filter out None values and create GenerationConfig
|
||||
generation_params = {
|
||||
k: v
|
||||
for k, v in {
|
||||
"temperature": self._settings["temperature"],
|
||||
"top_p": self._settings["top_p"],
|
||||
"top_k": self._settings["top_k"],
|
||||
"max_output_tokens": self._settings["max_tokens"],
|
||||
}.items()
|
||||
if v is not None
|
||||
}
|
||||
|
||||
generation_config = GenerationConfig(**generation_params) if generation_params else None
|
||||
|
||||
await self.start_ttfb_metrics()
|
||||
|
||||
response = self._client.generate_content(messages, stream=True)
|
||||
tools = context.tools if context.tools else []
|
||||
response = self._client.generate_content(
|
||||
contents=messages, tools=tools, stream=True, generation_config=generation_config
|
||||
)
|
||||
|
||||
tokens = LLMTokenUsage(
|
||||
prompt_tokens=response.usage_metadata.prompt_token_count,
|
||||
completion_tokens=response.usage_metadata.candidates_token_count,
|
||||
total_tokens=response.usage_metadata.total_token_count,
|
||||
)
|
||||
|
||||
await self.start_llm_usage_metrics(tokens)
|
||||
|
||||
await self.stop_ttfb_metrics()
|
||||
|
||||
async for chunk in self._async_generator_wrapper(response):
|
||||
# todo: usage
|
||||
try:
|
||||
text = chunk.text
|
||||
await self.push_frame(TextFrame(text))
|
||||
for c in chunk.parts:
|
||||
if c.text:
|
||||
await self.push_frame(TextFrame(c.text))
|
||||
elif c.function_call:
|
||||
args = type(c.function_call).to_dict(c.function_call).get("args", {})
|
||||
await self.call_function(
|
||||
context=context,
|
||||
tool_call_id="what_should_this_be",
|
||||
function_name=c.function_call.name,
|
||||
arguments=args,
|
||||
)
|
||||
except Exception as e:
|
||||
# Google LLMs seem to flag safety issues a lot!
|
||||
if chunk.candidates[0].finish_reason == 3:
|
||||
@@ -132,10 +444,11 @@ class GoogleLLMService(LLMService):
|
||||
context = None
|
||||
|
||||
if isinstance(frame, OpenAILLMContextFrame):
|
||||
context: OpenAILLMContext = frame.context
|
||||
context: GoogleLLMContext = GoogleLLMContext.upgrade_to_google(frame.context)
|
||||
elif isinstance(frame, LLMMessagesFrame):
|
||||
context = OpenAILLMContext.from_messages(frame.messages)
|
||||
context = GoogleLLMContext(frame.messages)
|
||||
elif isinstance(frame, VisionImageRawFrame):
|
||||
# todo: fix this
|
||||
context = OpenAILLMContext.from_image_frame(frame)
|
||||
elif isinstance(frame, LLMUpdateSettingsFrame):
|
||||
await self._update_settings(frame.settings)
|
||||
@@ -145,6 +458,16 @@ class GoogleLLMService(LLMService):
|
||||
if context:
|
||||
await self._process_context(context)
|
||||
|
||||
@staticmethod
|
||||
def create_context_aggregator(
|
||||
context: OpenAILLMContext, *, assistant_expect_stripped_words: bool = True
|
||||
) -> GoogleContextAggregatorPair:
|
||||
user = GoogleUserContextAggregator(context)
|
||||
assistant = GoogleAssistantContextAggregator(
|
||||
user, expect_stripped_words=assistant_expect_stripped_words
|
||||
)
|
||||
return GoogleContextAggregatorPair(_user=user, _assistant=assistant)
|
||||
|
||||
|
||||
class GoogleTTSService(TTSService):
|
||||
class InputParams(BaseModel):
|
||||
|
||||
@@ -99,7 +99,12 @@ class BaseOpenAILLMService(LLMService):
|
||||
)
|
||||
seed: Optional[int] = Field(default_factory=lambda: NOT_GIVEN, ge=0)
|
||||
temperature: Optional[float] = Field(default_factory=lambda: NOT_GIVEN, ge=0.0, le=2.0)
|
||||
# Note: top_k is currently not supported by the OpenAI client library,
|
||||
# so top_k is ignore right now.
|
||||
top_k: Optional[int] = Field(default=None, ge=0)
|
||||
top_p: Optional[float] = Field(default_factory=lambda: NOT_GIVEN, ge=0.0, le=1.0)
|
||||
max_tokens: Optional[int] = Field(default_factory=lambda: NOT_GIVEN, ge=1)
|
||||
max_completion_tokens: Optional[int] = Field(default_factory=lambda: NOT_GIVEN, ge=1)
|
||||
extra: Optional[Dict[str, Any]] = Field(default_factory=dict)
|
||||
|
||||
def __init__(
|
||||
@@ -118,6 +123,8 @@ class BaseOpenAILLMService(LLMService):
|
||||
"seed": params.seed,
|
||||
"temperature": params.temperature,
|
||||
"top_p": params.top_p,
|
||||
"max_tokens": params.max_tokens,
|
||||
"max_completion_tokens": params.max_completion_tokens,
|
||||
"extra": params.extra if isinstance(params.extra, dict) else {},
|
||||
}
|
||||
self.set_model_name(model)
|
||||
@@ -152,6 +159,8 @@ class BaseOpenAILLMService(LLMService):
|
||||
"seed": self._settings["seed"],
|
||||
"temperature": self._settings["temperature"],
|
||||
"top_p": self._settings["top_p"],
|
||||
"max_tokens": self._settings["max_tokens"],
|
||||
"max_completion_tokens": self._settings["max_completion_tokens"],
|
||||
}
|
||||
|
||||
params.update(self._settings["extra"])
|
||||
@@ -214,6 +223,9 @@ class BaseOpenAILLMService(LLMService):
|
||||
|
||||
await self.stop_ttfb_metrics()
|
||||
|
||||
if not chunk.choices[0].delta:
|
||||
continue
|
||||
|
||||
if chunk.choices[0].delta.tool_calls:
|
||||
# We're streaming the LLM response to enable the fastest response times.
|
||||
# For text, we just yield each chunk as we receive it and count on consumers
|
||||
@@ -533,6 +545,7 @@ class OpenAIAssistantContextAggregator(LLMAssistantContextAggregator):
|
||||
self._context.add_message(
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": "", # content field required for Grok function calling
|
||||
"tool_calls": [
|
||||
{
|
||||
"id": frame.tool_call_id,
|
||||
|
||||
@@ -128,7 +128,9 @@ class OpenAIRealtimeBetaLLMService(LLMService):
|
||||
#
|
||||
|
||||
async def _handle_interruption(self):
|
||||
if self._session_properties.turn_detection is None:
|
||||
# None and False are different. Check for False. None means we're using OpenAI's
|
||||
# built-in turn detection defaults.
|
||||
if self._session_properties.turn_detection is False:
|
||||
await self.send_client_event(events.InputAudioBufferClearEvent())
|
||||
await self.send_client_event(events.ResponseCancelEvent())
|
||||
await self._truncate_current_audio_response()
|
||||
@@ -138,11 +140,12 @@ class OpenAIRealtimeBetaLLMService(LLMService):
|
||||
await self.push_frame(TTSStoppedFrame())
|
||||
|
||||
async def _handle_user_started_speaking(self, frame):
|
||||
if self._session_properties.turn_detection is None:
|
||||
await self._handle_interruption()
|
||||
pass
|
||||
|
||||
async def _handle_user_stopped_speaking(self, frame):
|
||||
if self._session_properties.turn_detection is None:
|
||||
# None and False are different. Check for False. None means we're using OpenAI's
|
||||
# built-in turn detection defaults.
|
||||
if self._session_properties.turn_detection is False:
|
||||
await self.send_client_event(events.InputAudioBufferCommitEvent())
|
||||
await self.send_client_event(events.ResponseCreateEvent())
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ import asyncio
|
||||
import io
|
||||
import json
|
||||
import struct
|
||||
import uuid
|
||||
from typing import AsyncGenerator, Optional
|
||||
|
||||
import aiohttp
|
||||
@@ -115,7 +116,7 @@ class PlayHTTTSService(TTSService):
|
||||
user_id: str,
|
||||
voice_url: str,
|
||||
voice_engine: str = "PlayHT3.0-mini",
|
||||
sample_rate: int = 16000,
|
||||
sample_rate: int = 24000,
|
||||
output_format: str = "wav",
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
@@ -127,6 +128,7 @@ class PlayHTTTSService(TTSService):
|
||||
self._websocket_url = None
|
||||
self._websocket = None
|
||||
self._receive_task = None
|
||||
self._request_id = None
|
||||
|
||||
self._settings = {
|
||||
"sample_rate": sample_rate,
|
||||
@@ -191,6 +193,7 @@ class PlayHTTTSService(TTSService):
|
||||
await self._receive_task
|
||||
self._receive_task = None
|
||||
|
||||
self._request_id = None
|
||||
except Exception as e:
|
||||
logger.error(f"{self} error closing websocket: {e}")
|
||||
|
||||
@@ -221,6 +224,7 @@ class PlayHTTTSService(TTSService):
|
||||
async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection):
|
||||
await super()._handle_interruption(frame, direction)
|
||||
await self.stop_all_metrics()
|
||||
self._request_id = None
|
||||
|
||||
async def _receive_task_handler(self):
|
||||
try:
|
||||
@@ -242,9 +246,10 @@ class PlayHTTTSService(TTSService):
|
||||
logger.debug(f"Received text message: {message}")
|
||||
try:
|
||||
msg = json.loads(message)
|
||||
if "request_id" in msg:
|
||||
if "request_id" in msg and msg["request_id"] == self._request_id:
|
||||
await self.push_frame(TTSStoppedFrame())
|
||||
header_received = False # Reset for the next audio stream
|
||||
self._request_id = None
|
||||
elif "error" in msg:
|
||||
logger.error(f"{self} error: {msg}")
|
||||
await self.push_error(ErrorFrame(f'{self} error: {msg["error"]}'))
|
||||
@@ -263,8 +268,10 @@ class PlayHTTTSService(TTSService):
|
||||
if not self._websocket or self._websocket.closed:
|
||||
await self._connect()
|
||||
|
||||
await self.start_ttfb_metrics()
|
||||
yield TTSStartedFrame()
|
||||
if not self._request_id:
|
||||
await self.start_ttfb_metrics()
|
||||
yield TTSStartedFrame()
|
||||
self._request_id = str(uuid.uuid4())
|
||||
|
||||
tts_command = {
|
||||
"text": text,
|
||||
@@ -275,6 +282,7 @@ class PlayHTTTSService(TTSService):
|
||||
"language": self._settings["language"],
|
||||
"speed": self._settings["speed"],
|
||||
"seed": self._settings["seed"],
|
||||
"request_id": self._request_id,
|
||||
}
|
||||
|
||||
try:
|
||||
@@ -293,8 +301,6 @@ class PlayHTTTSService(TTSService):
|
||||
except Exception as e:
|
||||
logger.error(f"{self} error generating TTS: {e}")
|
||||
yield ErrorFrame(f"{self} error: {str(e)}")
|
||||
finally:
|
||||
await self.stop_all_metrics()
|
||||
|
||||
|
||||
class PlayHTHttpTTSService(TTSService):
|
||||
@@ -310,7 +316,7 @@ class PlayHTHttpTTSService(TTSService):
|
||||
user_id: str,
|
||||
voice_url: str,
|
||||
voice_engine: str = "PlayHT3.0-mini",
|
||||
sample_rate: int = 16000,
|
||||
sample_rate: int = 24000,
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
):
|
||||
|
||||
137
src/pipecat/services/tavus.py
Normal file
137
src/pipecat/services/tavus.py
Normal file
@@ -0,0 +1,137 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
|
||||
"""This module implements Tavus as a sink transport layer"""
|
||||
|
||||
import aiohttp
|
||||
import base64
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
Frame,
|
||||
TTSAudioRawFrame,
|
||||
TransportMessageUrgentFrame,
|
||||
TTSStartedFrame,
|
||||
TTSStoppedFrame,
|
||||
StartInterruptionFrame,
|
||||
EndFrame,
|
||||
CancelFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.ai_services import AIService
|
||||
from pipecat.audio.utils import resample_audio
|
||||
|
||||
from loguru import logger
|
||||
|
||||
|
||||
class TavusVideoService(AIService):
|
||||
"""Class to send base64 encoded audio to Tavus"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
api_key: str,
|
||||
replica_id: str,
|
||||
persona_id: str = "pipecat0",
|
||||
session: aiohttp.ClientSession,
|
||||
**kwargs,
|
||||
) -> None:
|
||||
super().__init__(**kwargs)
|
||||
self._api_key = api_key
|
||||
self._replica_id = replica_id
|
||||
self._persona_id = persona_id
|
||||
self._session = session
|
||||
|
||||
self._conversation_id: str
|
||||
|
||||
async def initialize(self) -> str:
|
||||
url = "https://tavusapi.com/v2/conversations"
|
||||
headers = {"Content-Type": "application/json", "x-api-key": self._api_key}
|
||||
payload = {
|
||||
"replica_id": self._replica_id,
|
||||
"persona_id": self._persona_id,
|
||||
}
|
||||
async with self._session.post(url, headers=headers, json=payload) as r:
|
||||
r.raise_for_status()
|
||||
response_json = await r.json()
|
||||
|
||||
logger.debug(f"TavusVideoService joined {response_json['conversation_url']}")
|
||||
self._conversation_id = response_json["conversation_id"]
|
||||
return response_json["conversation_url"]
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
return True
|
||||
|
||||
async def get_persona_name(self) -> str:
|
||||
url = f"https://tavusapi.com/v2/personas/{self._persona_id}"
|
||||
headers = {"Content-Type": "application/json", "x-api-key": self._api_key}
|
||||
async with self._session.get(url, headers=headers) as r:
|
||||
r.raise_for_status()
|
||||
response_json = await r.json()
|
||||
|
||||
logger.debug(f"TavusVideoService persona grabbed {response_json}")
|
||||
return response_json["persona_name"]
|
||||
|
||||
async def _end_conversation(self) -> None:
|
||||
url = f"https://tavusapi.com/v2/conversations/{self._conversation_id}/end"
|
||||
headers = {"Content-Type": "application/json", "x-api-key": self._api_key}
|
||||
async with self._session.post(url, headers=headers) as r:
|
||||
r.raise_for_status()
|
||||
|
||||
async def _encode_audio_and_send(
|
||||
self, audio: bytes, original_sample_rate: int, done: bool
|
||||
) -> None:
|
||||
"""Encodes audio to base64 and sends it to Tavus"""
|
||||
if not done:
|
||||
audio = resample_audio(audio, original_sample_rate, 16000)
|
||||
audio_base64 = base64.b64encode(audio).decode("utf-8")
|
||||
logger.trace(f"TavusVideoService sending {len(audio)} bytes")
|
||||
await self._send_audio_message(audio_base64, done=done)
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
if isinstance(frame, TTSStartedFrame):
|
||||
await self.start_processing_metrics()
|
||||
await self.start_ttfb_metrics()
|
||||
self._current_idx_str = str(frame.id)
|
||||
elif isinstance(frame, TTSAudioRawFrame):
|
||||
await self._encode_audio_and_send(frame.audio, frame.sample_rate, done=False)
|
||||
elif isinstance(frame, TTSStoppedFrame):
|
||||
await self._encode_audio_and_send(b"\x00", 16000, done=True)
|
||||
await self.stop_ttfb_metrics()
|
||||
await self.stop_processing_metrics()
|
||||
elif isinstance(frame, StartInterruptionFrame):
|
||||
await self._send_interrupt_message()
|
||||
elif isinstance(frame, (EndFrame, CancelFrame)):
|
||||
await self._end_conversation()
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
async def _send_interrupt_message(self) -> None:
|
||||
transport_frame = TransportMessageUrgentFrame(
|
||||
message={
|
||||
"message_type": "conversation",
|
||||
"event_type": "conversation.interrupt",
|
||||
"conversation_id": self._conversation_id,
|
||||
}
|
||||
)
|
||||
await self.push_frame(transport_frame)
|
||||
|
||||
async def _send_audio_message(self, audio_base64: str, done: bool) -> None:
|
||||
transport_frame = TransportMessageUrgentFrame(
|
||||
message={
|
||||
"message_type": "conversation",
|
||||
"event_type": "conversation.echo",
|
||||
"conversation_id": self._conversation_id,
|
||||
"properties": {
|
||||
"modality": "audio",
|
||||
"inference_id": self._current_idx_str,
|
||||
"audio": audio_base64,
|
||||
"done": done,
|
||||
},
|
||||
}
|
||||
)
|
||||
await self.push_frame(transport_frame)
|
||||
@@ -4,11 +4,8 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
import httpx
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
|
||||
@@ -27,50 +24,16 @@ except ModuleNotFoundError as e:
|
||||
class TogetherLLMService(OpenAILLMService):
|
||||
"""This class implements inference with Together's Llama 3.1 models"""
|
||||
|
||||
class InputParams(BaseModel):
|
||||
frequency_penalty: Optional[float] = Field(default=None, ge=-2.0, le=2.0)
|
||||
max_tokens: Optional[int] = Field(default=4096, ge=1)
|
||||
presence_penalty: Optional[float] = Field(default=None, ge=-2.0, le=2.0)
|
||||
temperature: Optional[float] = Field(default=None, ge=0.0, le=1.0)
|
||||
# Note: top_k is currently not supported by the OpenAI client library,
|
||||
# so top_k is ignore right now.
|
||||
top_k: Optional[int] = Field(default=None, ge=0)
|
||||
top_p: Optional[float] = Field(default=None, ge=0.0, le=1.0)
|
||||
extra: Optional[Dict[str, Any]] = Field(default_factory=dict)
|
||||
seed: Optional[int] = Field(default=None)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
api_key: str,
|
||||
base_url: str = "https://api.together.xyz/v1",
|
||||
model: str = "meta-llama/Meta-Llama-3.1-8B-Instruct-Turbo",
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(api_key=api_key, base_url=base_url, model=model, params=params, **kwargs)
|
||||
self.set_model_name(model)
|
||||
self._settings = {
|
||||
"max_tokens": params.max_tokens,
|
||||
"frequency_penalty": params.frequency_penalty,
|
||||
"presence_penalty": params.presence_penalty,
|
||||
"seed": params.seed,
|
||||
"temperature": params.temperature,
|
||||
"top_p": params.top_p,
|
||||
"extra": params.extra if isinstance(params.extra, dict) else {},
|
||||
}
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
return True
|
||||
super().__init__(api_key=api_key, base_url=base_url, model=model, **kwargs)
|
||||
|
||||
def create_client(self, api_key=None, base_url=None, **kwargs):
|
||||
logger.debug(f"Creating Together.ai client with api {base_url}")
|
||||
return AsyncOpenAI(
|
||||
api_key=api_key,
|
||||
base_url=base_url,
|
||||
http_client=DefaultAsyncHttpxClient(
|
||||
limits=httpx.Limits(
|
||||
max_keepalive_connections=100, max_connections=1000, keepalive_expiry=None
|
||||
)
|
||||
),
|
||||
)
|
||||
return super().create_client(api_key, base_url, **kwargs)
|
||||
|
||||
@@ -39,9 +39,10 @@ class XTTSService(TTSService):
|
||||
language: Language,
|
||||
base_url: str,
|
||||
aiohttp_session: aiohttp.ClientSession,
|
||||
sample_rate: int = 24000,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
super().__init__(sample_rate=sample_rate, **kwargs)
|
||||
|
||||
self._settings = {
|
||||
"language": self.language_to_service_language(language),
|
||||
@@ -150,28 +151,30 @@ class XTTSService(TTSService):
|
||||
async for chunk in r.content.iter_chunked(1024):
|
||||
if len(chunk) > 0:
|
||||
await self.stop_ttfb_metrics()
|
||||
# Append new chunk to the buffer
|
||||
# Append new chunk to the buffer.
|
||||
buffer.extend(chunk)
|
||||
|
||||
# Check if buffer has enough data for processing
|
||||
# Check if buffer has enough data for processing.
|
||||
while (
|
||||
len(buffer) >= 48000
|
||||
): # Assuming at least 0.5 seconds of audio data at 24000 Hz
|
||||
# Process the buffer up to a safe size for resampling
|
||||
# Process the buffer up to a safe size for resampling.
|
||||
process_data = buffer[:48000]
|
||||
# Remove processed data from buffer
|
||||
# Remove processed data from buffer.
|
||||
buffer = buffer[48000:]
|
||||
|
||||
# Resample the audio from 24000 Hz to 16000 Hz
|
||||
resampled_audio = resample_audio(bytes(process_data), 24000, 16000)
|
||||
# XTTS uses 24000 so we need to resample to our desired rate.
|
||||
resampled_audio = resample_audio(
|
||||
bytes(process_data), 24000, self._sample_rate
|
||||
)
|
||||
# Create the frame with the resampled audio
|
||||
frame = TTSAudioRawFrame(resampled_audio, 16000, 1)
|
||||
frame = TTSAudioRawFrame(resampled_audio, self._sample_rate, 1)
|
||||
yield frame
|
||||
|
||||
# Process any remaining data in the buffer
|
||||
# Process any remaining data in the buffer.
|
||||
if len(buffer) > 0:
|
||||
resampled_audio = resample_audio(bytes(buffer), 24000, 16000)
|
||||
frame = TTSAudioRawFrame(resampled_audio, 16000, 1)
|
||||
resampled_audio = resample_audio(bytes(buffer), 24000, self._sample_rate)
|
||||
frame = TTSAudioRawFrame(resampled_audio, self._sample_rate, 1)
|
||||
yield frame
|
||||
|
||||
yield TTSStoppedFrame()
|
||||
|
||||
0
src/pipecat/sync/__init__.py
Normal file
0
src/pipecat/sync/__init__.py
Normal file
17
src/pipecat/sync/base_notifier.py
Normal file
17
src/pipecat/sync/base_notifier.py
Normal file
@@ -0,0 +1,17 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
|
||||
class BaseNotifier(ABC):
|
||||
@abstractmethod
|
||||
async def notify(self):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def wait(self):
|
||||
pass
|
||||
21
src/pipecat/sync/event_notifier.py
Normal file
21
src/pipecat/sync/event_notifier.py
Normal file
@@ -0,0 +1,21 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
|
||||
from pipecat.sync.base_notifier import BaseNotifier
|
||||
|
||||
|
||||
class EventNotifier(BaseNotifier):
|
||||
def __init__(self):
|
||||
self._event = asyncio.Event()
|
||||
|
||||
async def notify(self):
|
||||
self._event.set()
|
||||
|
||||
async def wait(self):
|
||||
await self._event.wait()
|
||||
self._event.clear()
|
||||
@@ -13,6 +13,7 @@ from typing import List
|
||||
from loguru import logger
|
||||
from PIL import Image
|
||||
|
||||
from pipecat.audio.vad.vad_analyzer import VAD_STOP_SECS
|
||||
from pipecat.frames.frames import (
|
||||
BotSpeakingFrame,
|
||||
BotStartedSpeakingFrame,
|
||||
@@ -29,8 +30,6 @@ from pipecat.frames.frames import (
|
||||
SystemFrame,
|
||||
TransportMessageFrame,
|
||||
TransportMessageUrgentFrame,
|
||||
TTSStartedFrame,
|
||||
TTSStoppedFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.transports.base_transport import TransportParams
|
||||
@@ -69,71 +68,21 @@ class BaseOutputTransport(FrameProcessor):
|
||||
|
||||
self._stopped_event = asyncio.Event()
|
||||
|
||||
# Indicates if the bot is currently speaking. This is useful when we
|
||||
# have an interruption since all the queued messages will be thrown
|
||||
# away and we would lose the TTSStoppedFrame.
|
||||
# Indicates if the bot is currently speaking.
|
||||
self._bot_speaking = False
|
||||
|
||||
# Create sink frame task. This is the task that will actually write
|
||||
# audio or video frames. We write audio/video in a task so we can keep
|
||||
# generating frames upstream while, for example, the audio is playing.
|
||||
async def start(self, frame: StartFrame):
|
||||
self._create_output_tasks()
|
||||
self._create_sink_tasks()
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
# Create camera output queue and task if needed.
|
||||
if self._params.camera_out_enabled:
|
||||
self._camera_out_queue = asyncio.Queue()
|
||||
self._camera_out_task = self.get_event_loop().create_task(
|
||||
self._camera_out_task_handler()
|
||||
)
|
||||
# Create audio output queue and task if needed.
|
||||
if self._params.audio_out_enabled and self._params.audio_out_is_live:
|
||||
self._audio_out_queue = asyncio.Queue()
|
||||
self._audio_out_task = self.get_event_loop().create_task(self._audio_out_task_handler())
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
# Cancel and wait for the camera output task to finish.
|
||||
if self._camera_out_task and self._params.camera_out_enabled:
|
||||
self._camera_out_task.cancel()
|
||||
await self._camera_out_task
|
||||
self._camera_out_task = None
|
||||
|
||||
# Cancel and wait for the audio output task to finish.
|
||||
if (
|
||||
self._audio_out_task
|
||||
and self._params.audio_out_enabled
|
||||
and self._params.audio_out_is_live
|
||||
):
|
||||
self._audio_out_task.cancel()
|
||||
await self._audio_out_task
|
||||
self._audio_out_task = None
|
||||
await self._cancel_output_tasks()
|
||||
|
||||
async def cancel(self, frame: CancelFrame):
|
||||
# Since we are cancelling everything it doesn't matter if we cancel sink
|
||||
# tasks first or not.
|
||||
if self._sink_task:
|
||||
self._sink_task.cancel()
|
||||
await self._sink_task
|
||||
self._sink_task = None
|
||||
|
||||
if self._sink_clock_task:
|
||||
self._sink_clock_task.cancel()
|
||||
await self._sink_clock_task
|
||||
self._sink_clock_task = None
|
||||
|
||||
# Cancel and wait for the camera output task to finish.
|
||||
if self._camera_out_task and self._params.camera_out_enabled:
|
||||
self._camera_out_task.cancel()
|
||||
await self._camera_out_task
|
||||
self._camera_out_task = None
|
||||
|
||||
# Cancel and wait for the audio output task to finish.
|
||||
if self._audio_out_task and (
|
||||
self._params.audio_out_enabled and self._params.audio_out_is_live
|
||||
):
|
||||
self._audio_out_task.cancel()
|
||||
await self._audio_out_task
|
||||
self._audio_out_task = None
|
||||
await self._cancel_sink_tasks()
|
||||
await self._cancel_output_tasks()
|
||||
|
||||
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
|
||||
pass
|
||||
@@ -209,19 +158,14 @@ class BaseOutputTransport(FrameProcessor):
|
||||
return
|
||||
|
||||
if isinstance(frame, StartInterruptionFrame):
|
||||
# Stop sink tasks.
|
||||
if self._sink_task:
|
||||
self._sink_task.cancel()
|
||||
await self._sink_task
|
||||
# Stop sink clock tasks.
|
||||
if self._sink_clock_task:
|
||||
self._sink_clock_task.cancel()
|
||||
await self._sink_clock_task
|
||||
# Create sink tasks.
|
||||
# Cancel sink and output tasks.
|
||||
await self._cancel_sink_tasks()
|
||||
await self._cancel_output_tasks()
|
||||
# Create sink and output tasks.
|
||||
self._create_output_tasks()
|
||||
self._create_sink_tasks()
|
||||
# Let's send a bot stopped speaking if we have to.
|
||||
if self._bot_speaking:
|
||||
await self._bot_stopped_speaking()
|
||||
await self._bot_stopped_speaking()
|
||||
|
||||
async def _handle_audio(self, frame: OutputAudioRawFrame):
|
||||
if not self._params.audio_out_enabled:
|
||||
@@ -249,6 +193,18 @@ class BaseOutputTransport(FrameProcessor):
|
||||
else:
|
||||
await self._sink_queue.put(frame)
|
||||
|
||||
async def _bot_started_speaking(self):
|
||||
if not self._bot_speaking:
|
||||
logger.debug("Bot started speaking")
|
||||
await self.push_frame(BotStartedSpeakingFrame(), FrameDirection.UPSTREAM)
|
||||
self._bot_speaking = True
|
||||
|
||||
async def _bot_stopped_speaking(self):
|
||||
if self._bot_speaking:
|
||||
logger.debug("Bot stopped speaking")
|
||||
await self.push_frame(BotStoppedSpeakingFrame(), FrameDirection.UPSTREAM)
|
||||
self._bot_speaking = False
|
||||
|
||||
#
|
||||
# Sink tasks
|
||||
#
|
||||
@@ -260,23 +216,27 @@ class BaseOutputTransport(FrameProcessor):
|
||||
self._sink_clock_queue = asyncio.PriorityQueue()
|
||||
self._sink_clock_task = loop.create_task(self._sink_clock_task_handler())
|
||||
|
||||
async def _cancel_sink_tasks(self):
|
||||
# Stop sink tasks.
|
||||
if self._sink_task:
|
||||
self._sink_task.cancel()
|
||||
await self._sink_task
|
||||
self._sink_task = None
|
||||
# Stop sink clock tasks.
|
||||
if self._sink_clock_task:
|
||||
self._sink_clock_task.cancel()
|
||||
await self._sink_clock_task
|
||||
self._sink_clock_task = None
|
||||
|
||||
async def _sink_frame_handler(self, frame: Frame):
|
||||
if isinstance(frame, OutputAudioRawFrame):
|
||||
await self.write_raw_audio_frames(frame.audio)
|
||||
await self.push_frame(frame)
|
||||
await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM)
|
||||
await self._audio_out_queue.put(frame)
|
||||
elif isinstance(frame, OutputImageRawFrame):
|
||||
await self._set_camera_image(frame)
|
||||
elif isinstance(frame, SpriteFrame):
|
||||
await self._set_camera_images(frame.images)
|
||||
elif isinstance(frame, TransportMessageFrame):
|
||||
await self.send_message(frame)
|
||||
elif isinstance(frame, TTSStartedFrame):
|
||||
await self._bot_started_speaking()
|
||||
await self.push_frame(frame)
|
||||
elif isinstance(frame, TTSStoppedFrame):
|
||||
await self._bot_stopped_speaking()
|
||||
await self.push_frame(frame)
|
||||
# We will push EndFrame later.
|
||||
elif not isinstance(frame, EndFrame):
|
||||
await self.push_frame(frame)
|
||||
@@ -319,15 +279,32 @@ class BaseOutputTransport(FrameProcessor):
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} error processing sink clock queue: {e}")
|
||||
|
||||
async def _bot_started_speaking(self):
|
||||
logger.debug("Bot started speaking")
|
||||
self._bot_speaking = True
|
||||
await self.push_frame(BotStartedSpeakingFrame(), FrameDirection.UPSTREAM)
|
||||
#
|
||||
# Output tasks
|
||||
#
|
||||
|
||||
async def _bot_stopped_speaking(self):
|
||||
logger.debug("Bot stopped speaking")
|
||||
self._bot_speaking = False
|
||||
await self.push_frame(BotStoppedSpeakingFrame(), FrameDirection.UPSTREAM)
|
||||
def _create_output_tasks(self):
|
||||
loop = self.get_event_loop()
|
||||
# Create camera output queue and task if needed.
|
||||
if self._params.camera_out_enabled:
|
||||
self._camera_out_queue = asyncio.Queue()
|
||||
self._camera_out_task = loop.create_task(self._camera_out_task_handler())
|
||||
# Create audio output queue and task if needed.
|
||||
if self._params.audio_out_enabled:
|
||||
self._audio_out_queue = asyncio.Queue()
|
||||
self._audio_out_task = loop.create_task(self._audio_out_task_handler())
|
||||
|
||||
async def _cancel_output_tasks(self):
|
||||
# Stop camera output task.
|
||||
if self._camera_out_task and self._params.camera_out_enabled:
|
||||
self._camera_out_task.cancel()
|
||||
await self._camera_out_task
|
||||
self._camera_out_task = None
|
||||
# Stop audio output task.
|
||||
if self._audio_out_task and self._params.audio_out_enabled:
|
||||
self._audio_out_task.cancel()
|
||||
await self._audio_out_task
|
||||
self._audio_out_task = None
|
||||
|
||||
#
|
||||
# Camera out
|
||||
@@ -408,12 +385,31 @@ class BaseOutputTransport(FrameProcessor):
|
||||
await self.process_frame(frame, FrameDirection.DOWNSTREAM)
|
||||
|
||||
async def _audio_out_task_handler(self):
|
||||
wait_time = (
|
||||
self._params.vad_analyzer.params.stop_secs
|
||||
if self._params.vad_analyzer
|
||||
else VAD_STOP_SECS
|
||||
)
|
||||
while True:
|
||||
try:
|
||||
frame = await self._audio_out_queue.get()
|
||||
# If we don't have an audio frame for VAD stop secs we will
|
||||
# consider the bot is not speaking.
|
||||
frame = await asyncio.wait_for(self._audio_out_queue.get(), timeout=wait_time)
|
||||
|
||||
# Notify the bot started speaking upstream if necessary.
|
||||
await self._bot_started_speaking()
|
||||
|
||||
# Send audio.
|
||||
await self.write_raw_audio_frames(frame.audio)
|
||||
await self.push_frame(frame)
|
||||
|
||||
# Notify the bot is speaking upstream.
|
||||
await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM)
|
||||
|
||||
# Push frame downstream in case anyone else needs it.
|
||||
await self.push_frame(frame)
|
||||
except asyncio.TimeoutError:
|
||||
# Notify the bot stopped speaking upstream if necessary.
|
||||
await self._bot_stopped_speaking()
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception as e:
|
||||
|
||||
@@ -30,7 +30,7 @@ class TransportParams(BaseModel):
|
||||
camera_out_color_format: str = "RGB"
|
||||
audio_out_enabled: bool = False
|
||||
audio_out_is_live: bool = False
|
||||
audio_out_sample_rate: int = 16000
|
||||
audio_out_sample_rate: int = 24000
|
||||
audio_out_channels: int = 1
|
||||
audio_out_bitrate: int = 96000
|
||||
audio_in_enabled: bool = False
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
import warnings
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Awaitable, Callable, Mapping, Optional
|
||||
@@ -20,7 +21,7 @@ from daily import (
|
||||
VirtualSpeakerDevice,
|
||||
)
|
||||
from loguru import logger
|
||||
from pydantic.main import BaseModel
|
||||
from pydantic import BaseModel, model_validator
|
||||
|
||||
from pipecat.audio.vad.vad_analyzer import VADAnalyzer, VADParams
|
||||
from pipecat.frames.frames import (
|
||||
@@ -93,8 +94,8 @@ class DailyDialinSettings(BaseModel):
|
||||
|
||||
class DailyTranscriptionSettings(BaseModel):
|
||||
language: str = "en"
|
||||
tier: str = "nova"
|
||||
model: str = "2-conversationalai"
|
||||
tier: Optional[str] = None
|
||||
model: str = "nova-2-general"
|
||||
profanity_filter: bool = True
|
||||
redact: bool = False
|
||||
endpointing: bool = True
|
||||
@@ -102,6 +103,16 @@ class DailyTranscriptionSettings(BaseModel):
|
||||
includeRawResponse: bool = True
|
||||
extra: Mapping[str, Any] = {"interim_results": True}
|
||||
|
||||
@model_validator(mode="before")
|
||||
def check_deprecated_fields(cls, values):
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
if "tier" in values:
|
||||
warnings.warn(
|
||||
"Field 'tier' is deprecated, use 'model' instead.", DeprecationWarning
|
||||
)
|
||||
return values
|
||||
|
||||
|
||||
class DailyParams(TransportParams):
|
||||
api_url: str = "https://api.daily.co/v1"
|
||||
@@ -127,6 +138,10 @@ class DailyCallbacks(BaseModel):
|
||||
on_participant_joined: Callable[[Mapping[str, Any]], Awaitable[None]]
|
||||
on_participant_left: Callable[[Mapping[str, Any], str], Awaitable[None]]
|
||||
on_participant_updated: Callable[[Mapping[str, Any]], Awaitable[None]]
|
||||
on_transcription_message: Callable[[Mapping[str, Any]], Awaitable[None]]
|
||||
on_recording_started: Callable[[Mapping[str, Any]], Awaitable[None]]
|
||||
on_recording_stopped: Callable[[str], Awaitable[None]]
|
||||
on_recording_error: Callable[[str, str], Awaitable[None]]
|
||||
|
||||
|
||||
def completion_callback(future):
|
||||
@@ -176,7 +191,8 @@ class DailyTransportClient(EventHandler):
|
||||
|
||||
self._participant_id: str = ""
|
||||
self._video_renderers = {}
|
||||
self._transcription_renderers = {}
|
||||
self._transcription_ids = []
|
||||
self._transcription_status = None
|
||||
self._other_participant_has_joined = False
|
||||
|
||||
self._joined = False
|
||||
@@ -332,6 +348,7 @@ class DailyTransportClient(EventHandler):
|
||||
error = await future
|
||||
if error:
|
||||
logger.error(f"Unable to start transcription: {error}")
|
||||
return
|
||||
|
||||
async def _join(self):
|
||||
future = self._loop.create_future()
|
||||
@@ -440,25 +457,37 @@ class DailyTransportClient(EventHandler):
|
||||
def participant_counts(self):
|
||||
return self._client.participant_counts()
|
||||
|
||||
def start_dialout(self, settings):
|
||||
self._client.start_dialout(settings)
|
||||
async def start_dialout(self, settings):
|
||||
future = self._loop.create_future()
|
||||
self._client.start_dialout(settings, completion=completion_callback(future))
|
||||
await future
|
||||
|
||||
def stop_dialout(self, participant_id):
|
||||
self._client.stop_dialout(participant_id)
|
||||
async def stop_dialout(self, participant_id):
|
||||
future = self._loop.create_future()
|
||||
self._client.stop_dialout(participant_id, completion=completion_callback(future))
|
||||
await future
|
||||
|
||||
def start_recording(self, streaming_settings, stream_id, force_new):
|
||||
self._client.start_recording(streaming_settings, stream_id, force_new)
|
||||
async def start_recording(self, streaming_settings, stream_id, force_new):
|
||||
future = self._loop.create_future()
|
||||
self._client.start_recording(
|
||||
streaming_settings, stream_id, force_new, completion=completion_callback(future)
|
||||
)
|
||||
await future
|
||||
|
||||
def stop_recording(self, stream_id):
|
||||
self._client.stop_recording(stream_id)
|
||||
async def stop_recording(self, stream_id):
|
||||
future = self._loop.create_future()
|
||||
self._client.stop_recording(stream_id, completion=completion_callback(future))
|
||||
await future
|
||||
|
||||
def capture_participant_transcription(self, participant_id: str, callback: Callable):
|
||||
async def capture_participant_transcription(self, participant_id: str):
|
||||
if not self._params.transcription_enabled:
|
||||
return
|
||||
|
||||
self._transcription_renderers[participant_id] = callback
|
||||
self._transcription_ids.append(participant_id)
|
||||
if self._joined and self._transcription_status:
|
||||
await self.update_transcription(self._transcription_ids)
|
||||
|
||||
def capture_participant_video(
|
||||
async def capture_participant_video(
|
||||
self,
|
||||
participant_id: str,
|
||||
callback: Callable,
|
||||
@@ -466,9 +495,12 @@ class DailyTransportClient(EventHandler):
|
||||
video_source: str = "camera",
|
||||
color_format: str = "RGB",
|
||||
):
|
||||
# Only enable camera subscription on this participant
|
||||
self._client.update_subscriptions(
|
||||
participant_settings={participant_id: {"media": "subscribed"}}
|
||||
# Try to enable camera and screen subscription on this participant
|
||||
await self.update_subscriptions(
|
||||
# participant_settings={participant_id: {"media": "subscribed"}}
|
||||
participant_settings={
|
||||
participant_id: {"media": {"camera": "subscribed", "screenVideo": "subscribed"}}
|
||||
}
|
||||
)
|
||||
|
||||
self._video_renderers[participant_id] = callback
|
||||
@@ -480,6 +512,22 @@ class DailyTransportClient(EventHandler):
|
||||
color_format=color_format,
|
||||
)
|
||||
|
||||
async def update_transcription(self, participants=None, instance_id=None):
|
||||
future = self._loop.create_future()
|
||||
self._client.update_transcription(
|
||||
participants, instance_id, completion=completion_callback(future)
|
||||
)
|
||||
await future
|
||||
|
||||
async def update_subscriptions(self, participant_settings=None, profile_settings=None):
|
||||
future = self._loop.create_future()
|
||||
self._client.update_subscriptions(
|
||||
participant_settings=participant_settings,
|
||||
profile_settings=profile_settings,
|
||||
completion=completion_callback(future),
|
||||
)
|
||||
await future
|
||||
|
||||
#
|
||||
#
|
||||
# Daily (EventHandler)
|
||||
@@ -528,23 +576,31 @@ class DailyTransportClient(EventHandler):
|
||||
def on_participant_updated(self, participant):
|
||||
self._call_async_callback(self._callbacks.on_participant_updated, participant)
|
||||
|
||||
def on_transcription_message(self, message: Mapping[str, Any]):
|
||||
participant_id = ""
|
||||
if "participantId" in message:
|
||||
participant_id = message["participantId"]
|
||||
def on_transcription_started(self, status):
|
||||
logger.debug(f"Transcription started: {status}")
|
||||
self._transcription_status = status
|
||||
self._call_async_callback(self.update_transcription, self._transcription_ids)
|
||||
|
||||
if participant_id in self._transcription_renderers:
|
||||
callback = self._transcription_renderers[participant_id]
|
||||
self._call_async_callback(callback, participant_id, message)
|
||||
def on_transcription_stopped(self, stopped_by, stopped_by_error):
|
||||
logger.debug("Transcription stopped")
|
||||
|
||||
def on_transcription_error(self, message):
|
||||
logger.error(f"Transcription error: {message}")
|
||||
|
||||
def on_transcription_started(self, status):
|
||||
logger.debug(f"Transcription started: {status}")
|
||||
def on_transcription_message(self, message):
|
||||
self._call_async_callback(self._callbacks.on_transcription_message, message)
|
||||
|
||||
def on_transcription_stopped(self, stopped_by, stopped_by_error):
|
||||
logger.debug("Transcription stopped")
|
||||
def on_recording_started(self, status):
|
||||
logger.debug(f"Recording started: {status}")
|
||||
self._call_async_callback(self._callbacks.on_recording_started, status)
|
||||
|
||||
def on_recording_stopped(self, stream_id):
|
||||
logger.debug(f"Recording stopped: {stream_id}")
|
||||
self._call_async_callback(self._callbacks.on_recording_stopped, stream_id)
|
||||
|
||||
def on_recording_error(self, stream_id, message):
|
||||
logger.error(f"Recording error for {stream_id}: {message}")
|
||||
self._call_async_callback(self._callbacks.on_recording_error, stream_id, message)
|
||||
|
||||
#
|
||||
# Daily (CallClient callbacks)
|
||||
@@ -561,8 +617,15 @@ class DailyTransportClient(EventHandler):
|
||||
)
|
||||
|
||||
def _call_async_callback(self, callback, *args):
|
||||
future = asyncio.run_coroutine_threadsafe(callback(*args), self._loop)
|
||||
future.result()
|
||||
# Don't wait on the coroutine, otherwise if we call a `CallClient`
|
||||
# function and wait for its completion this will currently result in a
|
||||
# deadlock. This is because `_call_async_callback` is used inside
|
||||
# `CallClient` event handlers which are holding the GIL in
|
||||
# `daily-python`. So if the `callback` passed here makes a `CallClient`
|
||||
# call and waits for it to finish using completions (and a future) we
|
||||
# will deadlock because completions use event handlers (which are
|
||||
# holding the GIL).
|
||||
asyncio.run_coroutine_threadsafe(callback(*args), self._loop)
|
||||
|
||||
|
||||
class DailyInputTransport(BaseInputTransport):
|
||||
@@ -631,7 +694,7 @@ class DailyInputTransport(BaseInputTransport):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, UserImageRequestFrame):
|
||||
self.request_participant_image(frame.user_id)
|
||||
await self.request_participant_image(frame.user_id)
|
||||
|
||||
#
|
||||
# Frames
|
||||
@@ -661,7 +724,7 @@ class DailyInputTransport(BaseInputTransport):
|
||||
# Camera in
|
||||
#
|
||||
|
||||
def capture_participant_video(
|
||||
async def capture_participant_video(
|
||||
self,
|
||||
participant_id: str,
|
||||
framerate: int = 30,
|
||||
@@ -674,11 +737,11 @@ class DailyInputTransport(BaseInputTransport):
|
||||
"render_next_frame": False,
|
||||
}
|
||||
|
||||
self._client.capture_participant_video(
|
||||
await self._client.capture_participant_video(
|
||||
participant_id, self._on_participant_video_frame, framerate, video_source, color_format
|
||||
)
|
||||
|
||||
def request_participant_image(self, participant_id: str):
|
||||
async def request_participant_image(self, participant_id: str):
|
||||
if participant_id in self._video_renderers:
|
||||
self._video_renderers[participant_id]["render_next_frame"] = True
|
||||
|
||||
@@ -711,37 +774,21 @@ class DailyOutputTransport(BaseOutputTransport):
|
||||
|
||||
self._client = client
|
||||
|
||||
# Task to process outgoing messages.
|
||||
self._messages_task = None
|
||||
self._messages_queue = asyncio.Queue()
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
# Parent start.
|
||||
await super().start(frame)
|
||||
# Join the room.
|
||||
await self._client.join()
|
||||
# Start messages task
|
||||
self._messages_task = self.get_event_loop().create_task(self._messages_task_handler())
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
# Parent stop.
|
||||
await super().stop(frame)
|
||||
# Cancel messages task
|
||||
if self._messages_task:
|
||||
self._messages_task.cancel()
|
||||
await self._messages_task
|
||||
self._messages_task = None
|
||||
# Leave the room.
|
||||
await self._client.leave()
|
||||
|
||||
async def cancel(self, frame: CancelFrame):
|
||||
# Parent stop.
|
||||
await super().cancel(frame)
|
||||
# Cancel messages task
|
||||
if self._messages_task:
|
||||
self._messages_task.cancel()
|
||||
await self._messages_task
|
||||
self._messages_task = None
|
||||
# Leave the room.
|
||||
await self._client.leave()
|
||||
|
||||
@@ -750,7 +797,7 @@ class DailyOutputTransport(BaseOutputTransport):
|
||||
await self._client.cleanup()
|
||||
|
||||
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
|
||||
await self._messages_queue.put(frame)
|
||||
await self._client.send_message(frame)
|
||||
|
||||
async def write_raw_audio_frames(self, frames: bytes):
|
||||
await self._client.write_raw_audio_frames(frames)
|
||||
@@ -758,17 +805,6 @@ class DailyOutputTransport(BaseOutputTransport):
|
||||
async def write_frame_to_camera(self, frame: OutputImageRawFrame):
|
||||
await self._client.write_frame_to_camera(frame)
|
||||
|
||||
async def _messages_task_handler(self):
|
||||
while True:
|
||||
try:
|
||||
message = await self._messages_queue.get()
|
||||
await self._client.send_message(message)
|
||||
self._messages_queue.task_done()
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} error processing message queue: {e}")
|
||||
|
||||
|
||||
class DailyTransport(BaseTransport):
|
||||
def __init__(
|
||||
@@ -799,6 +835,10 @@ class DailyTransport(BaseTransport):
|
||||
on_participant_joined=self._on_participant_joined,
|
||||
on_participant_left=self._on_participant_left,
|
||||
on_participant_updated=self._on_participant_updated,
|
||||
on_transcription_message=self._on_transcription_message,
|
||||
on_recording_started=self._on_recording_started,
|
||||
on_recording_stopped=self._on_recording_stopped,
|
||||
on_recording_error=self._on_recording_error,
|
||||
)
|
||||
self._params = params
|
||||
|
||||
@@ -824,6 +864,10 @@ class DailyTransport(BaseTransport):
|
||||
self._register_event_handler("on_participant_joined")
|
||||
self._register_event_handler("on_participant_left")
|
||||
self._register_event_handler("on_participant_updated")
|
||||
self._register_event_handler("on_transcription_message")
|
||||
self._register_event_handler("on_recording_started")
|
||||
self._register_event_handler("on_recording_stopped")
|
||||
self._register_event_handler("on_recording_error")
|
||||
|
||||
#
|
||||
# BaseTransport
|
||||
@@ -861,24 +905,22 @@ class DailyTransport(BaseTransport):
|
||||
def participant_counts(self):
|
||||
return self._client.participant_counts()
|
||||
|
||||
def start_dialout(self, settings=None):
|
||||
self._client.start_dialout(settings)
|
||||
async def start_dialout(self, settings=None):
|
||||
await self._client.start_dialout(settings)
|
||||
|
||||
def stop_dialout(self, participant_id):
|
||||
self._client.stop_dialout(participant_id)
|
||||
async def stop_dialout(self, participant_id):
|
||||
await self._client.stop_dialout(participant_id)
|
||||
|
||||
def start_recording(self, streaming_settings=None, stream_id=None, force_new=None):
|
||||
self._client.start_recording(streaming_settings, stream_id, force_new)
|
||||
async def start_recording(self, streaming_settings=None, stream_id=None, force_new=None):
|
||||
await self._client.start_recording(streaming_settings, stream_id, force_new)
|
||||
|
||||
def stop_recording(self, stream_id=None):
|
||||
self._client.stop_recording(stream_id)
|
||||
async def stop_recording(self, stream_id=None):
|
||||
await self._client.stop_recording(stream_id)
|
||||
|
||||
def capture_participant_transcription(self, participant_id: str):
|
||||
self._client.capture_participant_transcription(
|
||||
participant_id, self._on_transcription_message
|
||||
)
|
||||
async def capture_participant_transcription(self, participant_id: str):
|
||||
await self._client.capture_participant_transcription(participant_id)
|
||||
|
||||
def capture_participant_video(
|
||||
async def capture_participant_video(
|
||||
self,
|
||||
participant_id: str,
|
||||
framerate: int = 30,
|
||||
@@ -886,10 +928,15 @@ class DailyTransport(BaseTransport):
|
||||
color_format: str = "RGB",
|
||||
):
|
||||
if self._input:
|
||||
self._input.capture_participant_video(
|
||||
await self._input.capture_participant_video(
|
||||
participant_id, framerate, video_source, color_format
|
||||
)
|
||||
|
||||
async def update_subscriptions(self, participant_settings=None, profile_settings=None):
|
||||
await self._client.update_subscriptions(
|
||||
participant_settings=participant_settings, profile_settings=profile_settings
|
||||
)
|
||||
|
||||
async def _on_joined(self, data):
|
||||
await self._call_event_handler("on_joined", data)
|
||||
|
||||
@@ -927,7 +974,9 @@ class DailyTransport(BaseTransport):
|
||||
url = f"{self._params.api_url}/dialin/pinlessCallUpdate"
|
||||
|
||||
try:
|
||||
async with session.post(url, headers=headers, json=data, timeout=10) as r:
|
||||
async with session.post(
|
||||
url, headers=headers, json=data, timeout=aiohttp.ClientTimeout(total=10)
|
||||
) as r:
|
||||
if r.status != 200:
|
||||
text = await r.text()
|
||||
logger.error(
|
||||
@@ -973,7 +1022,13 @@ class DailyTransport(BaseTransport):
|
||||
async def _on_first_participant_joined(self, participant):
|
||||
await self._call_event_handler("on_first_participant_joined", participant)
|
||||
|
||||
async def _on_transcription_message(self, participant_id, message):
|
||||
async def _on_transcription_message(self, message):
|
||||
participant_id = ""
|
||||
if "participantId" in message:
|
||||
participant_id = message["participantId"]
|
||||
if not participant_id:
|
||||
return
|
||||
|
||||
text = message["text"]
|
||||
timestamp = message["timestamp"]
|
||||
is_final = message["rawResponse"]["is_final"]
|
||||
@@ -990,3 +1045,12 @@ class DailyTransport(BaseTransport):
|
||||
|
||||
if self._input:
|
||||
await self._input.push_transcription_frame(frame)
|
||||
|
||||
async def _on_recording_started(self, status):
|
||||
await self._call_event_handler("on_recording_started", status)
|
||||
|
||||
async def _on_recording_stopped(self, stream_id):
|
||||
await self._call_event_handler("on_recording_stopped", stream_id)
|
||||
|
||||
async def _on_recording_error(self, stream_id, message):
|
||||
await self._call_event_handler("on_recording_error", stream_id, message)
|
||||
|
||||
@@ -11,6 +11,7 @@ from typing import Any, Awaitable, Callable, List
|
||||
from pydantic import BaseModel
|
||||
|
||||
from pipecat.audio.utils import resample_audio
|
||||
from pipecat.audio.vad.vad_analyzer import VADAnalyzer
|
||||
from pipecat.frames.frames import (
|
||||
AudioRawFrame,
|
||||
CancelFrame,
|
||||
|
||||
@@ -4,8 +4,13 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
from loguru import logger
|
||||
import warnings
|
||||
|
||||
logger.warning("DEPRECATED: Package `pipecat.vad` is deprecated, use `pipecat.audio.vad` instead.")
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"Package `pipecat.vad` is deprecated, use `pipecat.audio.vad` instead", DeprecationWarning
|
||||
)
|
||||
|
||||
from ..audio.vad.silero import SileroVAD, SileroVADAnalyzer
|
||||
from ..audio.vad.silero import SileroVADAnalyzer
|
||||
from ..processors.audio.vad.silero import SileroVAD
|
||||
|
||||
@@ -4,8 +4,12 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
from loguru import logger
|
||||
import warnings
|
||||
|
||||
logger.warning("DEPRECATED: Package `pipecat.vad` is deprecated, use `pipecat.audio.vad` instead.")
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"Package `pipecat.vad` is deprecated, use `pipecat.audio.vad` instead", DeprecationWarning
|
||||
)
|
||||
|
||||
from ..audio.vad.vad_analyzer import VADAnalyzer, VADParams, VADState
|
||||
|
||||
@@ -22,7 +22,7 @@ pydantic~=2.8.2
|
||||
pyloudnorm~=0.1.1
|
||||
pyht~=0.1.4
|
||||
python-dotenv~=1.0.1
|
||||
scipy~=1.14.1
|
||||
resampy~=0.4.3
|
||||
silero-vad~=5.1
|
||||
together~=1.2.7
|
||||
transformers~=4.44.0
|
||||
|
||||
Reference in New Issue
Block a user