Compare commits

...

93 Commits

Author SHA1 Message Date
Kwindla Hultman Kramer
9cd7c82e77 testing pushing a frame from function call start hook 2024-09-30 14:52:18 -07:00
Kwindla Hultman Kramer
43161c816e get rid of some debug log lines used during development 2024-09-30 14:48:44 -07:00
Kwindla Hultman Kramer
6644c06af1 throw error if the llm tries to call a function that's not registered 2024-09-30 14:48:44 -07:00
Kwindla Hultman Kramer
ed47212e07 handle openai multiple function calls 2024-09-30 14:48:40 -07:00
JeevanReddy
db9cb74364 openai can give multiple tool calls, current implementation assumes only one function call at a time. Fixed this to handle multiple function calls. 2024-09-30 14:47:31 -07:00
Aleix Conchillo Flaqué
f64902eb25 pipeline(task): since everything is async tasks should wait for EndFrame 2024-09-30 14:08:11 -07:00
Aleix Conchillo Flaqué
e115a274d6 tests: fix langchanin tests 2024-09-30 14:08:11 -07:00
Aleix Conchillo Flaqué
00239c2fd4 syncparallelpipeline: fix now that all frames are asynchronous 2024-09-30 14:08:11 -07:00
Aleix Conchillo Flaqué
c0f9ad19fe all frame processors are asynchrnous
In this commit we make all frame processors asynchronous, that is, they have an
internal queue and they push frames using a task from that queue.
2024-09-30 13:17:50 -07:00
Mark Backman
46ac76701e Merge pull request #517 from pipecat-ai/mb/update-settings-frame
Consolidate update frames classes into a single UpdateSettingsFrame class
2024-09-30 12:56:45 -04:00
Mark Backman
1f77863aef Code review feedback 2024-09-30 12:50:40 -04:00
Mark Backman
d7555609fd Add TTS update settings options 2024-09-30 12:50:40 -04:00
Mark Backman
7fe118ce63 Align use of language param across TTS services 2024-09-30 12:50:40 -04:00
Mark Backman
44a349386c Consolidate update frames classes into a single UpdateSettingsFrame class 2024-09-30 12:50:39 -04:00
Mark Backman
97cba92fa5 Merge pull request #516 from pipecat-ai/mb/google-tts
Add Google TTS
2024-09-30 12:25:16 -04:00
Aleix Conchillo Flaqué
d9b16d4f73 services: import cosmetics 2024-09-27 13:32:27 -07:00
Aleix Conchillo Flaqué
50b6580fbb livekit: add license notice 2024-09-27 13:28:33 -07:00
Mark Backman
e7548f9494 Code review feedback 2024-09-27 08:02:44 -04:00
Mark Backman
830d2df671 Add Google TTS 2024-09-27 07:36:20 -04:00
Aleix Conchillo Flaqué
13b50a07db Merge pull request #515 from pipecat-ai/aleix/rtvi-frame-processors
RTVI frame processors
2024-09-27 00:48:09 -07:00
Aleix Conchillo Flaqué
4501dca133 Merge pull request #467 from joachimchauvet/main
Add LiveKit audio transport
2024-09-26 22:58:25 -07:00
Aleix Conchillo Flaqué
2c8e566507 rtvi: update version to 0.2 2024-09-26 22:42:36 -07:00
Aleix Conchillo Flaqué
6e8a202107 rtvi: fix handling transport messages 2024-09-26 22:42:19 -07:00
Aleix Conchillo Flaqué
2a05cd35b0 rtvi: add multiple RTVI frame processors 2024-09-26 22:42:08 -07:00
Mark Backman
55a70cde8f Merge pull request #514 from pipecat-ai/mb/aws-polly-tts
Add AWS Polly TTS support
2024-09-26 22:20:13 -04:00
Mark Backman
706c00d897 Code review feedback 2024-09-26 22:13:37 -04:00
Aleix Conchillo Flaqué
d323ea9e95 async_generator: keep pushing frames downstream 2024-09-26 16:44:49 -07:00
Aleix Conchillo Flaqué
b8ece84c6e services: super should be super() 2024-09-26 10:39:26 -07:00
Mark Backman
a018112a13 Merge pull request #510 from pipecat-ai/mb/deepgram-tts-http
Improve usability of Deepgram TTS: use Deepgram client, remove aiohttp
2024-09-26 13:38:42 -04:00
Mark Backman
d3a477902b Add changelog entry 2024-09-26 13:35:59 -04:00
Mark Backman
298b151486 Add setter methods 2024-09-26 13:35:59 -04:00
Mark Backman
6a6ea251ae Add AWS Polly TTS support 2024-09-26 13:35:59 -04:00
Aleix Conchillo Flaqué
c7c709a0a7 github: cache venv when running tests 2024-09-26 10:32:22 -07:00
Aleix Conchillo Flaqué
6ac57b4854 Merge pull request #494 from badbye/full-width-punctuations
add full-width punctuations as end of the sentence
2024-09-26 10:17:10 -07:00
Aleix Conchillo Flaqué
f5e0b946c7 services(cartesia): fix string formatting 2024-09-26 09:08:37 -07:00
Mark Backman
b1818cc370 Merge pull request #435 from golbin/main
Add speed and emotion options for Cartesia.
2024-09-26 07:14:59 -04:00
Jin Kim
d05717a1bd Apply Ruff formater 2024-09-26 19:52:25 +09:00
Aleix Conchillo Flaqué
d11daee31a Merge pull request #509 from pipecat-ai/aleix/frameprocessor-event-handlers
frame processor event handlers
2024-09-25 19:50:30 -07:00
Mark Backman
73da8c1910 Improve usability of Deepgram TTS: use Deepgram client, remove aiohttp 2024-09-25 22:43:10 -04:00
Aleix Conchillo Flaqué
f06aa300d0 rtvi: add on_bot_ready event 2024-09-25 16:52:18 -07:00
Aleix Conchillo Flaqué
c4e94e280e processors: add support for event handlers 2024-09-25 16:35:33 -07:00
Kwindla Hultman Kramer
8f2941c575 Merge pull request #492 from pipecat-ai/khk/flush-more-audio
add calls to flush_audio for say() and rtvi action
2024-09-25 12:35:50 -07:00
joachimchauvet
447baad5c3 update send_metrics() to support changes introduced in #474 2024-09-25 21:38:55 +03:00
Mark Backman
2703813e8a Merge pull request #496 from pipecat-ai/mb/azure-tts-inputs
Add Azure TTS input params
2024-09-25 14:38:01 -04:00
Mark Backman
521e152150 Merge pull request #495 from pipecat-ai/mb/elevenlabs-input-lang
Add language_code support for ElevenLabs TTS
2024-09-25 14:37:44 -04:00
Kwindla Hultman Kramer
3d43ad0f4d actually save the file 2024-09-25 10:59:00 -07:00
Kwindla Hultman Kramer
3621fceae2 fixes as noted by aleix 2024-09-25 09:19:58 -07:00
Aleix Conchillo Flaqué
e123f33c03 Merge pull request #506 from pipecat-ai/aleix/async-generator-processor
processors: add AsyncGeneratorProcessor
2024-09-25 00:04:09 -07:00
Aleix Conchillo Flaqué
b8713666c2 processors: add AsyncGeneratorProcessor 2024-09-25 00:01:04 -07:00
Aleix Conchillo Flaqué
cf0ab85e2c Merge pull request #505 from pipecat-ai/aleix/init-task-variables
initialize task variables and add minor description
2024-09-24 23:59:38 -07:00
Aleix Conchillo Flaqué
8502c7c801 Merge pull request #504 from pipecat-ai/aleix/rtvi-handle-frame
rtvi: add RTVIProcessor.handle_message()
2024-09-24 23:59:26 -07:00
Aleix Conchillo Flaqué
e89814dc6b Merge pull request #503 from pipecat-ai/aleix/end-cancel-task-frames
frames: add EndTaskFrame and CancelTaskFrame
2024-09-24 23:59:10 -07:00
Aleix Conchillo Flaqué
9461bacf0d pyproject: update fastapi to 0.115.0 2024-09-24 19:24:37 -07:00
Aleix Conchillo Flaqué
e276dcbab7 initialize task variables and add minor description 2024-09-24 19:19:00 -07:00
Aleix Conchillo Flaqué
1a3de0e819 rtvi: add RTVIProcessor.handle_message() 2024-09-24 19:12:06 -07:00
Aleix Conchillo Flaqué
ee3786fe15 frames: add EndTaskFrame and CancelTaskFrame 2024-09-24 19:10:22 -07:00
Aleix Conchillo Flaqué
31b5667cee frames: log text with [] so we can distinguish spaces better 2024-09-24 13:10:40 -07:00
Aleix Conchillo Flaqué
a483f1a083 rtvi: handle all actions from the action task 2024-09-24 10:48:15 -07:00
Aleix Conchillo Flaqué
2ecec1c9f8 Merge pull request #500 from pipecat-ai/aleix/rtvi-action-frames-task
RTVI action frames task
2024-09-24 10:13:43 -07:00
Aleix Conchillo Flaqué
08ac311971 rtvi: use task to process incoming action frames 2024-09-24 09:36:53 -07:00
Aleix Conchillo Flaqué
cb49b6a0d6 rtvi: add llm-text and tts-text server messages 2024-09-24 09:36:43 -07:00
Aleix Conchillo Flaqué
016da177db Merge pull request #499 from mercuryyy/main
Fix syntax error in deepgram.py
2024-09-24 09:10:05 -07:00
joachimchauvet
ec5998bc36 remove _internal_push_frame from LiveKitInputTransport 2024-09-24 14:54:37 +03:00
mercuryyy
b1e17ee347 Fix syntax error in deepgram.py 2024-09-24 07:45:29 -04:00
joachimchauvet
b6e1d6e6ae format with ruff 2024-09-24 10:21:02 +03:00
joachimchauvet
fa609f1afc adjust output sample rate and create user token 2024-09-24 10:16:54 +03:00
joachimchauvet
470b5eafe7 move tenacity imports inside try block 2024-09-24 10:16:54 +03:00
joachimchauvet
2e5b0c1d6b add tenacity dependency 2024-09-24 10:16:54 +03:00
joachimchauvet
a9390d96a1 add LiveKit audio transport 2024-09-24 10:16:54 +03:00
Mark Backman
8ee9621d66 Add setter functions 2024-09-23 21:12:01 -04:00
Jin Kim
49f2123893 Apply and Fix upstream changes for Cartesia 2024-09-24 07:59:26 +09:00
Jin Kim
cf72129852 Merge remote-tracking branch 'upstream/main' 2024-09-24 07:18:22 +09:00
Mark Backman
8edee8155d Add input params to Azure TTS 2024-09-23 17:52:23 -04:00
chadbailey59
c262b272fa Added RTVIActionFrame (#464)
* added RTVIActionFrame

* server-sent events

* reverted log changes

* fixup
2024-09-23 14:51:17 -05:00
Aleix Conchillo Flaqué
9ef9c1c58a Merge pull request #497 from pipecat-ai/aleix/ruff-formater
introduce Ruff formatting
2024-09-23 10:42:54 -07:00
Aleix Conchillo Flaqué
c7ff79a652 processors: fix formatting string 2024-09-23 09:53:37 -07:00
Aleix Conchillo Flaqué
da81df5284 github: install dev-requirements when running tests 2024-09-23 09:53:37 -07:00
Aleix Conchillo Flaqué
a4420dc88b README: add vscode and emacs ruff instructions 2024-09-23 09:53:37 -07:00
Aleix Conchillo Flaqué
eeb8338dce introduce Ruff formatting 2024-09-23 09:53:37 -07:00
Cyril S.
dfa4ac81fd Implement Sentry instrumentation for performance and error tracking (#470)
* feat: Add Sentry support in FrameProcessor

This update add optional Sentry integration for performance tracking and error monitoring.

Key changes include:

- Add conditional Sentry import and initialization check
- Implement Sentry spans in FrameProcessorMetrics to measure TTFB (Time To First Byte) and processing time when Sentry is available
- Maintain existing metrics functionality with MetricsFrame regardless of Sentry availability

* feat: Enable metrics in DeepgramSTTService for Sentry

This commit enhances the DeepgramSTTService class to enable metrics generation for use with Sentry.

Key changes include:

1. Enable general metrics generation:
   - Implement `can_generate_metrics` method, returning True when VAD is enabled
   - This allows metrics to be collected and used by both Sentry and the metrics system in frame_processor.py

2. Integrate Sentry-compatible performance tracking:
   - Add start_ttfb_metrics and start_processing_metrics calls in the VAD speech detection handler
   - Implement stop_ttfb_metrics call when receiving transcripts
   - Add stop_processing_metrics for final transcripts

3. Enhance VAD support for metrics:
   - Add `vad_enabled` property to check VAD event availability
   - Implement VAD-based speech detection handler for precise metric timing

These changes enable detailed performance tracking via both Sentry and the general metrics system when VAD is active. This allows for better monitoring and analysis of the speech-to-text process, providing valuable insights through Sentry and any other metrics consumers in the pipeline.

* Update frame_processor.py

* Refactor to support flexible metrics implementation

- Modified the __init__ method to accept a metrics parameter that is either FrameProcessorMetrics or one of its subclasses
- Updated the metrics initialization to create an instance with the processor's name
- Moved all FrameProcessorMetrics-related logic to a new processors\metrics\base.py file

* Implement flexible metrics system with Sentry integration

1. Created a new metrics module in processors/metrics/

2. Implemented FrameProcessorMetrics base class in base.py:

3. Implemented SentryMetrics class in sentry.py:
   - Inherits from FrameProcessorMetrics
   - Integrates with Sentry SDK for advanced metrics tracking
   - Implements Sentry-specific span creation and management for TTFB and processing metrics
   - Handles cases where Sentry is not available or initialized
2024-09-23 08:44:14 -07:00
Lewis Wolfgang
ea16dca8aa Merge pull request #469 from pipecat-ai/lewis/remove_torch_dependency
Remove torch dependency for using silero_vad
2024-09-23 09:59:40 -04:00
Mark Backman
306632b29a Add language_code support for ElevenLabs TTS 2024-09-23 09:01:02 -04:00
duyalei
4533ed014f add full-width punctuations as end of the sentence 2024-09-23 16:35:00 +08:00
Jin Kim
68cc4186ad Merge remote-tracking branch 'upstream/main' 2024-09-23 16:34:31 +09:00
Mark Backman
9a4e749c7c Merge pull request #491 from pipecat-ai/mb/elevenlabs-inputs
Add voice_settings and optimize_streaming_latency to ElevenLabs
2024-09-22 21:54:21 -04:00
Mark Backman
55c645c614 Add voice_settings and optimize_streaming_latency to ElevenLabs 2024-09-22 13:58:50 -04:00
Mark Backman
a1024bb365 Merge pull request #490 from pipecat-ai/mb/llm-rtvi-service-option
Add control frames for LLM param updates
2024-09-21 20:10:17 -04:00
Mark Backman
dfc82c3ba4 Merge pull request #486 from pipecat-ai/mb/llm-extra-params
Add extra input param to LLMs
2024-09-21 18:25:47 -04:00
Lewis Wolfgang
71202e3cd5 Remove torch dependency for using silero_vad 2024-09-17 16:48:52 -04:00
Jin Kim
75008d8f11 Add speed and emotion setting method to Cartesia TTS service 2024-09-18 00:51:45 +09:00
Jin Kim
2da0ecbe3c Revert "model_id" as a main argument 2024-09-18 00:38:12 +09:00
Jin Kim
c7f814b2dc Merge remote-tracking branch 'upstream/main' 2024-09-18 00:33:29 +09:00
Jin Kim
fa0deededa Add voice options and make to use InputParams for Cartesia. 2024-09-09 10:53:23 +09:00
166 changed files with 5517 additions and 3071 deletions

View File

@@ -1,4 +1,4 @@
name: lint
name: format
on:
workflow_dispatch:
@@ -12,12 +12,12 @@ on:
- "docs/**"
concurrency:
group: build-lint-${{ github.event.pull_request.number || github.ref }}
group: build-format-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true
jobs:
autopep8:
name: "Formatting lints"
ruff-format:
name: "Formatting checker"
runs-on: ubuntu-latest
steps:
- name: Checkout repo
@@ -25,7 +25,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.10'
python-version: "3.10"
- name: Setup virtual environment
run: |
python -m venv .venv
@@ -34,11 +34,8 @@ jobs:
source .venv/bin/activate
python -m pip install --upgrade pip
pip install -r dev-requirements.txt
- name: autopep8
id: autopep8
- name: Ruff formatter
id: ruff
run: |
source .venv/bin/activate
autopep8 --max-line-length 100 --exit-code -r -d --exclude "*_pb2.py" -a -a src/
- name: Fail if autopep8 requires changes
if: steps.autopep8.outputs.exit-code == 2
run: exit 1
ruff format --config line-length=100 --diff --exclude "*_pb2.py"

View File

@@ -27,6 +27,13 @@ jobs:
uses: actions/setup-python@v4
with:
python-version: "3.10"
- name: Cache virtual environment
uses: actions/cache@v3
with:
# We are hashing dev-requirements.txt and test-requirements.txt which
# contain all dependencies needed to run the tests.
key: venv-${{ runner.os }}-${{ steps.setup_python.outputs.python-version}}-${{ hashFiles('dev-requirements.txt') }}-${{ hashFiles('test-requirements.txt') }}
path: .venv
- name: Install system packages
id: install_system_packages
run: |
@@ -38,7 +45,7 @@ jobs:
run: |
source .venv/bin/activate
python -m pip install --upgrade pip
pip install -r test-requirements.txt
pip install -r dev-requirements.txt -r test-requirements.txt
- name: Test with pytest
run: |
source .venv/bin/activate

View File

@@ -9,6 +9,30 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added Google TTS service and corresponding foundational example `07n-interruptible-google.py`
- Added AWS Polly TTS support and `07m-interruptible-aws.py` as an example.
- Added InputParams to Azure TTS service.
- All `FrameProcessors` can now register event handlers.
```
tts = SomeTTSService(...)

@tts.event_handler("on_connected")
async def on_connected(processor):
    ...
```
- Added `AsyncGeneratorProcessor`. This processor can be used together with a
`FrameSerializer` as an async generator. It provides a `generator()` function
that returns an `AsyncGenerator` and that yields serialized frames.
- Added `EndTaskFrame` and `CancelTaskFrame`. These are new frames that are
meant to be pushed upstream to tell the pipeline task to stop nicely or
immediately respectively.
- Added configurable LLM parameters (e.g., temperature, top_p, max_tokens, seed)
for OpenAI, Anthropic, and Together AI services along with corresponding
setter functions.
@@ -24,15 +48,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
frames. To achieve that, each frame processor should only output frames from a
single task.
In this version we introduce synchronous and asynchronous frame
processors. The synchronous processors push output frames from the same task
that they receive input frames, and therefore only pushing frames from one
task. Asynchronous frame processors can have internal tasks to perform things
asynchronously (e.g. receiving data from a websocket) but they also have a
single task where they push frames from.
By default, frame processors are synchronous. To change a frame processor to
asynchronous you only need to pass `sync=False` to the base class constructor.
In this version all the frame processors have their own task to push
frames. That is, when `push_frame()` is called the given frame will be put
into an internal queue (with the exception of system frames) and a frame
processor task will push it out.
- Added pipeline clocks. A pipeline clock is used by the output transport to
know when a frame needs to be presented. For that, all frames now have an
@@ -44,9 +63,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
`SystemClock`). This clock will be passed to each frame processor via the
`StartFrame`.
- Added `CartesiaHttpTTSService`. This is a synchronous frame processor
(i.e. given an input text frame it will wait for the whole output before
returning).
- Added `CartesiaHttpTTSService`.
- `DailyTransport` now supports setting the audio bitrate to improve audio
quality through the `DailyParams.audio_out_bitrate` parameter. The new
@@ -69,6 +86,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- Updated individual update settings frame classes into a single UpdateSettingsFrame
class for STT, LLM, and TTS.
- We now distinguish between input and output audio and image frames. We
introduce `InputAudioRawFrame`, `OutputAudioRawFrame`, `InputImageRawFrame`
and `OutputImageRawFrame` (and other subclasses of those). The input frames
@@ -83,8 +103,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
pipelines to be executed concurrently. The difference between a
`SyncParallelPipeline` and a `ParallelPipeline` is that, given an input frame,
the `SyncParallelPipeline` will wait for all the internal pipelines to
complete. This is achieved by ensuring all the processors in each of the
internal pipelines are synchronous.
complete. This is achieved by making sure the last processor in each of the
pipelines is synchronous (e.g. an HTTP-based service that waits for the
response).
- `StartFrame` is back to being a system frame so we make sure it's processed immediately
by all processors. `EndFrame` stays a control frame since it needs to be
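
The changelog entry above about every frame processor having its own task and internal queue can be pictured with a small, standalone sketch. This is not Pipecat code: `Frame`, `QueuedProcessor`, the `system` flag and the `sink` list are hypothetical names used only for illustration, and the sketch assumes that "system frames" simply skip the queue.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Frame:
    name: str
    system: bool = False  # hypothetical flag standing in for "system frames"


class QueuedProcessor:
    """Toy processor: one internal queue and one task that pushes frames out."""

    def __init__(self, sink: list):
        self._sink = sink  # stands in for the next processor downstream
        self._queue: asyncio.Queue = asyncio.Queue()
        self._task = None

    def start(self) -> None:
        # Must be called from inside a running event loop.
        self._task = asyncio.create_task(self._push_loop())

    async def push_frame(self, frame: Frame) -> None:
        if frame.system:
            # Assumption: system frames bypass the queue and go out at once.
            self._sink.append(frame.name)
        else:
            await self._queue.put(frame)

    async def _push_loop(self) -> None:
        # The single task from which all queued frames are pushed.
        while True:
            frame = await self._queue.get()
            self._sink.append(frame.name)
            self._queue.task_done()

    async def stop(self) -> None:
        await self._queue.join()  # wait until queued frames are flushed
        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass


async def demo() -> list:
    out: list = []
    proc = QueuedProcessor(out)
    proc.start()
    await proc.push_frame(Frame("text-1"))
    await proc.push_frame(Frame("text-2"))
    await proc.push_frame(Frame("start", system=True))
    await proc.stop()
    return out


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch the two queued frames keep their relative order because a single task drains the queue, while the system frame reaches the sink without waiting behind them.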

View File

@@ -38,7 +38,7 @@ pip install "pipecat-ai[option,...]"
Your project may or may not need these, so they're made available as optional requirements. Here is a list:
- **AI services**: `anthropic`, `azure`, `deepgram`, `gladia`, `google`, `fal`, `lmnt`, `moondream`, `openai`, `openpipe`, `playht`, `silero`, `whisper`, `xtts`
- **AI services**: `anthropic`, `aws`, `azure`, `deepgram`, `gladia`, `google`, `fal`, `lmnt`, `moondream`, `openai`, `openpipe`, `playht`, `silero`, `whisper`, `xtts`
- **Transports**: `local`, `websocket`, `daily`
## Code examples
@@ -110,7 +110,6 @@ python app.py
Daily provides a prebuilt WebRTC user interface. Whilst the app is running, you can visit `https://<yourdomain>.daily.co/<room_url>` and listen to the bot say hello!
## WebRTC for production use
WebSockets are fine for server-to-server communication or for initial development. But for production use, you'll need client-server audio to use a protocol designed for real-time media transport. (For an explanation of the difference between WebSockets and WebRTC, see [this post.](https://www.daily.co/blog/how-to-talk-to-an-llm-with-your-voice/#webrtc))
@@ -131,7 +130,6 @@ pip install pipecat-ai[silero]
The first time you run your bot with Silero, startup may take a while whilst it downloads and caches the model in the background. You can check the progress of this in the console.
## Hacking on the framework itself
_Note that you may need to set up a virtual environment before following the instructions below. For instance, you might need to run the following from the root of the repo:_
@@ -170,22 +168,24 @@ pytest --doctest-modules --ignore-glob="*to_be_updated*" --ignore-glob=*pipeline
## Setting up your editor
This project uses strict [PEP 8](https://peps.python.org/pep-0008/) formatting.
This project uses strict [PEP 8](https://peps.python.org/pep-0008/) formatting via [Ruff](https://github.com/astral-sh/ruff).
### Emacs
You can use [use-package](https://github.com/jwiegley/use-package) to install [py-autopep8](https://codeberg.org/ideasman42/emacs-py-autopep8) package and configure `autopep8` arguments:
You can use [use-package](https://github.com/jwiegley/use-package) to install [emacs-lazy-ruff](https://github.com/christophermadsen/emacs-lazy-ruff) package and configure `ruff` arguments:
```elisp
(use-package py-autopep8
(use-package lazy-ruff
:ensure t
:defer t
:hook ((python-mode . py-autopep8-mode))
:hook ((python-mode . lazy-ruff-mode))
:config
(setq py-autopep8-options '("-a" "-a", "--max-line-length=100")))
(setq lazy-ruff-format-command "ruff format --config line-length=100")
(setq lazy-ruff-only-format-block t)
(setq lazy-ruff-only-format-region t)
(setq lazy-ruff-only-format-buffer t))
```
`autopep8` was installed in the `venv` environment described before, so you should be able to use [pyvenv-auto](https://github.com/ryotaro612/pyvenv-auto) to automatically load that environment inside Emacs.
`ruff` was installed in the `venv` environment described before, so you should be able to use [pyvenv-auto](https://github.com/ryotaro612/pyvenv-auto) to automatically load that environment inside Emacs.
```elisp
(use-package pyvenv-auto
@@ -198,18 +198,14 @@ You can use [use-package](https://github.com/jwiegley/use-package) to install [p
### Visual Studio Code
Install the
[autopep8](https://marketplace.visualstudio.com/items?itemName=ms-python.autopep8) extension. Then edit the user settings (_Ctrl-Shift-P_ `Open User Settings (JSON)`) and set it as the default Python formatter, enable formatting on save and configure `autopep8` arguments:
[Ruff](https://marketplace.visualstudio.com/items?itemName=charliermarsh.ruff) extension. Then edit the user settings (_Ctrl-Shift-P_ `Open User Settings (JSON)`) and set it as the default Python formatter, enable formatting on save and configure `ruff` arguments:
```json
"[python]": {
"editor.defaultFormatter": "ms-python.autopep8",
"editor.defaultFormatter": "charliermarsh.ruff",
"editor.formatOnSave": true
},
"autopep8.args": [
"-a",
"-a",
"--max-line-length=100"
],
"ruff.format.args": ["--config", "line-length=100"]
```
## Getting help

View File

@@ -1,8 +1,8 @@
autopep8~=2.3.1
build~=1.2.1
grpcio-tools~=1.62.2
pip-tools~=7.4.1
pyright~=1.1.376
pytest~=8.3.2
ruff~=0.6.7
setuptools~=72.2.0
setuptools_scm~=8.1.0

View File

@@ -1,6 +1,11 @@
# Anthropic
ANTHROPIC_API_KEY=...
# AWS
AWS_SECRET_ACCESS_KEY=...
AWS_ACCESS_KEY_ID=...
AWS_REGION=...
# Azure
AZURE_SPEECH_REGION=...
AZURE_SPEECH_API_KEY=...

View File

@@ -6,7 +6,10 @@ import argparse
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import LLMAssistantResponseAggregator, LLMUserResponseAggregator
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.frames.frames import LLMMessagesFrame, EndFrame
from pipecat.services.openai import OpenAILLMService
from pipecat.services.elevenlabs import ElevenLabsTTSService
@@ -16,6 +19,7 @@ from pipecat.vad.silero import SileroVADAnalyzer
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -39,7 +43,7 @@ async def main(room_url: str, token: str):
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
transcription_enabled=True,
)
),
)
tts = ElevenLabsTTSService(
@@ -47,9 +51,7 @@ async def main(room_url: str, token: str):
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
@@ -61,14 +63,16 @@ async def main(room_url: str, token: str):
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(),
tma_in,
llm,
tts,
transport.output(),
tma_out,
])
pipeline = Pipeline(
[
transport.input(),
tma_in,
llm,
tts,
transport.output(),
tma_out,
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

View File

@@ -16,9 +16,14 @@ from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pipecat.transports.services.helpers.daily_rest import (
DailyRESTHelper, DailyRoomObject, DailyRoomProperties, DailyRoomParams)
DailyRESTHelper,
DailyRoomObject,
DailyRoomProperties,
DailyRoomParams,
)
from dotenv import load_dotenv
load_dotenv(override=True)
@@ -26,37 +31,37 @@ load_dotenv(override=True)
MAX_SESSION_TIME = 5 * 60 # 5 minutes
REQUIRED_ENV_VARS = [
'DAILY_API_KEY',
'OPENAI_API_KEY',
'ELEVENLABS_API_KEY',
'ELEVENLABS_VOICE_ID',
'FLY_API_KEY',
'FLY_APP_NAME',]
"DAILY_API_KEY",
"OPENAI_API_KEY",
"ELEVENLABS_API_KEY",
"ELEVENLABS_VOICE_ID",
"FLY_API_KEY",
"FLY_APP_NAME",
]
FLY_API_HOST = os.getenv("FLY_API_HOST", "https://api.machines.dev/v1")
FLY_APP_NAME = os.getenv("FLY_APP_NAME", "pipecat-fly-example")
FLY_API_KEY = os.getenv("FLY_API_KEY", "")
FLY_HEADERS = {
'Authorization': f"Bearer {FLY_API_KEY}",
'Content-Type': 'application/json'
}
FLY_HEADERS = {"Authorization": f"Bearer {FLY_API_KEY}", "Content-Type": "application/json"}
daily_helpers = {}
# ----------------- API ----------------- #
@asynccontextmanager
async def lifespan(app: FastAPI):
aiohttp_session = aiohttp.ClientSession()
daily_helpers["rest"] = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'),
aiohttp_session=aiohttp_session
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
yield
await aiohttp_session.close()
app = FastAPI(lifespan=lifespan)
app.add_middleware(
@@ -64,7 +69,7 @@ app.add_middleware(
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"]
allow_headers=["*"],
)
# ----------------- Main ----------------- #
@@ -73,13 +78,15 @@ app.add_middleware(
async def spawn_fly_machine(room_url: str, token: str):
async with aiohttp.ClientSession() as session:
# Use the same image as the bot runner
async with session.get(f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS) as r:
async with session.get(
f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS
) as r:
if r.status != 200:
text = await r.text()
raise Exception(f"Unable to get machine info from Fly: {text}")
data = await r.json()
image = data[0]['config']['image']
image = data[0]["config"]["image"]
# Machine configuration
cmd = f"python3 bot.py -u {room_url} -t {token}"
@@ -88,31 +95,28 @@ async def spawn_fly_machine(room_url: str, token: str):
"config": {
"image": image,
"auto_destroy": True,
"init": {
"cmd": cmd
},
"restart": {
"policy": "no"
},
"guest": {
"cpu_kind": "shared",
"cpus": 1,
"memory_mb": 1024
}
"init": {"cmd": cmd},
"restart": {"policy": "no"},
"guest": {"cpu_kind": "shared", "cpus": 1, "memory_mb": 1024},
},
}
# Spawn a new machine instance
async with session.post(f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS, json=worker_props) as r:
async with session.post(
f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS, json=worker_props
) as r:
if r.status != 200:
text = await r.text()
raise Exception(f"Problem starting a bot worker: {text}")
data = await r.json()
# Wait for the machine to enter the started state
vm_id = data['id']
vm_id = data["id"]
async with session.get(f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines/{vm_id}/wait?state=started", headers=FLY_HEADERS) as r:
async with session.get(
f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines/{vm_id}/wait?state=started",
headers=FLY_HEADERS,
) as r:
if r.status != 200:
text = await r.text()
raise Exception(f"Bot was unable to enter started state: {text}")
@@ -134,29 +138,23 @@ async def start_bot(request: Request) -> JSONResponse:
room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", "")
if not room_url:
params = DailyRoomParams(
properties=DailyRoomProperties()
)
params = DailyRoomParams(properties=DailyRoomProperties())
try:
room: DailyRoomObject = await daily_helpers["rest"].create_room(params=params)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Unable to provision room {e}")
raise HTTPException(status_code=500, detail=f"Unable to provision room {e}")
else:
# Check passed room URL exists, we should assume that it already has a sip set up
try:
room: DailyRoomObject = await daily_helpers["rest"].get_room_from_url(room_url)
except Exception:
raise HTTPException(
status_code=500, detail=f"Room not found: {room_url}")
raise HTTPException(status_code=500, detail=f"Room not found: {room_url}")
# Give the agent a token to join the session
token = await daily_helpers["rest"].get_token(room.url, MAX_SESSION_TIME)
if not room or not token:
raise HTTPException(
status_code=500, detail=f"Failed to get token for room: {room_url}")
raise HTTPException(status_code=500, detail=f"Failed to get token for room: {room_url}")
# Launch a new fly.io machine, or run as a shell process (not recommended)
run_as_process = os.getenv("RUN_AS_PROCESS", False)
@@ -167,24 +165,26 @@ async def start_bot(request: Request) -> JSONResponse:
[f"python3 -m bot -u {room.url} -t {token}"],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__)))
cwd=os.path.dirname(os.path.abspath(__file__)),
)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to start subprocess: {e}")
raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}")
else:
try:
await spawn_fly_machine(room.url, token)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to spawn VM: {e}")
raise HTTPException(status_code=500, detail=f"Failed to spawn VM: {e}")
# Grab a token for the user to join with
user_token = await daily_helpers["rest"].get_token(room.url, MAX_SESSION_TIME)
return JSONResponse({
"room_url": room.url,
"token": user_token,
})
return JSONResponse(
{
"room_url": room.url,
"token": user_token,
}
)
if __name__ == "__main__":
# Check environment variables
@@ -193,23 +193,19 @@ if __name__ == "__main__":
raise Exception(f"Missing environment variable: {env_var}.")
parser = argparse.ArgumentParser(description="Pipecat Bot Runner")
parser.add_argument("--host", type=str,
default=os.getenv("HOST", "0.0.0.0"), help="Host address")
parser.add_argument("--port", type=int,
default=os.getenv("PORT", 7860), help="Port number")
parser.add_argument("--reload", action="store_true",
default=False, help="Reload code on change")
parser.add_argument(
"--host", type=str, default=os.getenv("HOST", "0.0.0.0"), help="Host address"
)
parser.add_argument("--port", type=int, default=os.getenv("PORT", 7860), help="Port number")
parser.add_argument(
"--reload", action="store_true", default=False, help="Reload code on change"
)
config = parser.parse_args()
try:
import uvicorn
uvicorn.run(
"bot_runner:app",
host=config.host,
port=config.port,
reload=config.reload
)
uvicorn.run("bot_runner:app", host=config.host, port=config.port, reload=config.reload)
except KeyboardInterrupt:
print("Pipecat runner shutting down...")
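Review note on the `subprocess.Popen` call in this file: the command is an f-string run with `shell=True`, so the room URL and token pass through a shell. A minimal sketch of launching the same module without a shell follows. The helper names `build_bot_command` and `start_bot_process` are hypothetical, and using `sys.executable` in place of `python3` is an assumption; `bufsize` is omitted for brevity.

```python
import subprocess
import sys


def build_bot_command(room_url: str, token: str) -> list[str]:
    """Build the bot command as an argument list (hypothetical helper)."""
    return [sys.executable, "-m", "bot", "-u", room_url, "-t", token]


def start_bot_process(room_url: str, token: str, cwd: str) -> subprocess.Popen:
    # No shell=True: each value is passed as a single argv entry, so shell
    # metacharacters in the URL or token are never interpreted.
    return subprocess.Popen(build_bot_command(room_url, token), cwd=cwd)
```

Passing a list keeps each value as exactly one argument, which removes the need for quoting.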

View File

@@ -6,11 +6,11 @@ import argparse
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import LLMAssistantResponseAggregator, LLMUserResponseAggregator
from pipecat.frames.frames import (
LLMMessagesFrame,
EndFrame
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.frames.frames import LLMMessagesFrame, EndFrame
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport, DailyDialinSettings
@@ -18,6 +18,7 @@ from pipecat.vad.silero import SileroVADAnalyzer
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -31,10 +32,7 @@ async def main(room_url: str, token: str, callId: str, callDomain: str):
# diallin_settings are only needed if Daily's SIP URI is used
# If you are handling this via Twilio, Telnyx, set this to None
# and handle call-forwarding when on_dialin_ready fires.
diallin_settings = DailyDialinSettings(
call_id=callId,
call_domain=callDomain
)
diallin_settings = DailyDialinSettings(call_id=callId, call_domain=callDomain)
transport = DailyTransport(
room_url,
@@ -50,7 +48,7 @@ async def main(room_url: str, token: str, callId: str, callDomain: str):
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
transcription_enabled=True,
)
),
)
tts = ElevenLabsTTSService(
@@ -58,10 +56,7 @@ async def main(room_url: str, token: str, callId: str, callDomain: str):
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o"
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
@@ -73,14 +68,16 @@ async def main(room_url: str, token: str, callId: str, callDomain: str):
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(),
tma_in,
llm,
tts,
transport.output(),
tma_out,
])
pipeline = Pipeline(
[
transport.input(),
tma_in,
llm,
tts,
transport.output(),
tma_out,
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

View File

@@ -7,7 +7,6 @@ provisioning a room and starting a Pipecat bot in response.
Refer to README for more information.
"""
import aiohttp
import os
import argparse
@@ -25,17 +24,18 @@ from pipecat.transports.services.helpers.daily_rest import (
DailyRoomObject,
DailyRoomProperties,
DailyRoomSipParams,
DailyRoomParams)
DailyRoomParams,
)
from dotenv import load_dotenv
load_dotenv(override=True)
# ------------ Configuration ------------ #
MAX_SESSION_TIME = 5 * 60 # 5 minutes
REQUIRED_ENV_VARS = ['OPENAI_API_KEY', 'DAILY_API_KEY',
'ELEVENLABS_API_KEY', 'ELEVENLABS_VOICE_ID']
REQUIRED_ENV_VARS = ["OPENAI_API_KEY", "DAILY_API_KEY", "ELEVENLABS_API_KEY", "ELEVENLABS_VOICE_ID"]
daily_helpers = {}
@@ -47,12 +47,13 @@ async def lifespan(app: FastAPI):
aiohttp_session = aiohttp.ClientSession()
daily_helpers["rest"] = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'),
aiohttp_session=aiohttp_session
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
yield
await aiohttp_session.close()
app = FastAPI(lifespan=lifespan)
app.add_middleware(
@@ -60,7 +61,7 @@ app.add_middleware(
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"]
allow_headers=["*"],
)
"""
@@ -80,10 +81,7 @@ async def _create_daily_room(room_url, callId, callDomain=None, vendor="daily"):
properties=DailyRoomProperties(
# Note: these are the default values, except for the display name
sip=DailyRoomSipParams(
display_name="dialin-user",
video=False,
sip_mode="dial-in",
num_endpoints=1
display_name="dialin-user", video=False, sip_mode="dial-in", num_endpoints=1
)
)
)
@@ -97,8 +95,7 @@ async def _create_daily_room(room_url, callId, callDomain=None, vendor="daily"):
print(f"Joining existing room: {room_url}")
room: DailyRoomObject = await daily_helpers["rest"].get_room_from_url(room_url)
except Exception:
raise HTTPException(
status_code=500, detail=f"Room not found: {room_url}")
raise HTTPException(status_code=500, detail=f"Room not found: {room_url}")
print(f"Daily room: {room.url} {room.config.sip_endpoint}")
@@ -106,8 +103,7 @@ async def _create_daily_room(room_url, callId, callDomain=None, vendor="daily"):
token = await daily_helpers["rest"].get_token(room.url, MAX_SESSION_TIME)
if not room or not token:
raise HTTPException(
status_code=500, detail=f"Failed to get room or token")
raise HTTPException(status_code=500, detail=f"Failed to get room or token")
# Spawn a new agent, and join the user session
# Note: this is mostly for demonstration purposes (refer to 'deployment' in docs)
@@ -120,14 +116,10 @@ async def _create_daily_room(room_url, callId, callDomain=None, vendor="daily"):
try:
subprocess.Popen(
[bot_proc],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__))
[bot_proc], shell=True, bufsize=1, cwd=os.path.dirname(os.path.abspath(__file__))
)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to start subprocess: {e}")
raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}")
return room
@@ -150,11 +142,10 @@ async def twilio_start_bot(request: Request):
pass
room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", None)
callId = data.get('CallSid')
callId = data.get("CallSid")
if not callId:
raise HTTPException(
status_code=500, detail="Missing 'CallSid' in request")
raise HTTPException(status_code=500, detail="Missing 'CallSid' in request")
print("CallId: %s" % callId)
@@ -170,7 +161,8 @@ async def twilio_start_bot(request: Request):
# http://com.twilio.music.classical.s3.amazonaws.com/BusyStrings.mp3
resp = VoiceResponse()
resp.play(
url="http://com.twilio.sounds.music.s3.amazonaws.com/MARKOVICHAMP-Borghestral.mp3", loop=10)
url="http://com.twilio.sounds.music.s3.amazonaws.com/MARKOVICHAMP-Borghestral.mp3", loop=10
)
return str(resp)
@@ -192,18 +184,14 @@ async def daily_start_bot(request: Request) -> JSONResponse:
callId = data.get("callId", None)
callDomain = data.get("callDomain", None)
except Exception:
raise HTTPException(
status_code=500,
detail="Missing properties 'callId' or 'callDomain'")
raise HTTPException(status_code=500, detail="Missing properties 'callId' or 'callDomain'")
print(f"CallId: {callId}, CallDomain: {callDomain}")
room: DailyRoomObject = await _create_daily_room(room_url, callId, callDomain, "daily")
# Grab a token for the user to join with
return JSONResponse({
"room_url": room.url,
"sipUri": room.config.sip_endpoint
})
return JSONResponse({"room_url": room.url, "sipUri": room.config.sip_endpoint})
# ----------------- Main ----------------- #
@@ -215,24 +203,18 @@ if __name__ == "__main__":
raise Exception(f"Missing environment variable: {env_var}.")
parser = argparse.ArgumentParser(description="Pipecat Bot Runner")
parser.add_argument("--host", type=str,
default=os.getenv("HOST", "0.0.0.0"), help="Host address")
parser.add_argument("--port", type=int,
default=os.getenv("PORT", 7860), help="Port number")
parser.add_argument("--reload", action="store_true",
default=True, help="Reload code on change")
parser.add_argument(
"--host", type=str, default=os.getenv("HOST", "0.0.0.0"), help="Host address"
)
parser.add_argument("--port", type=int, default=os.getenv("PORT", 7860), help="Port number")
parser.add_argument("--reload", action="store_true", default=True, help="Reload code on change")
config = parser.parse_args()
try:
import uvicorn
uvicorn.run(
"bot_runner:app",
host=config.host,
port=config.port,
reload=config.reload
)
uvicorn.run("bot_runner:app", host=config.host, port=config.port, reload=config.reload)
except KeyboardInterrupt:
print("Pipecat runner shutting down...")
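Review note on the `--reload` argument in this file: with `action="store_true"` and `default=True`, `config.reload` is `True` whether or not the flag is passed, so reload cannot be switched off from the command line. One possible alternative, sketched here with `argparse.BooleanOptionalAction` (available from Python 3.9), keeps the default and adds a `--no-reload` form. The parser below is a standalone illustration, not the runner's actual parser.

```python
import argparse

parser = argparse.ArgumentParser(description="Reload flag sketch")
# BooleanOptionalAction registers both --reload and --no-reload.
parser.add_argument(
    "--reload",
    action=argparse.BooleanOptionalAction,
    default=True,
    help="Reload code on change",
)

# Default stays enabled; --no-reload switches it off.
print(parser.parse_args([]).reload)
print(parser.parse_args(["--no-reload"]).reload)
```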

View File

@@ -6,11 +6,11 @@ import argparse
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import LLMAssistantResponseAggregator, LLMUserResponseAggregator
from pipecat.frames.frames import (
LLMMessagesFrame,
EndFrame
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.frames.frames import LLMMessagesFrame, EndFrame
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -21,14 +21,15 @@ from twilio.rest import Client
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
twilio_account_sid = os.getenv('TWILIO_ACCOUNT_SID')
twilio_auth_token = os.getenv('TWILIO_AUTH_TOKEN')
twilio_account_sid = os.getenv("TWILIO_ACCOUNT_SID")
twilio_auth_token = os.getenv("TWILIO_AUTH_TOKEN")
twilioclient = Client(twilio_account_sid, twilio_auth_token)
daily_api_key = os.getenv("DAILY_API_KEY", "")
@@ -51,7 +52,7 @@ async def main(room_url: str, token: str, callId: str, sipUri: str):
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
transcription_enabled=True,
)
),
)
tts = ElevenLabsTTSService(
@@ -59,10 +60,7 @@ async def main(room_url: str, token: str, callId: str, sipUri: str):
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o"
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
@@ -74,14 +72,16 @@ async def main(room_url: str, token: str, callId: str, sipUri: str):
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(),
tma_in,
llm,
tts,
transport.output(),
tma_out,
])
pipeline = Pipeline(
[
transport.input(),
tma_in,
llm,
tts,
transport.output(),
tma_out,
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@@ -103,7 +103,7 @@ async def main(room_url: str, token: str, callId: str, sipUri: str):
try:
# The TwiML is updated using Twilio's client library
call = twilioclient.calls(callId).update(
twiml=f'<Response><Dial><Sip>{sipUri}</Sip></Dial></Response>'
twiml=f"<Response><Dial><Sip>{sipUri}</Sip></Dial></Response>"
)
except Exception as e:
raise Exception(f"Failed to forward call: {str(e)}")
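Review note on the TwiML string above: `sipUri` is interpolated into XML as-is, so a URI containing `&` or `<` would yield malformed markup. A minimal sketch that escapes the value first follows. The helper name `build_dial_twiml` is hypothetical, and whether real SIP URIs from Daily ever contain such characters is not established by this diff.

```python
from xml.sax.saxutils import escape


def build_dial_twiml(sip_uri: str) -> str:
    """Return TwiML that dials a SIP URI, escaping XML special characters."""
    # escape() replaces &, < and > with their XML entities.
    return f"<Response><Dial><Sip>{escape(sip_uri)}</Sip></Dial></Response>"
```

The result can be passed as the `twiml` argument in place of the inline f-string.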

View File

@@ -21,6 +21,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -32,7 +33,8 @@ async def main():
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url, None, "Say One Thing", DailyParams(audio_out_enabled=True))
room_url, None, "Say One Thing", DailyParams(audio_out_enabled=True)
)
tts = CartesiaHttpTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
@@ -47,10 +49,11 @@ async def main():
# participant joins.
@transport.event_handler("on_participant_joined")
async def on_new_participant_joined(transport, participant):
participant_name = participant["info"]["userName"] or ''
participant_name = participant["info"]["userName"] or ""
await task.queue_frames([TextFrame(f"Hello there, {participant_name}!"), EndFrame()])
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -20,6 +20,7 @@ from pipecat.transports.local.audio import LocalAudioTransport
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)

View File

@@ -0,0 +1,108 @@
import argparse
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from livekit import api # pip install livekit-api
from loguru import logger
from pipecat.frames.frames import TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.transports.services.livekit import LiveKitParams, LiveKitTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
def generate_token(room_name: str, participant_name: str, api_key: str, api_secret: str) -> str:
token = api.AccessToken(api_key, api_secret)
token.with_identity(participant_name).with_name(participant_name).with_grants(
api.VideoGrants(
room_join=True,
room=room_name,
)
)
return token.to_jwt()
async def configure_livekit():
parser = argparse.ArgumentParser(description="LiveKit AI SDK Bot Sample")
parser.add_argument(
"-r", "--room", type=str, required=False, help="Name of the LiveKit room to join"
)
parser.add_argument("-u", "--url", type=str, required=False, help="URL of the LiveKit server")
args, unknown = parser.parse_known_args()
room_name = args.room or os.getenv("LIVEKIT_ROOM_NAME")
url = args.url or os.getenv("LIVEKIT_URL")
api_key = os.getenv("LIVEKIT_API_KEY")
api_secret = os.getenv("LIVEKIT_API_SECRET")
if not room_name:
raise Exception(
"No LiveKit room specified. Use the -r/--room option from the command line, or set LIVEKIT_ROOM_NAME in your environment."
)
if not url:
raise Exception(
"No LiveKit server URL specified. Use the -u/--url option from the command line, or set LIVEKIT_URL in your environment."
)
if not api_key or not api_secret:
raise Exception(
"LIVEKIT_API_KEY and LIVEKIT_API_SECRET must be set in environment variables."
)
token = generate_token(room_name, "Say One Thing", api_key, api_secret)
user_token = generate_token(room_name, "User", api_key, api_secret)
logger.info(f"User token: {user_token}")
return (url, token, room_name)
async def main():
async with aiohttp.ClientSession() as session:
(url, token, room_name) = await configure_livekit()
transport = LiveKitTransport(
url=url,
token=token,
room_name=room_name,
params=LiveKitParams(audio_out_enabled=True, audio_out_sample_rate=16000),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
runner = PipelineRunner()
task = PipelineTask(Pipeline([tts, transport.output()]))
# Register an event handler so we can play the audio when the
# participant joins.
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant_id):
await asyncio.sleep(1)
await task.queue_frame(
TextFrame(
"Hello there! How are you doing today? Would you like to talk about the weather?"
)
)
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())
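Review note on `configure_livekit`: the room name and server URL each follow the same rule, command-line option first, then environment variable, then an error. A minimal sketch of factoring that rule out follows. The helpers `resolve_setting` and `parse_livekit_settings` are hypothetical names introduced for illustration; only the option names and environment variable names come from the file above.

```python
import argparse
import os
from typing import Optional, Sequence


def resolve_setting(cli_value: Optional[str], env_var: str, label: str) -> str:
    """Prefer the command-line value, then the environment, else fail loudly."""
    value = cli_value or os.getenv(env_var)
    if not value:
        raise ValueError(f"No {label} specified. Pass it on the command line or set {env_var}.")
    return value


def parse_livekit_settings(argv: Sequence[str]) -> tuple[str, str]:
    parser = argparse.ArgumentParser(description="Setting resolution sketch")
    parser.add_argument("-r", "--room", type=str, required=False)
    parser.add_argument("-u", "--url", type=str, required=False)
    # parse_known_args tolerates extra options meant for other components.
    args, _unknown = parser.parse_known_args(argv)
    room = resolve_setting(args.room, "LIVEKIT_ROOM_NAME", "LiveKit room")
    url = resolve_setting(args.url, "LIVEKIT_URL", "LiveKit server URL")
    return room, url
```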

View File

@@ -22,6 +22,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -33,25 +34,22 @@ async def main():
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url,
None,
"Say One Thing From an LLM",
DailyParams(audio_out_enabled=True))
room_url, None, "Say One Thing From an LLM", DailyParams(audio_out_enabled=True)
)
tts = CartesiaHttpTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are an LLM in a WebRTC session, and this is a 'hello world' demo. Say hello to the world.",
}]
}
]
runner = PipelineRunner()

View File

@@ -21,6 +21,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -35,17 +36,11 @@ async def main():
room_url,
None,
"Show a still frame image",
DailyParams(
camera_out_enabled=True,
camera_out_width=1024,
camera_out_height=1024
)
DailyParams(camera_out_enabled=True, camera_out_width=1024, camera_out_height=1024),
)
imagegen = FalImageGenService(
params=FalImageGenService.InputParams(
image_size="square_hd"
),
params=FalImageGenService.InputParams(image_size="square_hd"),
aiohttp_session=session,
key=os.getenv("FAL_KEY"),
)

View File

@@ -22,6 +22,7 @@ from pipecat.transports.local.tk import TkLocalTransport
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -35,15 +36,11 @@ async def main():
transport = TkLocalTransport(
tk_root,
TransportParams(
camera_out_enabled=True,
camera_out_width=1024,
camera_out_height=1024))
TransportParams(camera_out_enabled=True, camera_out_width=1024, camera_out_height=1024),
)
imagegen = FalImageGenService(
params=FalImageGenService.InputParams(
image_size="square_hd"
),
params=FalImageGenService.InputParams(image_size="square_hd"),
aiohttp_session=session,
key=os.getenv("FAL_KEY"),
)

View File

@@ -28,6 +28,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -58,8 +59,7 @@ async def main():
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
messages = [{"role": "system",
"content": "tell the user a joke about llamas"}]
messages = [{"role": "system", "content": "tell the user a joke about llamas"}]
# Start a task to run the LLM to create a joke, and convert the LLM
# output to audio frames. This task will run in parallel with generating
@@ -77,8 +77,7 @@ async def main():
]
)
merge_pipeline = SequentialMergePipeline(
[simple_tts_pipeline, llm_pipeline])
merge_pipeline = SequentialMergePipeline([simple_tts_pipeline, llm_pipeline])
await asyncio.gather(
transport.run(merge_pipeline),

View File

@@ -16,7 +16,7 @@ from pipecat.frames.frames import (
Frame,
LLMFullResponseStartFrame,
LLMMessagesFrame,
TextFrame
TextFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
@@ -34,6 +34,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -81,23 +82,19 @@ async def main():
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_width=1024,
camera_out_height=1024
)
camera_out_height=1024,
),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
tts = CartesiaHttpTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
imagegen = FalImageGenService(
params=FalImageGenService.InputParams(
image_size="square_hd"
),
params=FalImageGenService.InputParams(image_size="square_hd"),
aiohttp_session=session,
key=os.getenv("FAL_KEY"),
)
@@ -110,17 +107,21 @@ async def main():
# that, each pipeline runs concurrently and `SyncParallelPipeline` will
# wait for the input frame to be processed.
#
# Note that `SyncParallelPipeline` requires all processors in it to be
# synchronous (which is the default for most processors).
pipeline = Pipeline([
llm, # LLM
sentence_aggregator, # Aggregates LLM output into full sentences
SyncParallelPipeline( # Run pipelines in parallel aggregating the result
[month_prepender, tts], # Create "Month: sentence" and output audio
[imagegen] # Generate image
),
transport.output() # Transport output
])
# Note that `SyncParallelPipeline` requires the last processor in each
# of the pipelines to be synchronous. In this case, we use
# `CartesiaHttpTTSService` and `FalImageGenService` which make HTTP
# requests and wait for the response.
pipeline = Pipeline(
[
llm, # LLM
sentence_aggregator, # Aggregates LLM output into full sentences
SyncParallelPipeline( # Run pipelines in parallel aggregating the result
[month_prepender, tts], # Create "Month: sentence" and output audio
[imagegen], # Generate image
),
transport.output(), # Transport output
]
)
frames = []
for month in [

View File

@@ -17,7 +17,8 @@ from pipecat.frames.frames import (
TTSAudioRawFrame,
URLImageRawFrame,
LLMMessagesFrame,
TextFrame)
TextFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline
@@ -48,7 +49,12 @@ async def main():
runner = PipelineRunner()
async def get_month_data(month):
messages = [{"role": "system", "content": f"Describe a nature photograph suitable for use in a calendar, for the month of {month}. Include only the image description with no preamble. Limit the description to one sentence, please.", }]
messages = [
{
"role": "system",
"content": f"Describe a nature photograph suitable for use in a calendar, for the month of {month}. Include only the image description with no preamble. Limit the description to one sentence, please.",
}
]
class ImageDescription(FrameProcessor):
def __init__(self):
@@ -74,7 +80,8 @@ async def main():
if isinstance(frame, TTSAudioRawFrame):
self.audio.extend(frame.audio)
self.frame = OutputAudioRawFrame(
bytes(self.audio), frame.sample_rate, frame.num_channels)
bytes(self.audio), frame.sample_rate, frame.num_channels
)
class ImageGrabber(FrameProcessor):
def __init__(self):
@@ -87,9 +94,7 @@ async def main():
if isinstance(frame, URLImageRawFrame):
self.frame = frame
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
tts = CartesiaHttpTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
@@ -97,11 +102,10 @@ async def main():
)
imagegen = FalImageGenService(
params=FalImageGenService.InputParams(
image_size="square_hd"
),
params=FalImageGenService.InputParams(image_size="square_hd"),
aiohttp_session=session,
key=os.getenv("FAL_KEY"))
key=os.getenv("FAL_KEY"),
)
sentence_aggregator = SentenceAggregator()
@@ -117,17 +121,21 @@ async def main():
# `SyncParallelPipeline` will wait for the input frame to be
# processed.
#
# Note that `SyncParallelPipeline` requires all processors in it to
# be synchronous (which is the default for most processors).
pipeline = Pipeline([
llm, # LLM
sentence_aggregator, # Aggregates LLM output into full sentences
description, # Store sentence
SyncParallelPipeline(
[tts, audio_grabber], # Generate and store audio for the given sentence
[imagegen, image_grabber] # Generate and store image for the given sentence
)
])
# Note that `SyncParallelPipeline` requires the last processor in
# each of the pipelines to be synchronous. In this case, we use
# `CartesiaHttpTTSService` and `FalImageGenService` which make HTTP
# requests and wait for the response.
pipeline = Pipeline(
[
llm, # LLM
sentence_aggregator, # Aggregates LLM output into full sentences
description, # Store sentence
SyncParallelPipeline(
[tts, audio_grabber], # Generate and store audio for the given sentence
[imagegen, image_grabber], # Generate and store image for the given sentence
),
]
)
task = PipelineTask(pipeline)
await task.queue_frame(LLMMessagesFrame(messages))
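Review note on the `SyncParallelPipeline` comment in this hunk: the idea is that the branches run concurrently for one input and nothing moves on until every branch has produced its result. The sketch below illustrates that idea in plain `asyncio`. It is not Pipecat's implementation, and every function name in it is a hypothetical stand-in.

```python
import asyncio


async def synthesize_audio(sentence: str) -> str:
    # Stand-in for an HTTP TTS request that waits for its response.
    await asyncio.sleep(0.02)
    return f"audio({sentence})"


async def generate_image(sentence: str) -> str:
    # Stand-in for an HTTP image generation request.
    await asyncio.sleep(0.01)
    return f"image({sentence})"


async def process_sentence(sentence: str) -> list[str]:
    # Both branches start together; nothing is returned until both finish,
    # and results come back in branch order regardless of completion order.
    return list(await asyncio.gather(synthesize_audio(sentence), generate_image(sentence)))


async def run(sentences: list[str]) -> list[str]:
    output: list[str] = []
    for sentence in sentences:
        # The next sentence is not started until this one is fully processed.
        output.extend(await process_sentence(sentence))
    return output


if __name__ == "__main__":
    print(asyncio.run(run(["January", "February"])))
```

Because each branch ends in a request that waits for its response, the audio and image for one sentence stay paired before the next sentence begins.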
@@ -148,7 +156,9 @@ async def main():
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_width=1024,
camera_out_height=1024))
camera_out_height=1024,
),
)
pipeline = Pipeline([transport.output()])

View File

@@ -10,7 +10,12 @@ import os
import sys
from pipecat.frames.frames import Frame, LLMMessagesFrame, MetricsFrame
from pipecat.metrics.metrics import TTFBMetricsData, ProcessingMetricsData, LLMUsageMetricsData, TTSUsageMetricsData
from pipecat.metrics.metrics import (
TTFBMetricsData,
ProcessingMetricsData,
LLMUsageMetricsData,
TTSUsageMetricsData,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -29,6 +34,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -48,7 +54,8 @@ class MetricsLogger(FrameProcessor):
print(
f"!!! MetricsFrame: {frame}, tokens: {
tokens.prompt_tokens}, characters: {
tokens.completion_tokens}")
tokens.completion_tokens}"
)
elif isinstance(d, TTSUsageMetricsData):
print(f"!!! MetricsFrame: {frame}, characters: {d.value}")
await self.push_frame(frame, direction)
@@ -66,8 +73,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
@@ -75,10 +82,7 @@ async def main():
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o"
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
ml = MetricsLogger()
@@ -91,15 +95,17 @@ async def main():
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(),
tma_in,
llm,
tts,
ml,
transport.output(),
tma_out,
])
pipeline = Pipeline(
[
transport.input(),
tma_in,
llm,
tts,
ml,
transport.output(),
tma_out,
]
)
task = PipelineTask(pipeline)
@@ -107,8 +113,7 @@ async def main():
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

View File

@@ -31,6 +31,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -52,16 +53,21 @@ class ImageSyncAggregator(FrameProcessor):
await super().process_frame(frame, direction)
if not isinstance(frame, SystemFrame) and direction == FrameDirection.DOWNSTREAM:
await self.push_frame(OutputImageRawFrame(
image=self._speaking_image_bytes,
size=(1024, 1024),
format=self._speaking_image_format)
await self.push_frame(
OutputImageRawFrame(
image=self._speaking_image_bytes,
size=(1024, 1024),
format=self._speaking_image_format,
)
)
await self.push_frame(frame)
await self.push_frame(OutputImageRawFrame(
image=self._waiting_image_bytes,
size=(1024, 1024),
format=self._waiting_image_format))
await self.push_frame(
OutputImageRawFrame(
image=self._waiting_image_bytes,
size=(1024, 1024),
format=self._waiting_image_format,
)
)
else:
await self.push_frame(frame)
@@ -82,7 +88,7 @@ async def main():
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
)
),
)
tts = CartesiaHttpTTSService(
@@ -90,9 +96,7 @@ async def main():
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
@@ -109,21 +113,23 @@ async def main():
os.path.join(os.path.dirname(__file__), "assets", "waiting.png"),
)
pipeline = Pipeline([
transport.input(),
image_sync_aggregator,
tma_in,
llm,
tts,
transport.output(),
tma_out
])
pipeline = Pipeline(
[
transport.input(),
image_sync_aggregator,
tma_in,
llm,
tts,
transport.output(),
tma_out,
]
)
task = PipelineTask(pipeline)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
participant_name = participant["info"]["userName"] or ''
participant_name = participant["info"]["userName"] or ""
transport.capture_participant_transcription(participant["id"])
await task.queue_frames([TextFrame(f"Hi there {participant_name}!")])

View File

@@ -14,7 +14,9 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -25,6 +27,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -43,8 +46,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
@@ -52,9 +55,7 @@ async def main():
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
@@ -66,28 +67,32 @@ async def main():
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
))
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

View File

@@ -14,7 +14,9 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.anthropic import AnthropicLLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -25,6 +27,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -43,8 +46,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
@@ -53,8 +56,8 @@ async def main():
)
llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"),
model="claude-3-opus-20240229")
api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-opus-20240229"
)
# todo: think more about how to handle system prompts in a more general way. OpenAI,
# Google, and Anthropic all have slightly different approaches to providing a system
@@ -69,14 +72,16 @@ async def main():
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

@@ -15,7 +15,9 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.processors.frameworks.langchain import LangchainProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -32,6 +34,7 @@ from loguru import logger
from runner import configure
from dotenv import load_dotenv
load_dotenv(override=True)
@@ -70,19 +73,22 @@ async def main():
prompt = ChatPromptTemplate.from_messages(
[
("system",
"Be nice and helpful. Answer very briefly and without special characters like `#` or `*`. "
"Your response will be synthesized to voice and those characters will create unnatural sounds.",
),
(
"system",
"Be nice and helpful. Answer very briefly and without special characters like `#` or `*`. "
"Your response will be synthesized to voice and those characters will create unnatural sounds.",
),
MessagesPlaceholder("chat_history"),
("human", "{input}"),
])
]
)
chain = prompt | ChatOpenAI(model="gpt-4o", temperature=0.7)
history_chain = RunnableWithMessageHistory(
chain,
get_session_history,
history_messages_key="chat_history",
input_messages_key="input")
input_messages_key="input",
)
lc = LangchainProcessor(history_chain)
tma_in = LLMUserResponseAggregator()
@@ -90,12 +96,12 @@ async def main():
pipeline = Pipeline(
[
transport.input(), # Transport user input
tma_in, # User responses
lc, # Langchain
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
transport.input(), # Transport user input
tma_in, # User responses
lc, # Langchain
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
@@ -109,11 +115,7 @@ async def main():
# the `LLMMessagesFrame` will be picked up by the LangchainProcessor using
# only the content of the last message to inject it in the prompt defined
# above. So no role is required here.
messages = [(
{
"content": "Please briefly introduce yourself to the user."
}
)]
messages = [({"content": "Please briefly introduce yourself to the user."})]
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

@@ -5,26 +5,27 @@
#
import asyncio
import aiohttp
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.deepgram import DeepgramSTTService, DeepgramTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -43,21 +44,15 @@ async def main():
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True
)
vad_audio_passthrough=True,
),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = DeepgramTTSService(
aiohttp_session=session,
api_key=os.getenv("DEEPGRAM_API_KEY"),
voice="aura-helios-en"
)
tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-helios-en")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
@@ -69,15 +64,17 @@ async def main():
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
stt, # STT
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@@ -85,8 +82,7 @@ async def main():
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

@@ -5,26 +5,27 @@
#
import asyncio
import aiohttp
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -43,8 +44,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = ElevenLabsTTSService(
@@ -52,9 +53,7 @@ async def main():
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
@@ -66,28 +65,32 @@ async def main():
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
))
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

@@ -14,7 +14,9 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.playht import PlayHTTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -25,6 +27,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -44,8 +47,8 @@ async def main():
audio_out_sample_rate=16000,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = PlayHTTTSService(
@@ -54,9 +57,7 @@ async def main():
voice_url="s3://voice-cloning-zero-shot/801a663f-efd0-4254-98d0-5c175514c3e8/jennifer/manifest.json",
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
@@ -68,14 +69,16 @@ async def main():
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@@ -83,8 +86,7 @@ async def main():
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

@@ -14,7 +14,9 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.azure import AzureLLMService, AzureSTTService, AzureTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
@@ -25,6 +27,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -45,7 +48,7 @@ async def main():
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
)
),
)
stt = AzureSTTService(
@@ -74,15 +77,17 @@ async def main():
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
stt, # STT
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@@ -90,8 +95,7 @@ async def main():
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

@@ -14,7 +14,9 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.openai import OpenAITTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -25,6 +27,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -44,18 +47,13 @@ async def main():
audio_out_sample_rate=24000,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = OpenAITTSService(
api_key=os.getenv("OPENAI_API_KEY"),
voice="alloy"
)
tts = OpenAITTSService(api_key=os.getenv("OPENAI_API_KEY"), voice="alloy")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
@@ -67,14 +65,16 @@ async def main():
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@@ -82,8 +82,7 @@ async def main():
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

@@ -28,6 +28,7 @@ from loguru import logger
import time
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -46,8 +47,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
@@ -60,9 +61,7 @@ async def main():
api_key=os.getenv("OPENAI_API_KEY"),
openpipe_api_key=os.getenv("OPENPIPE_API_KEY"),
model="gpt-4o",
tags={
"conversation_id": f"pipecat-{timestamp}"
}
tags={"conversation_id": f"pipecat-{timestamp}"},
)
messages = [
@@ -74,14 +73,16 @@ async def main():
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
@@ -89,8 +90,7 @@ async def main():
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

@@ -14,7 +14,9 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.deepgram import DeepgramSTTService, DeepgramTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.xtts import XTTSService
@@ -26,6 +28,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -45,19 +48,17 @@ async def main():
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
)
),
)
tts = XTTSService(
aiohttp_session=session,
voice_id="Claribel Dervla",
language="en",
base_url="http://localhost:8000"
base_url="http://localhost:8000",
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
@@ -69,14 +70,16 @@ async def main():
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@@ -84,8 +87,7 @@ async def main():
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

@@ -14,7 +14,9 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.gladia import GladiaSTTService
from pipecat.services.openai import OpenAILLMService
@@ -26,6 +28,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -45,7 +48,7 @@ async def main():
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
)
),
)
stt = GladiaSTTService(
@@ -57,9 +60,7 @@ async def main():
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
@@ -71,15 +72,17 @@ async def main():
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
stt, # STT
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@@ -87,8 +90,7 @@ async def main():
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

@@ -14,7 +14,9 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.lmnt import LmntTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -25,6 +27,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -44,18 +47,13 @@ async def main():
audio_out_sample_rate=24000,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = LmntTTSService(
api_key=os.getenv("LMNT_API_KEY"),
voice_id="morgan"
)
tts = LmntTTSService(api_key=os.getenv("LMNT_API_KEY"), voice_id="morgan")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
@@ -67,14 +65,16 @@ async def main():
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@@ -82,8 +82,7 @@ async def main():
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

@@ -14,7 +14,9 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.together import TogetherLLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -25,6 +27,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -43,8 +46,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
@@ -62,8 +65,8 @@ async def main():
extra={
"frequency_penalty": 2.0,
"presence_penalty": 0.0,
}
)
},
),
)
messages = [
@@ -76,14 +79,16 @@ async def main():
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

@@ -0,0 +1,102 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import asyncio
import os
import sys

import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure

from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
    LLMAssistantResponseAggregator,
    LLMUserResponseAggregator,
)
from pipecat.services.aws import AWSTTSService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer

load_dotenv(override=True)

logger.remove(0)
logger.add(sys.stderr, level="DEBUG")


async def main():
    async with aiohttp.ClientSession() as session:
        (room_url, token) = await configure(session)

        transport = DailyTransport(
            room_url,
            token,
            "Respond bot",
            DailyParams(
                audio_out_enabled=True,
                audio_out_sample_rate=16000,
                vad_enabled=True,
                vad_analyzer=SileroVADAnalyzer(),
                vad_audio_passthrough=True,
            ),
        )

        stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

        tts = AWSTTSService(
            api_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
            aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
            region=os.getenv("AWS_REGION"),
            voice_id="Amy",
            params=AWSTTSService.InputParams(engine="neural", language="en-GB", rate="1.05"),
        )

        llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")

        messages = [
            {
                "role": "system",
                "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
            },
        ]

        tma_in = LLMUserResponseAggregator(messages)
        tma_out = LLMAssistantResponseAggregator(messages)

        pipeline = Pipeline(
            [
                transport.input(),  # Transport user input
                stt,  # STT
                tma_in,  # User responses
                llm,  # LLM
                tts,  # TTS
                transport.output(),  # Transport bot output
                tma_out,  # Assistant spoken responses
            ]
        )

        task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant):
            transport.capture_participant_transcription(participant["id"])
            # Kick off the conversation.
            messages.append({"role": "system", "content": "Please introduce yourself to the user."})
            await task.queue_frames([LLMMessagesFrame(messages)])

        runner = PipelineRunner()

        await runner.run(task)


if __name__ == "__main__":
    asyncio.run(main())
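
The processor list in this example is written in the order frames travel: transport input, STT, user aggregator, LLM, TTS, transport output, assistant aggregator. As a rough illustration of that ordering only, the toy sketch below chains plain functions in list order. `make_stage`, `run_stages`, and `STAGE_NAMES` are hypothetical stand-ins, and nothing here reflects how Pipecat's `Pipeline` is actually implemented (the real processors are asynchronous and exchange frames, not strings).

```python
from typing import Callable, List

# Hypothetical stand-in: a "stage" takes a value and returns a transformed value.
Stage = Callable[[str], str]


def make_stage(name: str) -> Stage:
    """Build a toy stage that appends its own name to the value it receives."""

    def stage(value: str) -> str:
        return f"{value} -> {name}"

    return stage


def run_stages(stages: List[Stage], value: str) -> str:
    """Pass the value through every stage, strictly in list order."""
    for stage in stages:
        value = stage(value)
    return value


# Same ordering as the processor list in the example (after transport.input()).
STAGE_NAMES = ["stt", "tma_in", "llm", "tts", "transport.output", "tma_out"]


def demo() -> str:
    stages = [make_stage(name) for name in STAGE_NAMES]
    return run_stages(stages, "transport.input")


if __name__ == "__main__":
    print(demo())
```

Under these assumptions, reordering `STAGE_NAMES` changes the printed trace, which is the point of the illustration: position in the list decides which stage sees the data first.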

@@ -0,0 +1,100 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import asyncio
import os
import sys

import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure

from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
    LLMAssistantResponseAggregator,
    LLMUserResponseAggregator,
)
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.google import GoogleTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer

load_dotenv(override=True)

logger.remove(0)
logger.add(sys.stderr, level="DEBUG")


async def main():
    async with aiohttp.ClientSession() as session:
        (room_url, token) = await configure(session)

        transport = DailyTransport(
            room_url,
            token,
            "Respond bot",
            DailyParams(
                audio_out_enabled=True,
                audio_out_sample_rate=24000,
                vad_enabled=True,
                vad_analyzer=SileroVADAnalyzer(),
                vad_audio_passthrough=True,
            ),
        )

        stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

        tts = GoogleTTSService(
            credentials=os.getenv("GOOGLE_CREDENTIALS"),
            voice_id="en-US-Neural2-J",
            params=GoogleTTSService.InputParams(language="en-US", rate="1.05"),
        )

        llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")

        messages = [
            {
                "role": "system",
                "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
            },
        ]

        tma_in = LLMUserResponseAggregator(messages)
        tma_out = LLMAssistantResponseAggregator(messages)

        pipeline = Pipeline(
            [
                transport.input(),  # Transport user input
                stt,  # STT
                tma_in,  # User responses
                llm,  # LLM
                tts,  # TTS
                transport.output(),  # Transport bot output
                tma_out,  # Assistant spoken responses
            ]
        )

        task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant):
            transport.capture_participant_transcription(participant["id"])
            # Kick off the conversation.
            messages.append({"role": "system", "content": "Please introduce yourself to the user."})
            await task.queue_frames([LLMMessagesFrame(messages)])

        runner = PipelineRunner()

        await runner.run(task)


if __name__ == "__main__":
    asyncio.run(main())

@@ -15,6 +15,7 @@ from pipecat.frames.frames import AudioFrame, EndFrame, ImageFrame, LLMMessagesF
from runner import configure
from dotenv import load_dotenv
load_dotenv(override=True)
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
@@ -53,9 +54,7 @@ async def main():
voice_id="jBpfuIE2acCO8z3wKNLl",
)
dalle = FalImageGenService(
params=FalImageGenService.InputParams(
image_size="1024x1024"
),
params=FalImageGenService.InputParams(image_size="1024x1024"),
aiohttp_session=session,
key=os.getenv("FAL_KEY"),
)
@@ -75,13 +74,11 @@ async def main():
async def get_text_and_audio(messages) -> Tuple[str, bytearray]:
"""This function streams text from the LLM and uses the TTS service to convert
that text to speech as it's received. """
that text to speech as it's received."""
source_queue = asyncio.Queue()
sink_queue = asyncio.Queue()
sentence_aggregator = SentenceAggregator()
pipeline = Pipeline(
[llm, sentence_aggregator, tts1], source_queue, sink_queue
)
pipeline = Pipeline([llm, sentence_aggregator, tts1], source_queue, sink_queue)
await source_queue.put(LLMMessagesFrame(messages))
await source_queue.put(EndFrame())

@@ -8,7 +8,13 @@ import aiohttp
import asyncio
import sys
from pipecat.frames.frames import Frame, InputAudioRawFrame, InputImageRawFrame, OutputAudioRawFrame, OutputImageRawFrame
from pipecat.frames.frames import (
Frame,
InputAudioRawFrame,
InputImageRawFrame,
OutputAudioRawFrame,
OutputImageRawFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
@@ -20,6 +26,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -27,21 +34,20 @@ logger.add(sys.stderr, level="DEBUG")
class MirrorProcessor(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, InputAudioRawFrame):
await self.push_frame(OutputAudioRawFrame(
audio=frame.audio,
sample_rate=frame.sample_rate,
num_channels=frame.num_channels)
await self.push_frame(
OutputAudioRawFrame(
audio=frame.audio,
sample_rate=frame.sample_rate,
num_channels=frame.num_channels,
)
)
elif isinstance(frame, InputImageRawFrame):
await self.push_frame(OutputImageRawFrame(
image=frame.image,
size=frame.size,
format=frame.format)
await self.push_frame(
OutputImageRawFrame(image=frame.image, size=frame.size, format=frame.format)
)
else:
await self.push_frame(frame, direction)
@@ -52,15 +58,17 @@ async def main():
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url, token, "Test",
room_url,
token,
"Test",
DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_is_live=True,
camera_out_width=1280,
camera_out_height=720
)
camera_out_height=720,
),
)
@transport.event_handler("on_first_participant_joined")


@@ -10,7 +10,13 @@ import sys
import tkinter as tk
from pipecat.frames.frames import Frame, InputAudioRawFrame, InputImageRawFrame, OutputAudioRawFrame, OutputImageRawFrame
from pipecat.frames.frames import (
Frame,
InputAudioRawFrame,
InputImageRawFrame,
OutputAudioRawFrame,
OutputImageRawFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
@@ -24,31 +30,33 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
class MirrorProcessor(FrameProcessor):
class MirrorProcessor(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, InputAudioRawFrame):
await self.push_frame(OutputAudioRawFrame(
audio=frame.audio,
sample_rate=frame.sample_rate,
num_channels=frame.num_channels)
await self.push_frame(
OutputAudioRawFrame(
audio=frame.audio,
sample_rate=frame.sample_rate,
num_channels=frame.num_channels,
)
)
elif isinstance(frame, InputImageRawFrame):
await self.push_frame(OutputImageRawFrame(
image=frame.image,
size=frame.size,
format=frame.format)
await self.push_frame(
OutputImageRawFrame(image=frame.image, size=frame.size, format=frame.format)
)
else:
await self.push_frame(frame, direction)
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
@@ -57,8 +65,8 @@ async def main():
tk_root.title("Local Mirror")
daily_transport = DailyTransport(
room_url, token, "Test", DailyParams(
audio_in_enabled=True))
room_url, token, "Test", DailyParams(audio_in_enabled=True)
)
tk_transport = TkLocalTransport(
tk_root,
@@ -67,7 +75,9 @@ async def main():
camera_out_enabled=True,
camera_out_is_live=True,
camera_out_width=1280,
camera_out_height=720))
camera_out_height=720,
),
)
@daily_transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
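The `MirrorProcessor` touched in the two files above re-wraps each input frame as the matching output frame and forwards everything else untouched. A minimal, self-contained sketch of that branching follows. `InputAudioFrame`, `OutputAudioFrame`, and `MirrorSketch` are hypothetical stand-ins introduced here for illustration; they are not the pipecat classes, and the sketch collects pushed frames in a list rather than sending them through a pipeline.

```python
import asyncio
from dataclasses import dataclass


# Hypothetical stand-ins for the real frame classes; only the fields the
# diff copies (audio, sample_rate, num_channels) are modelled here.
@dataclass
class InputAudioFrame:
    audio: bytes
    sample_rate: int
    num_channels: int


@dataclass
class OutputAudioFrame:
    audio: bytes
    sample_rate: int
    num_channels: int


class MirrorSketch:
    """Collects pushed frames in a list instead of sending them downstream."""

    def __init__(self):
        self.pushed = []

    async def push_frame(self, frame):
        self.pushed.append(frame)

    async def process_frame(self, frame):
        if isinstance(frame, InputAudioFrame):
            # Re-wrap input audio as output audio, field by field.
            await self.push_frame(
                OutputAudioFrame(
                    audio=frame.audio,
                    sample_rate=frame.sample_rate,
                    num_channels=frame.num_channels,
                )
            )
        else:
            # Anything else passes through unchanged.
            await self.push_frame(frame)


async def demo():
    mirror = MirrorSketch()
    await mirror.process_frame(InputAudioFrame(b"\x00\x01", 16000, 1))
    await mirror.process_frame("some other frame")
    return mirror.pushed


mirrored = asyncio.run(demo())
```

The image branch in the diff follows the same shape, copying `image`, `size`, and `format` instead of the audio fields.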


@@ -14,7 +14,9 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -25,6 +27,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -43,8 +46,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
@@ -52,9 +55,7 @@ async def main():
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
@@ -67,15 +68,17 @@ async def main():
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
hey_robot_filter, # Filter out speech not directed at the robot
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
hey_robot_filter, # Filter out speech not directed at the robot
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))


@@ -35,6 +35,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -53,12 +54,12 @@ for file in sound_files:
filename = os.path.splitext(os.path.basename(full_path))[0]
# Open the sound file and read its audio frames
with wave.open(full_path) as audio_file:
sounds[file] = OutputAudioRawFrame(audio_file.readframes(-1),
audio_file.getframerate(), audio_file.getnchannels())
sounds[file] = OutputAudioRawFrame(
audio_file.readframes(-1), audio_file.getframerate(), audio_file.getnchannels()
)
class OutboundSoundEffectWrapper(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
@@ -71,7 +72,6 @@ class OutboundSoundEffectWrapper(FrameProcessor):
class InboundSoundEffectWrapper(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
@@ -95,13 +95,11 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
tts = CartesiaHttpTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
@@ -122,18 +120,20 @@ async def main():
fl = FrameLogger("LLM Out")
fl2 = FrameLogger("Transcription In")
pipeline = Pipeline([
transport.input(),
tma_in,
in_sound,
fl2,
llm,
fl,
tts,
out_sound,
transport.output(),
tma_out
])
pipeline = Pipeline(
[
transport.input(),
tma_in,
in_sound,
fl2,
llm,
fl,
tts,
out_sound,
transport.output(),
tma_out,
]
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):


@@ -26,6 +26,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -33,7 +34,6 @@ logger.add(sys.stderr, level="DEBUG")
class UserImageRequester(FrameProcessor):
def __init__(self, participant_id: str | None = None):
super().__init__()
self._participant_id = participant_id
@@ -45,7 +45,9 @@ class UserImageRequester(FrameProcessor):
await super().process_frame(frame, direction)
if self._participant_id and isinstance(frame, TextFrame):
await self.push_frame(UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM)
await self.push_frame(
UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM
)
await self.push_frame(frame, direction)
@@ -61,8 +63,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
user_response = UserResponseAggregator()
@@ -86,15 +88,17 @@ async def main():
transport.capture_participant_transcription(participant["id"])
image_requester.set_participant_id(participant["id"])
pipeline = Pipeline([
transport.input(),
user_response,
image_requester,
vision_aggregator,
moondream,
tts,
transport.output()
])
pipeline = Pipeline(
[
transport.input(),
user_response,
image_requester,
vision_aggregator,
moondream,
tts,
transport.output(),
]
)
task = PipelineTask(pipeline)
@@ -102,5 +106,6 @@ async def main():
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())


@@ -26,6 +26,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -33,7 +34,6 @@ logger.add(sys.stderr, level="DEBUG")
class UserImageRequester(FrameProcessor):
def __init__(self, participant_id: str | None = None):
super().__init__()
self._participant_id = participant_id
@@ -45,7 +45,9 @@ class UserImageRequester(FrameProcessor):
await super().process_frame(frame, direction)
if self._participant_id and isinstance(frame, TextFrame):
await self.push_frame(UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM)
await self.push_frame(
UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM
)
await self.push_frame(frame, direction)
@@ -62,8 +64,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
user_response = UserResponseAggregator()
@@ -73,8 +75,8 @@ async def main():
vision_aggregator = VisionImageFrameAggregator()
google = GoogleLLMService(
model="gemini-1.5-flash-latest",
api_key=os.getenv("GOOGLE_API_KEY"))
model="gemini-1.5-flash-latest", api_key=os.getenv("GOOGLE_API_KEY")
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
@@ -88,15 +90,17 @@ async def main():
transport.capture_participant_transcription(participant["id"])
image_requester.set_participant_id(participant["id"])
pipeline = Pipeline([
transport.input(),
user_response,
image_requester,
vision_aggregator,
google,
tts,
transport.output()
])
pipeline = Pipeline(
[
transport.input(),
user_response,
image_requester,
vision_aggregator,
google,
tts,
transport.output(),
]
)
task = PipelineTask(pipeline)
@@ -104,5 +108,6 @@ async def main():
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())


@@ -26,6 +26,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -33,7 +34,6 @@ logger.add(sys.stderr, level="DEBUG")
class UserImageRequester(FrameProcessor):
def __init__(self, participant_id: str | None = None):
super().__init__()
self._participant_id = participant_id
@@ -45,7 +45,9 @@ class UserImageRequester(FrameProcessor):
await super().process_frame(frame, direction)
if self._participant_id and isinstance(frame, TextFrame):
await self.push_frame(UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM)
await self.push_frame(
UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM
)
await self.push_frame(frame, direction)
@@ -61,8 +63,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
user_response = UserResponseAggregator()
@@ -71,10 +73,7 @@ async def main():
vision_aggregator = VisionImageFrameAggregator()
openai = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o"
)
openai = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
@@ -88,15 +87,17 @@ async def main():
transport.capture_participant_transcription(participant["id"])
image_requester.set_participant_id(participant["id"])
pipeline = Pipeline([
transport.input(),
user_response,
image_requester,
vision_aggregator,
openai,
tts,
transport.output()
])
pipeline = Pipeline(
[
transport.input(),
user_response,
image_requester,
vision_aggregator,
openai,
tts,
transport.output(),
]
)
task = PipelineTask(pipeline)
@@ -104,5 +105,6 @@ async def main():
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())


@@ -26,6 +26,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -33,7 +34,6 @@ logger.add(sys.stderr, level="DEBUG")
class UserImageRequester(FrameProcessor):
def __init__(self, participant_id: str | None = None):
super().__init__()
self._participant_id = participant_id
@@ -45,7 +45,9 @@ class UserImageRequester(FrameProcessor):
await super().process_frame(frame, direction)
if self._participant_id and isinstance(frame, TextFrame):
await self.push_frame(UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM)
await self.push_frame(
UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM
)
await self.push_frame(frame, direction)
@@ -61,8 +63,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
user_response = UserResponseAggregator()
@@ -71,14 +73,14 @@ async def main():
vision_aggregator = VisionImageFrameAggregator()
anthropic = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY")
)
anthropic = AnthropicLLMService(api_key=os.getenv("ANTHROPIC_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
sample_rate=16000,
params=CartesiaTTSService.InputParams(
sample_rate=16000,
),
)
@transport.event_handler("on_first_participant_joined")
@@ -88,15 +90,17 @@ async def main():
transport.capture_participant_transcription(participant["id"])
image_requester.set_participant_id(participant["id"])
pipeline = Pipeline([
transport.input(),
user_response,
image_requester,
vision_aggregator,
anthropic,
tts,
transport.output()
])
pipeline = Pipeline(
[
transport.input(),
user_response,
image_requester,
vision_aggregator,
anthropic,
tts,
transport.output(),
]
)
task = PipelineTask(pipeline)
@@ -104,5 +108,6 @@ async def main():
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())


@@ -21,6 +21,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -28,7 +29,6 @@ logger.add(sys.stderr, level="DEBUG")
class TranscriptionLogger(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
@@ -40,8 +40,9 @@ async def main():
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
transport = DailyTransport(room_url, None, "Transcription bot",
DailyParams(audio_in_enabled=True))
transport = DailyTransport(
room_url, None, "Transcription bot", DailyParams(audio_in_enabled=True)
)
stt = WhisperSTTService()


@@ -19,6 +19,7 @@ from pipecat.transports.local.audio import LocalAudioTransport
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -26,7 +27,6 @@ logger.add(sys.stderr, level="DEBUG")
class TranscriptionLogger(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)


@@ -22,6 +22,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -29,7 +30,6 @@ logger.add(sys.stderr, level="DEBUG")
class TranscriptionLogger(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
@@ -41,8 +41,9 @@ async def main():
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
transport = DailyTransport(room_url, None, "Transcription bot",
DailyParams(audio_in_enabled=True))
transport = DailyTransport(
room_url, None, "Transcription bot", DailyParams(audio_in_enabled=True)
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))


@@ -5,10 +5,15 @@
#
import asyncio
import aiohttp
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from openai.types.chat import ChatCompletionToolParam
from runner import configure
from pipecat.frames.frames import TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
@@ -19,13 +24,6 @@ from pipecat.services.openai import OpenAILLMContext, OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from openai.types.chat import ChatCompletionToolParam
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -33,7 +31,12 @@ logger.add(sys.stderr, level="DEBUG")
async def start_fetch_weather(function_name, llm, context):
await llm.push_frame(TextFrame("Let me check on that."))
# Note: we can't push a frame to the LLM here; the bot can interrupt
# itself and/or cause overlapping-audio glitches. Open question for
# Aleix and Chad: what is the right way to trigger speech now, given
# the new queue/async/sync refactors?
await llm.push_frame(TextFrame("Let me check on that. "))
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
@@ -52,8 +55,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
@@ -61,15 +64,10 @@ async def main():
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
# Register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.
llm.register_function(
None,
fetch_weather_from_api,
start_callback=start_fetch_weather)
llm.register_function(None, fetch_weather_from_api, start_callback=start_fetch_weather)
fl_in = FrameLogger("Inner")
fl_out = FrameLogger("Outer")
@@ -89,17 +87,15 @@ async def main():
},
"format": {
"type": "string",
"enum": [
"celsius",
"fahrenheit"],
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": [
"location",
"format"],
"required": ["location", "format"],
},
})]
},
)
]
messages = [
{
"role": "system",
@@ -110,16 +106,18 @@ async def main():
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline([
fl_in,
transport.input(),
context_aggregator.user(),
llm,
fl_out,
tts,
transport.output(),
context_aggregator.assistant(),
])
pipeline = Pipeline(
[
# fl_in,
transport.input(),
context_aggregator.user(),
llm,
# fl_out,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(pipeline)
@@ -133,5 +131,6 @@ async def main():
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())
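The comment in this file says that registering a `function_name` of `None` sends every function call to the same callback, with the function name passed as an extra parameter. A rough, self-contained sketch of that catch-all lookup is below. The `registry` dict, `register_function`, `call_function`, and `fetch_weather_stub` are hypothetical helpers written for this illustration and are not the library's implementation; only the handler signature `(function_name, tool_call_id, args, llm, context, result_callback)` is taken from the diff, and the function name and result values are placeholders.

```python
import asyncio

# Hypothetical registry: maps a function name (or None for "catch-all")
# to an async handler.
registry = {}


def register_function(name, handler):
    registry[name] = handler


async def call_function(name, tool_call_id, args):
    # Prefer an exact match; otherwise fall back to the None catch-all.
    handler = registry.get(name, registry.get(None))
    if handler is None:
        raise KeyError(f"no handler registered for {name!r}")

    results = []

    async def result_callback(result):
        # The handler reports its result through this callback.
        results.append(result)

    # llm and context are not modelled in this sketch, so pass None.
    await handler(name, tool_call_id, args, None, None, result_callback)
    return results


async def fetch_weather_stub(function_name, tool_call_id, args, llm, context, result_callback):
    # Placeholder data; a real handler would query a weather API here.
    await result_callback(
        {
            "function": function_name,
            "location": args["location"],
            "conditions": "placeholder",
        }
    )


register_function(None, fetch_weather_stub)
results = asyncio.run(
    call_function("get_current_weather", "call-1", {"location": "Paris", "format": "celsius"})
)
```

Because the handler receives `function_name`, a single catch-all callback can branch on it when several tools share one implementation.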


@@ -28,6 +28,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -39,7 +40,11 @@ current_voice = "News Lady"
async def switch_voice(function_name, tool_call_id, args, llm, context, result_callback):
global current_voice
current_voice = args["voice"]
await result_callback({"voice": f"You are now using your {current_voice} voice. Your responses should now be as if you were a {current_voice}."})
await result_callback(
{
"voice": f"You are now using your {current_voice} voice. Your responses should now be as if you were a {current_voice}."
}
)
async def news_lady_filter(frame) -> bool:
@@ -66,8 +71,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
news_lady = CartesiaTTSService(
@@ -85,9 +90,7 @@ async def main():
voice_id="a0e99841-438c-4a64-b679-ae501e7d6091", # Barbershop Man
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm.register_function("switch_voice", switch_voice)
tools = [
@@ -106,7 +109,9 @@ async def main():
},
"required": ["voice"],
},
})]
},
)
]
messages = [
{
"role": "system",
@@ -117,18 +122,20 @@ async def main():
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline([
transport.input(), # Transport user input
context_aggregator.user(), # User responses
llm, # LLM
ParallelPipeline( # TTS (one of the following voices)
[FunctionFilter(news_lady_filter), news_lady], # News Lady voice
[FunctionFilter(british_lady_filter), british_lady], # British Lady voice
[FunctionFilter(barbershop_man_filter), barbershop_man], # Barbershop Man voice
),
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User responses
llm, # LLM
ParallelPipeline( # TTS (one of the following voices)
[FunctionFilter(news_lady_filter), news_lady], # News Lady voice
[FunctionFilter(british_lady_filter), british_lady], # British Lady voice
[FunctionFilter(barbershop_man_filter), barbershop_man], # Barbershop Man voice
),
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@@ -139,7 +146,9 @@ async def main():
messages.append(
{
"role": "system",
"content": f"Please introduce yourself to the user and let them know the voices you can do. Your initial responses should be as if you were a {current_voice}."})
"content": f"Please introduce yourself to the user and let them know the voices you can do. Your initial responses should be as if you were a {current_voice}.",
}
)
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
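In this file each TTS branch sits behind a `FunctionFilter` whose async predicate decides whether frames reach that voice, and `switch_voice` updates the `current_voice` global. The filter bodies are not visible in the diff, so the sketch below assumes they simply compare `current_voice` against the branch's voice name. `route` and `branches` are hypothetical helpers for illustration, not pipecat APIs.

```python
import asyncio

current_voice = "News Lady"


# Assumed filter bodies: each accepts frames only while its voice is active.
async def news_lady_filter(frame) -> bool:
    return current_voice == "News Lady"


async def british_lady_filter(frame) -> bool:
    return current_voice == "British Lady"


async def route(frame, branches):
    """Return the names of the branches whose filter accepts the frame."""
    accepted = []
    for predicate, name in branches:
        if await predicate(frame):
            accepted.append(name)
    return accepted


branches = [
    (news_lady_filter, "news_lady"),
    (british_lady_filter, "british_lady"),
]


async def demo():
    global current_voice
    before = await route("hello", branches)
    # Simulate what switch_voice does: change the active voice.
    current_voice = "British Lady"
    after = await route("hello", branches)
    return before, after


before, after = asyncio.run(demo())
```

With filters of this shape only one branch accepts a given frame, so only one voice speaks at a time even though all branches sit in the same parallel block.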


@@ -29,6 +29,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -64,8 +65,8 @@ async def main():
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True
)
vad_audio_passthrough=True,
),
)
stt = WhisperSTTService(model=Model.LARGE)
@@ -80,9 +81,7 @@ async def main():
voice_id="846d6cb0-2301-48b6-9683-48f5618ea2f6", # Spanish-speaking Lady
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm.register_function("switch_language", switch_language)
tools = [
@@ -101,7 +100,9 @@ async def main():
},
"required": ["language"],
},
})]
},
)
]
messages = [
{
"role": "system",
@@ -112,18 +113,20 @@ async def main():
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline([
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
ParallelPipeline( # TTS (bot will speak the chosen language)
[FunctionFilter(english_filter), english_tts], # English
[FunctionFilter(spanish_filter), spanish_tts], # Spanish
),
transport.output(), # Transport bot output
context_aggregator.assistant() # Assistant spoken responses
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
ParallelPipeline( # TTS (bot will speak the chosen language)
[FunctionFilter(english_filter), english_tts], # English
[FunctionFilter(spanish_filter), spanish_tts], # Spanish
),
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@@ -134,7 +137,9 @@ async def main():
messages.append(
{
"role": "system",
"content": f"Please introduce yourself to the user and let them know the languages you speak. Your initial responses should be in {current_language}."})
"content": f"Please introduce yourself to the user and let them know the languages you speak. Your initial responses should be in {current_language}.",
}
)
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()


@@ -14,10 +14,16 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.deepgram import DeepgramTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport, DailyTransportMessageFrame
from pipecat.transports.services.daily import (
DailyParams,
DailyTransport,
DailyTransportMessageFrame,
)
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
@@ -25,6 +31,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -43,15 +50,15 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = DeepgramTTSService(
aiohttp_session=session,
api_key=os.getenv("DEEPGRAM_API_KEY"),
voice="aura-asteria-en",
base_url="http://0.0.0.0:8080/v1/speak"
base_url="http://0.0.0.0:8080/v1/speak",
)
llm = OpenAILLMService(
@@ -60,7 +67,7 @@ async def main():
# model="gpt-4o"
# Or, to use a local vLLM (or similar) api server
model="meta-llama/Meta-Llama-3-8B-Instruct",
base_url="http://0.0.0.0:8000/v1"
base_url="http://0.0.0.0:8000/v1",
)
messages = [
@@ -73,14 +80,16 @@ async def main():
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
@@ -93,8 +102,7 @@ async def main():
# When the first participant joins, the bot should introduce itself.
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
# Handle "latency-ping" messages. The client will send app messages that look like
@@ -111,14 +119,18 @@ async def main():
logger.debug(f"Received latency ping app message: {message}")
ts = message["latency-ping"]["ts"]
# Send immediately
transport.output().send_message(DailyTransportMessageFrame(
message={"latency-pong-msg-handler": {"ts": ts}},
participant_id=sender))
transport.output().send_message(
DailyTransportMessageFrame(
message={"latency-pong-msg-handler": {"ts": ts}}, participant_id=sender
)
)
# And push to the pipeline for the Daily transport.output to send
await tma_in.push_frame(
DailyTransportMessageFrame(
message={"latency-pong-pipeline-delivery": {"ts": ts}},
participant_id=sender))
participant_id=sender,
)
)
except Exception as e:
logger.debug(f"message handling error: {e} - {message}")
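The handler above answers each `latency-ping` twice: once immediately through the transport, and once by pushing a frame through the pipeline, so the client can compare the two delivery paths. A small sketch of the payload construction follows, using plain dicts in place of `DailyTransportMessageFrame`. `build_pongs` is a hypothetical helper, and the timestamp and participant id are made-up sample values.

```python
def build_pongs(message, sender):
    """Build both pong payloads for one latency-ping message."""
    ts = message["latency-ping"]["ts"]
    # Sent right away from the message handler.
    immediate = {
        "message": {"latency-pong-msg-handler": {"ts": ts}},
        "participant_id": sender,
    }
    # Queued into the pipeline and delivered by the transport output.
    queued = {
        "message": {"latency-pong-pipeline-delivery": {"ts": ts}},
        "participant_id": sender,
    }
    return immediate, queued


immediate, queued = build_pongs({"latency-ping": {"ts": 1727733000000}}, "participant-123")
```

Both payloads echo the original `ts`, so the client can subtract it from its own clock on receipt without the bot needing a synchronized clock.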


@@ -14,7 +14,9 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.processors.user_idle_processor import UserIdleProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
@@ -26,6 +28,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -44,8 +47,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
@@ -53,9 +56,7 @@ async def main():
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
@@ -69,33 +70,41 @@ async def main():
async def user_idle_callback(user_idle: UserIdleProcessor):
messages.append(
{"role": "system", "content": "Ask the user if they are still there and try to prompt for some input, but be short."})
{
"role": "system",
"content": "Ask the user if they are still there and try to prompt for some input, but be short.",
}
)
await user_idle.push_frame(LLMMessagesFrame(messages))
user_idle = UserIdleProcessor(callback=user_idle_callback, timeout=5.0)
pipeline = Pipeline([
transport.input(), # Transport user input
user_idle, # Idle user check-in
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
user_idle, # Idle user check-in
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(
allow_interruptions=True,
enable_metrics=True,
report_only_initial_ttfb=True,
))
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
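This file wires a `UserIdleProcessor` with a callback and a 5-second timeout so the bot can check in when the user goes quiet. The sketch below shows one way such an idle timer could work, using `asyncio.wait_for` on an event. `IdleWatcher` is a hypothetical class written for illustration and makes no claim about how the library implements idle detection; the timeout is shortened so the demo finishes quickly.

```python
import asyncio


class IdleWatcher:
    """Calls `callback` whenever no activity arrives within `timeout` seconds."""

    def __init__(self, callback, timeout):
        self._callback = callback
        self._timeout = timeout
        self._activity = asyncio.Event()

    def notify_activity(self):
        # Call this whenever the user does something (e.g. speaks).
        self._activity.set()

    async def run(self, max_checks):
        for _ in range(max_checks):
            try:
                await asyncio.wait_for(self._activity.wait(), timeout=self._timeout)
                # Activity arrived in time; reset and wait again.
                self._activity.clear()
            except asyncio.TimeoutError:
                # Nothing happened within the timeout: prompt the user.
                await self._callback(self)


async def demo():
    prompts = []

    async def on_idle(watcher):
        prompts.append("Are you still there?")

    # Create the watcher inside the running loop; use a short timeout.
    watcher = IdleWatcher(on_idle, timeout=0.01)
    await watcher.run(max_checks=2)
    return prompts


idle_prompts = asyncio.run(demo())
```

In the diff the callback appends a system message and pushes an `LLMMessagesFrame`; here it only records a string so the sketch stays self-contained.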


@@ -20,6 +20,7 @@ from runner import configure_with_args
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -29,12 +30,7 @@ logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument(
"-i",
"--input",
type=str,
required=True,
help="Input video file")
parser.add_argument("-i", "--input", type=str, required=True, help="Input video file")
(room_url, _, args) = await configure_with_args(session, parser)
@@ -49,7 +45,7 @@ async def main():
camera_out_width=1280,
camera_out_height=720,
camera_out_is_live=True,
)
),
)
gst = GStreamerPipelineSource(
@@ -59,13 +55,15 @@ async def main():
video_height=720,
audio_sample_rate=16000,
audio_channels=1,
)
),
)
pipeline = Pipeline([
gst, # GStreamer file source
transport.output(), # Transport bot output
])
pipeline = Pipeline(
[
gst, # GStreamer file source
transport.output(), # Transport bot output
]
)
task = PipelineTask(pipeline)


@@ -19,6 +19,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -38,20 +39,22 @@ async def main():
camera_out_width=1280,
camera_out_height=720,
camera_out_is_live=True,
)
),
)
gst = GStreamerPipelineSource(
pipeline="videotestsrc ! capsfilter caps=\"video/x-raw,width=1280,height=720,framerate=30/1\"",
pipeline='videotestsrc ! capsfilter caps="video/x-raw,width=1280,height=720,framerate=30/1"',
out_params=GStreamerPipelineSource.OutputParams(
video_width=1280,
video_height=720,
clock_sync=False))
video_width=1280, video_height=720, clock_sync=False
),
)
pipeline = Pipeline([
gst, # GStreamer file source
transport.output(), # Transport bot output
])
pipeline = Pipeline(
[
gst, # GStreamer file source
transport.output(), # Transport bot output
]
)
task = PipelineTask(pipeline)


@@ -23,6 +23,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -46,8 +47,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
@@ -56,8 +57,7 @@ async def main():
)
llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"),
model="claude-3-5-sonnet-20240620"
api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-5-sonnet-20240620"
)
llm.register_function("get_weather", get_weather)
@@ -90,18 +90,20 @@ async def main():
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline([
transport.input(), # Transport user input
context_aggregator.user(), # User spoken responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses and tool context
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User spoken responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses and tool context
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
@ transport.event_handler("on_first_participant_joined")
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.


@@ -23,6 +23,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -55,8 +56,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
@@ -67,7 +68,7 @@ async def main():
llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"),
model="claude-3-5-sonnet-20240620",
enable_prompt_caching_beta=True
enable_prompt_caching_beta=True,
)
llm.register_function("get_weather", get_weather)
llm.register_function("get_image", get_image)
@@ -100,7 +101,7 @@ async def main():
},
"required": ["question"],
},
}
},
]
# todo: test with very short initial user message
@@ -134,28 +135,28 @@ If you need to use a tool, simply use the tool. Do not tell the user the tool yo
"type": "text",
"text": system_prompt,
}
]
],
},
{
"role": "user",
"content": "Start the conversation by introducing yourself."
}]
{"role": "user", "content": "Start the conversation by introducing yourself."},
]
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline([
transport.input(), # Transport user input
context_aggregator.user(), # User speech to text
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses and tool context
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User speech to text
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses and tool context
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
@ transport.event_handler("on_first_participant_joined")
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
global video_participant_id
video_participant_id = participant["id"]


@@ -25,6 +25,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -32,12 +33,8 @@ logger.add(sys.stderr, level="DEBUG")
async def get_current_weather(
function_name,
tool_call_id,
arguments,
llm,
context,
result_callback):
function_name, tool_call_id, arguments, llm, context, result_callback
):
logger.debug("IN get_current_weather")
location = arguments["location"]
await result_callback(f"The weather in {location} is currently 72 degrees and sunny.")
@@ -55,8 +52,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
@@ -104,26 +101,28 @@ Reminder:
"""
messages = [{"role": "system",
"content": system_prompt},
{"role": "user",
"content": "Wait for the user to say something."}]
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": "Wait for the user to say something."},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline([
transport.input(), # Transport user input
context_aggregator.user(), # User speech to text
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses and tool context
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User speech to text
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses and tool context
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
@ transport.event_handler("on_first_participant_joined")
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.


@@ -17,16 +17,13 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
async def configure_with_args(
aiohttp_session: aiohttp.ClientSession,
parser: argparse.ArgumentParser | None = None):
aiohttp_session: aiohttp.ClientSession, parser: argparse.ArgumentParser | None = None
):
if not parser:
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument(
"-u",
"--url",
type=str,
required=False,
help="URL of the Daily room to join")
"-u", "--url", type=str, required=False, help="URL of the Daily room to join"
)
parser.add_argument(
"-k",
"--apikey",
@@ -42,15 +39,19 @@ async def configure_with_args(
if not url:
raise Exception(
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL.")
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL."
)
if not key:
raise Exception("No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers.")
raise Exception(
"No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
)
daily_rest_helper = DailyRESTHelper(
daily_api_key=key,
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session)
aiohttp_session=aiohttp_session,
)
# Create a meeting token for the given room with an expiration 1 hour in
# the future.
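Note on the hunk above: `configure_with_args` takes the room URL and API key from `-u/--url` and `-k/--apikey`, and its error messages point to `DAILY_SAMPLE_ROOM_URL` and `DAILY_API_KEY` as the environment fallbacks. The following is a minimal sketch of that lookup order only. `resolve_daily_config` is a hypothetical name, not a function in the repository; the `-k` help text and the use of `ValueError` are assumptions; and the environment is passed in as a mapping so the logic can be exercised without a Daily account.

```python
import argparse
from typing import Mapping, Optional, Sequence, Tuple


def resolve_daily_config(
    argv: Sequence[str],
    env: Mapping[str, str],
    parser: Optional[argparse.ArgumentParser] = None,
) -> Tuple[str, str]:
    """Hypothetical helper: return (room_url, api_key) from flags or environment."""
    if not parser:
        parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
    parser.add_argument(
        "-u", "--url", type=str, required=False, help="URL of the Daily room to join"
    )
    # Help text below is an assumption; the original argument is cut off by the hunk.
    parser.add_argument("-k", "--apikey", type=str, required=False, help="Daily API key")

    # parse_known_args ignores flags this sketch does not define.
    args, _unknown = parser.parse_known_args(list(argv))

    # Command-line values win; the environment mapping is the fallback.
    url = args.url or env.get("DAILY_SAMPLE_ROOM_URL")
    key = args.apikey or env.get("DAILY_API_KEY")

    if not url:
        raise ValueError("No Daily room specified. Use -u/--url or set DAILY_SAMPLE_ROOM_URL.")
    if not key:
        raise ValueError("No Daily API key specified. Use -k/--apikey or set DAILY_API_KEY.")
    return url, key
```

In a script this would be called as `resolve_daily_config(sys.argv[1:], os.environ)`.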


@@ -43,6 +43,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -60,11 +61,7 @@ for i in range(1, 26):
# Get the filename without the extension to use as the dictionary key
# Open the image and convert it to bytes
with Image.open(full_path) as img:
sprites.append(OutputImageRawFrame(
image=img.tobytes(),
size=img.size,
format=img.format)
)
sprites.append(OutputImageRawFrame(image=img.tobytes(), size=img.size, format=img.format))
flipped = sprites[::-1]
sprites.extend(flipped)
@@ -110,7 +107,9 @@ class UserImageRequester(FrameProcessor):
if self.participant_id and isinstance(frame, TextFrame):
if frame.text == user_request_answer:
await self.push_frame(UserImageRequestFrame(self.participant_id), FrameDirection.UPSTREAM)
await self.push_frame(
UserImageRequestFrame(self.participant_id), FrameDirection.UPSTREAM
)
await self.push_frame(TextFrame("Describe the image in a short sentence."))
elif isinstance(frame, UserImageRawFrame):
await self.push_frame(frame)
@@ -154,8 +153,8 @@ async def main():
camera_out_height=576,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
@@ -163,9 +162,7 @@ async def main():
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
ta = TalkingAnimation()
@@ -188,17 +185,17 @@ async def main():
ura = LLMUserResponseAggregator(messages)
pipeline = Pipeline([
transport.input(),
ura,
llm,
ParallelPipeline(
[sa, ir, va, moondream],
[tf, imgf]),
tts,
ta,
transport.output()
])
pipeline = Pipeline(
[
transport.input(),
ura,
llm,
ParallelPipeline([sa, ir, va, moondream], [tf, imgf]),
tts,
ta,
transport.output(),
]
)
task = PipelineTask(pipeline)
await task.queue_frame(quiet_frame)


@@ -14,11 +14,8 @@ from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper
async def configure(aiohttp_session: aiohttp.ClientSession):
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument(
"-u",
"--url",
type=str,
required=False,
help="URL of the Daily room to join")
"-u", "--url", type=str, required=False, help="URL of the Daily room to join"
)
parser.add_argument(
"-k",
"--apikey",
@@ -34,15 +31,18 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
if not url:
raise Exception(
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL.")
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL."
)
if not key:
raise Exception("No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers.")
raise Exception(
"No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
)
daily_rest_helper = DailyRESTHelper(
daily_api_key=key,
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session
aiohttp_session=aiohttp_session,
)
# Create a meeting token for the given room with an expiration 1 hour in


@@ -38,13 +38,14 @@ async def lifespan(app: FastAPI):
aiohttp_session = aiohttp.ClientSession()
daily_helpers["rest"] = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'),
aiohttp_session=aiohttp_session
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
yield
await aiohttp_session.close()
cleanup()
app = FastAPI(lifespan=lifespan)
app.add_middleware(
@@ -65,37 +66,34 @@ async def start_agent(request: Request):
if not room.url:
raise HTTPException(
status_code=500,
detail="Missing 'room' property in request data. Cannot start agent without a target room!")
detail="Missing 'room' property in request data. Cannot start agent without a target room!",
)
# Check if there is already an existing process running in this room
num_bots_in_room = sum(
1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None)
1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None
)
if num_bots_in_room >= MAX_BOTS_PER_ROOM:
raise HTTPException(
status_code=500, detail=f"Max bot limited reach for room: {room.url}")
raise HTTPException(status_code=500, detail=f"Max bot limited reach for room: {room.url}")
# Get the token for the room
token = await daily_helpers["rest"].get_token(room.url)
if not token:
raise HTTPException(
status_code=500, detail=f"Failed to get token for room: {room.url}")
raise HTTPException(status_code=500, detail=f"Failed to get token for room: {room.url}")
# Spawn a new agent, and join the user session
# Note: this is mostly for demonstration purposes (refer to 'deployment' in README)
try:
proc = subprocess.Popen(
[
f"python3 -m bot -u {room.url} -t {token}"
],
[f"python3 -m bot -u {room.url} -t {token}"],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__))
cwd=os.path.dirname(os.path.abspath(__file__)),
)
bot_procs[proc.pid] = (proc, room.url)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to start subprocess: {e}")
raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}")
return RedirectResponse(room.url)
@@ -107,8 +105,7 @@ def get_status(pid: int):
# If the subprocess doesn't exist, return an error
if not proc:
raise HTTPException(
status_code=404, detail=f"Bot with process id: {pid} not found")
raise HTTPException(status_code=404, detail=f"Bot with process id: {pid} not found")
# Check the status of the subprocess
if proc[0].poll() is None:
@@ -125,14 +122,10 @@ if __name__ == "__main__":
default_host = os.getenv("HOST", "0.0.0.0")
default_port = int(os.getenv("FAST_API_PORT", "7860"))
parser = argparse.ArgumentParser(
description="Daily Moondream FastAPI server")
parser.add_argument("--host", type=str,
default=default_host, help="Host address")
parser.add_argument("--port", type=int,
default=default_port, help="Port number")
parser.add_argument("--reload", action="store_true",
help="Reload code on change")
parser = argparse.ArgumentParser(description="Daily Moondream FastAPI server")
parser.add_argument("--host", type=str, default=default_host, help="Host address")
parser.add_argument("--port", type=int, default=default_port, help="Port number")
parser.add_argument("--reload", action="store_true", help="Reload code on change")
config = parser.parse_args()
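Note on the `start_agent` hunk above: the server counts live bot processes per room before spawning another, and launches the bot through a single shell string with `shell=True`. The sketch below isolates the counting step and shows one possible alternative for the launch, building an argument list that could be passed to `subprocess.Popen` without a shell. This alternative is not what the diff does. `count_bots_in_room` and `build_bot_command` are hypothetical names, and the value of `MAX_BOTS_PER_ROOM` is assumed because its definition is outside the hunk.

```python
import subprocess
import sys
from typing import Dict, List, Tuple

MAX_BOTS_PER_ROOM = 1  # assumed value; the real constant is defined outside this hunk

# pid -> (process, room_url), the same shape the hunk stores in bot_procs
BotTable = Dict[int, Tuple[subprocess.Popen, str]]


def count_bots_in_room(bot_procs: BotTable, room_url: str) -> int:
    """Count processes for room_url that are still running (poll() returns None)."""
    return sum(
        1 for proc, url in bot_procs.values() if url == room_url and proc.poll() is None
    )


def build_bot_command(room_url: str, token: str) -> List[str]:
    """Build an argument list so the URL and token are never parsed by a shell."""
    return [sys.executable, "-m", "bot", "-u", room_url, "-t", token]
```

With an argument list, the call would take the form `subprocess.Popen(build_bot_command(url, token), bufsize=1, cwd=...)` with `shell=True` dropped, so characters in the room URL or token are passed through literally.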


@@ -26,6 +26,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -49,41 +50,44 @@ for file in sound_files:
filename = os.path.splitext(os.path.basename(full_path))[0]
# Open the sound and convert it to bytes
with wave.open(full_path) as audio_file:
sounds[file] = OutputAudioRawFrame(audio_file.readframes(-1),
audio_file.getframerate(),
audio_file.getnchannels())
sounds[file] = OutputAudioRawFrame(
audio_file.readframes(-1), audio_file.getframerate(), audio_file.getnchannels()
)
class IntakeProcessor:
def __init__(self, context: OpenAILLMContext):
print(f"Initializing context from IntakeProcessor")
context.add_message({"role": "system", "content": "You are Jessica, an agent for a company called Tri-County Health Services. Your job is to collect important information from the user before their doctor visit. You're talking to Chad Bailey. You should address the user by their first name and be polite and professional. You're not a medical professional, so you shouldn't provide any advice. Keep your responses short. Your job is to collect information to give to a doctor. Don't make assumptions about what values to plug into functions. Ask for clarification if a user response is ambiguous. Start by introducing yourself. Then, ask the user to confirm their identity by telling you their birthday, including the year. When they answer with their birthday, call the verify_birthday function."})
context.set_tools([
context.add_message(
{
"type": "function",
"function": {
"name": "verify_birthday",
"description": "Use this function to verify the user has provided their correct birthday.",
"parameters": {
"type": "object",
"properties": {
"birthday": {
"type": "string",
"description": "The user's birthdate, including the year. The user can provide it in any format, but convert it to YYYY-MM-DD format to call this function.",
}},
"role": "system",
"content": "You are Jessica, an agent for a company called Tri-County Health Services. Your job is to collect important information from the user before their doctor visit. You're talking to Chad Bailey. You should address the user by their first name and be polite and professional. You're not a medical professional, so you shouldn't provide any advice. Keep your responses short. Your job is to collect information to give to a doctor. Don't make assumptions about what values to plug into functions. Ask for clarification if a user response is ambiguous. Start by introducing yourself. Then, ask the user to confirm their identity by telling you their birthday, including the year. When they answer with their birthday, call the verify_birthday function.",
}
)
context.set_tools(
[
{
"type": "function",
"function": {
"name": "verify_birthday",
"description": "Use this function to verify the user has provided their correct birthday.",
"parameters": {
"type": "object",
"properties": {
"birthday": {
"type": "string",
"description": "The user's birthdate, including the year. The user can provide it in any format, but convert it to YYYY-MM-DD format to call this function.",
}
},
},
},
},
}])
}
]
)
async def verify_birthday(
self,
function_name,
tool_call_id,
args,
llm,
context,
result_callback):
self, function_name, tool_call_id, args, llm, context, result_callback
):
if args["birthday"] == "1983-01-01":
context.set_tools(
[
@@ -110,18 +114,35 @@ class IntakeProcessor:
},
},
},
}},
}
},
},
},
}])
}
]
)
# It's a bit weird to push this to the LLM, but it gets it into the pipeline
# await llm.push_frame(sounds["ding2.wav"], FrameDirection.DOWNSTREAM)
# We don't need the function call in the context, so just return a new
# system message and let the framework re-prompt
await result_callback([{"role": "system", "content": "Next, thank the user for confirming their identity, then ask the user to list their current prescriptions. Each prescription needs to have a medication name and a dosage. Do not call the list_prescriptions function with any unknown dosages."}])
await result_callback(
[
{
"role": "system",
"content": "Next, thank the user for confirming their identity, then ask the user to list their current prescriptions. Each prescription needs to have a medication name and a dosage. Do not call the list_prescriptions function with any unknown dosages.",
}
]
)
else:
# The user provided an incorrect birthday; ask them to try again
await result_callback([{"role": "system", "content": "The user provided an incorrect birthday. Ask them for their birthday again. When they answer, call the verify_birthday function."}])
await result_callback(
[
{
"role": "system",
"content": "The user provided an incorrect birthday. Ask them for their birthday again. When they answer, call the verify_birthday function.",
}
]
)
async def start_prescriptions(self, function_name, llm, context):
print(f"!!! doing start prescriptions")
@@ -144,16 +165,22 @@ class IntakeProcessor:
"name": {
"type": "string",
"description": "What the user is allergic to",
}},
}
},
},
}},
}
},
},
},
}])
}
]
)
context.add_message(
{
"role": "system",
"content": "Next, ask the user if they have any allergies. Once they have listed their allergies or confirmed they don't have any, call the list_allergies function."})
"content": "Next, ask the user if they have any allergies. Once they have listed their allergies or confirmed they don't have any, call the list_allergies function.",
}
)
print(f"!!! about to await llm process frame in start prescrpitions")
await llm.process_frame(OpenAILLMContextFrame(context), FrameDirection.DOWNSTREAM)
print(f"!!! past await process frame in start prescriptions")
@@ -179,17 +206,22 @@ class IntakeProcessor:
"name": {
"type": "string",
"description": "The user's medical condition",
}},
}
},
},
}},
}
},
},
},
},
])
]
)
context.add_message(
{
"role": "system",
"content": "Now ask the user if they have any medical conditions the doctor should know about. Once they've answered the question, call the list_conditions function."})
"content": "Now ask the user if they have any medical conditions the doctor should know about. Once they've answered the question, call the list_conditions function.",
}
)
await llm.process_frame(OpenAILLMContextFrame(context), FrameDirection.DOWNSTREAM)
async def start_conditions(self, function_name, llm, context):
@@ -213,24 +245,31 @@ class IntakeProcessor:
"name": {
"type": "string",
"description": "The user's reason for visiting the doctor",
}},
}
},
},
}},
}
},
},
},
}])
}
]
)
context.add_message(
{
"role": "system",
"content": "Finally, ask the user the reason for their doctor visit today. Once they answer, call the list_visit_reasons function."})
"content": "Finally, ask the user the reason for their doctor visit today. Once they answer, call the list_visit_reasons function.",
}
)
await llm.process_frame(OpenAILLMContextFrame(context), FrameDirection.DOWNSTREAM)
async def start_visit_reasons(self, function_name, llm, context):
print("!!! doing start visit reasons")
# move to finish call
context.set_tools([])
context.add_message({"role": "system",
"content": "Now, thank the user and end the conversation."})
context.add_message(
{"role": "system", "content": "Now, thank the user and end the conversation."}
)
await llm.process_frame(OpenAILLMContextFrame(context), FrameDirection.DOWNSTREAM)
async def save_data(self, function_name, tool_call_id, args, llm, context, result_callback):
@@ -261,7 +300,7 @@ async def main():
# tier="nova",
# model="2-general"
# )
)
),
)
tts = CartesiaTTSService(
@@ -274,9 +313,7 @@ async def main():
# voice_id="846d6cb0-2301-48b6-9683-48f5618ea2f6", # Spanish-speaking Lady
# )
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = []
context = OpenAILLMContext(messages=messages)
@@ -285,33 +322,31 @@ async def main():
intake = IntakeProcessor(context)
llm.register_function("verify_birthday", intake.verify_birthday)
llm.register_function(
"list_prescriptions",
intake.save_data,
start_callback=intake.start_prescriptions)
"list_prescriptions", intake.save_data, start_callback=intake.start_prescriptions
)
llm.register_function(
"list_allergies",
intake.save_data,
start_callback=intake.start_allergies)
"list_allergies", intake.save_data, start_callback=intake.start_allergies
)
llm.register_function(
"list_conditions",
intake.save_data,
start_callback=intake.start_conditions)
"list_conditions", intake.save_data, start_callback=intake.start_conditions
)
llm.register_function(
"list_visit_reasons",
intake.save_data,
start_callback=intake.start_visit_reasons)
"list_visit_reasons", intake.save_data, start_callback=intake.start_visit_reasons
)
fl = FrameLogger("LLM Output")
pipeline = Pipeline([
transport.input(), # Transport input
context_aggregator.user(), # User responses
llm, # LLM
fl, # Frame logger
tts, # TTS
transport.output(), # Transport output
context_aggregator.assistant(), # Assistant responses
])
pipeline = Pipeline(
[
transport.input(), # Transport input
context_aggregator.user(), # User responses
llm, # LLM
fl, # Frame logger
tts, # TTS
transport.output(), # Transport output
context_aggregator.assistant(), # Assistant responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=False))
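Note on the `IntakeProcessor` hunks above: `verify_birthday` receives `(function_name, tool_call_id, args, llm, context, result_callback)` and answers by awaiting `result_callback` with a list of system messages, one branch for a matching birthday and one for a mismatch. The sketch below reproduces only that branching, with no pipecat dependency. The message texts are shortened, the `context.set_tools(...)` call from the original is omitted, and `run_handler` with its in-memory callback is a hypothetical stand-in for the framework.

```python
import asyncio
from typing import Dict, List

Message = Dict[str, str]


async def verify_birthday(function_name, tool_call_id, args, llm, context, result_callback):
    """Simplified handler: same signature as in the diff, shortened messages."""
    if args["birthday"] == "1983-01-01":
        await result_callback(
            [{"role": "system", "content": "Next, thank the user and ask for prescriptions."}]
        )
    else:
        # The user provided an incorrect birthday; ask them to try again.
        await result_callback(
            [{"role": "system", "content": "Incorrect birthday. Ask for the birthday again."}]
        )


async def run_handler(birthday: str) -> List[Message]:
    """Hypothetical harness: collect whatever the handler passes to result_callback."""
    received: List[Message] = []

    async def result_callback(messages: List[Message]) -> None:
        received.extend(messages)

    await verify_birthday(
        "verify_birthday", "call-1", {"birthday": birthday}, None, None, result_callback
    )
    return received
```

Calling `asyncio.run(run_handler("1983-01-01"))` returns the single message from the success branch; any other date returns the retry message.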


@@ -14,11 +14,8 @@ from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper
async def configure(aiohttp_session: aiohttp.ClientSession):
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument(
"-u",
"--url",
type=str,
required=False,
help="URL of the Daily room to join")
"-u", "--url", type=str, required=False, help="URL of the Daily room to join"
)
parser.add_argument(
"-k",
"--apikey",
@@ -34,15 +31,19 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
if not url:
raise Exception(
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL.")
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL."
)
if not key:
raise Exception("No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers.")
raise Exception(
"No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
)
daily_rest_helper = DailyRESTHelper(
daily_api_key=key,
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session)
aiohttp_session=aiohttp_session,
)
# Create a meeting token for the given room with an expiration 1 hour in
# the future.


@@ -38,13 +38,14 @@ async def lifespan(app: FastAPI):
aiohttp_session = aiohttp.ClientSession()
daily_helpers["rest"] = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'),
aiohttp_session=aiohttp_session
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
yield
await aiohttp_session.close()
cleanup()
app = FastAPI(lifespan=lifespan)
app.add_middleware(
@@ -65,37 +66,34 @@ async def start_agent(request: Request):
if not room.url:
raise HTTPException(
status_code=500,
detail="Missing 'room' property in request data. Cannot start agent without a target room!")
detail="Missing 'room' property in request data. Cannot start agent without a target room!",
)
# Check if there is already an existing process running in this room
num_bots_in_room = sum(
1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None)
1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None
)
if num_bots_in_room >= MAX_BOTS_PER_ROOM:
raise HTTPException(
status_code=500, detail=f"Max bot limited reach for room: {room.url}")
raise HTTPException(status_code=500, detail=f"Max bot limited reach for room: {room.url}")
# Get the token for the room
token = await daily_helpers["rest"].get_token(room.url)
if not token:
raise HTTPException(
status_code=500, detail=f"Failed to get token for room: {room.url}")
raise HTTPException(status_code=500, detail=f"Failed to get token for room: {room.url}")
# Spawn a new agent, and join the user session
# Note: this is mostly for demonstration purposes (refer to 'deployment' in README)
try:
proc = subprocess.Popen(
[
f"python3 -m bot -u {room.url} -t {token}"
],
[f"python3 -m bot -u {room.url} -t {token}"],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__))
cwd=os.path.dirname(os.path.abspath(__file__)),
)
bot_procs[proc.pid] = (proc, room.url)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to start subprocess: {e}")
raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}")
return RedirectResponse(room.url)
@@ -107,8 +105,7 @@ def get_status(pid: int):
# If the subprocess doesn't exist, return an error
if not proc:
raise HTTPException(
status_code=404, detail=f"Bot with process id: {pid} not found")
raise HTTPException(status_code=404, detail=f"Bot with process id: {pid} not found")
# Check the status of the subprocess
if proc[0].poll() is None:
@@ -125,14 +122,10 @@ if __name__ == "__main__":
default_host = os.getenv("HOST", "0.0.0.0")
default_port = int(os.getenv("FAST_API_PORT", "7860"))
parser = argparse.ArgumentParser(
description="Daily Storyteller FastAPI server")
parser.add_argument("--host", type=str,
default=default_host, help="Host address")
parser.add_argument("--port", type=int,
default=default_port, help="Port number")
parser.add_argument("--reload", action="store_true",
help="Reload code on change")
parser = argparse.ArgumentParser(description="Daily Storyteller FastAPI server")
parser.add_argument("--host", type=str, default=default_host, help="Host address")
parser.add_argument("--port", type=int, default=default_port, help="Port number")
parser.add_argument("--reload", action="store_true", help="Reload code on change")
config = parser.parse_args()
print(f"to join a test room, visit http://localhost:{config.port}/start")


@@ -14,14 +14,17 @@ from PIL import Image
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import LLMAssistantResponseAggregator, LLMUserResponseAggregator
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.frames.frames import (
OutputImageRawFrame,
SpriteFrame,
Frame,
LLMMessagesFrame,
TTSAudioRawFrame,
TTSStoppedFrame
TTSStoppedFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.elevenlabs import ElevenLabsTTSService
@@ -34,6 +37,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -49,11 +53,7 @@ for i in range(1, 26):
# Get the filename without the extension to use as the dictionary key
# Open the image and convert it to bytes
with Image.open(full_path) as img:
sprites.append(OutputImageRawFrame(
image=img.tobytes(),
size=img.size,
format=img.format)
)
sprites.append(OutputImageRawFrame(image=img.tobytes(), size=img.size, format=img.format))
flipped = sprites[::-1]
sprites.extend(flipped)
@@ -111,7 +111,7 @@ async def main():
# tier="nova",
# model="2-general"
# )
)
),
)
tts = ElevenLabsTTSService(
@@ -120,7 +120,6 @@ async def main():
# English
#
voice_id="pNInz6obpgDQGcFmaJgB",
#
# Spanish
#
@@ -128,9 +127,7 @@ async def main():
# voice_id="gD1IexrzCvsXPHUuT0s3",
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
@@ -139,7 +136,6 @@ async def main():
# English
#
"content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by introducing yourself.",
#
# Spanish
#
@@ -152,15 +148,17 @@ async def main():
ta = TalkingAnimation()
pipeline = Pipeline([
transport.input(),
user_response,
llm,
tts,
ta,
transport.output(),
assistant_response,
])
pipeline = Pipeline(
[
transport.input(),
user_response,
llm,
tts,
ta,
transport.output(),
assistant_response,
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
await task.queue_frame(quiet_frame)


@@ -14,11 +14,8 @@ from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper
async def configure(aiohttp_session: aiohttp.ClientSession):
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument(
"-u",
"--url",
type=str,
required=False,
help="URL of the Daily room to join")
"-u", "--url", type=str, required=False, help="URL of the Daily room to join"
)
parser.add_argument(
"-k",
"--apikey",
@@ -34,15 +31,18 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
if not url:
raise Exception(
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL.")
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL."
)
if not key:
raise Exception("No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers.")
raise Exception(
"No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
)
daily_rest_helper = DailyRESTHelper(
daily_api_key=key,
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session
aiohttp_session=aiohttp_session,
)
# Create a meeting token for the given room with an expiration 1 hour in


@@ -38,13 +38,14 @@ async def lifespan(app: FastAPI):
aiohttp_session = aiohttp.ClientSession()
daily_helpers["rest"] = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'),
aiohttp_session=aiohttp_session
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
yield
await aiohttp_session.close()
cleanup()
app = FastAPI(lifespan=lifespan)
app.add_middleware(
@@ -65,37 +66,34 @@ async def start_agent(request: Request):
if not room.url:
raise HTTPException(
status_code=500,
detail="Missing 'room' property in request data. Cannot start agent without a target room!")
detail="Missing 'room' property in request data. Cannot start agent without a target room!",
)
# Check if there is already an existing process running in this room
num_bots_in_room = sum(
1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None)
1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None
)
if num_bots_in_room >= MAX_BOTS_PER_ROOM:
raise HTTPException(
status_code=500, detail=f"Max bot limited reach for room: {room.url}")
raise HTTPException(status_code=500, detail=f"Max bot limited reach for room: {room.url}")
# Get the token for the room
token = await daily_helpers["rest"].get_token(room.url)
if not token:
raise HTTPException(
status_code=500, detail=f"Failed to get token for room: {room.url}")
raise HTTPException(status_code=500, detail=f"Failed to get token for room: {room.url}")
# Spawn a new agent, and join the user session
# Note: this is mostly for demonstration purposes (refer to 'deployment' in README)
try:
proc = subprocess.Popen(
[
f"python3 -m bot -u {room.url} -t {token}"
],
[f"python3 -m bot -u {room.url} -t {token}"],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__))
cwd=os.path.dirname(os.path.abspath(__file__)),
)
bot_procs[proc.pid] = (proc, room.url)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to start subprocess: {e}")
raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}")
return RedirectResponse(room.url)
@@ -107,8 +105,7 @@ def get_status(pid: int):
# If the subprocess doesn't exist, return an error
if not proc:
raise HTTPException(
status_code=404, detail=f"Bot with process id: {pid} not found")
raise HTTPException(status_code=404, detail=f"Bot with process id: {pid} not found")
# Check the status of the subprocess
if proc[0].poll() is None:
@@ -125,14 +122,10 @@ if __name__ == "__main__":
default_host = os.getenv("HOST", "0.0.0.0")
default_port = int(os.getenv("FAST_API_PORT", "7860"))
parser = argparse.ArgumentParser(
description="Daily Storyteller FastAPI server")
parser.add_argument("--host", type=str,
default=default_host, help="Host address")
parser.add_argument("--port", type=int,
default=default_port, help="Port number")
parser.add_argument("--reload", action="store_true",
help="Reload code on change")
parser = argparse.ArgumentParser(description="Daily Storyteller FastAPI server")
parser.add_argument("--host", type=str, default=default_host, help="Host address")
parser.add_argument("--port", type=int, default=default_port, help="Port number")
parser.add_argument("--reload", action="store_true", help="Reload code on change")
config = parser.parse_args()
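
Review note on the `subprocess.Popen` call in `start_agent` above: it interpolates `room.url` and `token` into a single string and runs it with `shell=True`, so any shell metacharacters in those values would be interpreted by the shell. The source already flags this launch path as demonstration-only. A minimal sketch of the same launch with an argument list and no shell follows; `build_bot_command` and `spawn_bot` are hypothetical helper names, and using `sys.executable` in place of `python3` is an assumption, not part of this change.

```python
import subprocess
import sys


def build_bot_command(room_url: str, token: str) -> list[str]:
    """Return the bot command as an argument list (hypothetical helper)."""
    # Each value is its own argv entry, so characters such as ';' or '&'
    # in the URL or token are passed through literally.
    return [sys.executable, "-m", "bot", "-u", room_url, "-t", token]


def spawn_bot(room_url: str, token: str, cwd: str) -> subprocess.Popen:
    """Start the bot process without going through a shell (sketch only)."""
    return subprocess.Popen(build_bot_command(room_url, token), cwd=cwd)
```

The caller would still record `(proc, room.url)` in `bot_procs` exactly as the existing code does.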


@@ -9,11 +9,18 @@ from pipecat.frames.frames import LLMMessagesFrame, StopTaskFrame, EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.llm_response import LLMAssistantResponseAggregator, LLMUserResponseAggregator
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.fal import FalImageGenService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport, DailyTransportMessageFrame
from pipecat.transports.services.daily import (
DailyParams,
DailyTransport,
DailyTransportMessageFrame,
)
from processors import StoryProcessor, StoryImageProcessor
from prompts import LLM_BASE_PROMPT, LLM_INTRO_PROMPT, CUE_USER_TURN
@@ -22,6 +29,7 @@ from utils.helpers import load_sounds, load_images
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -33,7 +41,6 @@ images = load_images(["book1.png", "book2.png"])
async def main(room_url, token=None):
async with aiohttp.ClientSession() as session:
# -------------- Transport --------------- #
transport = DailyTransport(
@@ -47,17 +54,14 @@ async def main(room_url, token=None):
camera_out_height=768,
transcription_enabled=True,
vad_enabled=True,
)
),
)
logger.debug("Transport created for room:" + room_url)
# -------------- Services --------------- #
llm_service = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o"
)
llm_service = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
tts_service = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY"),
@@ -65,10 +69,7 @@ async def main(room_url, token=None):
)
fal_service_params = FalImageGenService.InputParams(
image_size={
"width": 768,
"height": 768
}
image_size={"width": 768, "height": 768}
)
fal_service = FalImageGenService(
@@ -110,12 +111,12 @@ async def main(room_url, token=None):
transport.capture_participant_transcription(participant["id"])
await intro_task.queue_frames(
[
images['book1'],
images["book1"],
LLMMessagesFrame([LLM_INTRO_PROMPT]),
DailyTransportMessageFrame(CUE_USER_TURN),
sounds["listening"],
images['book2'],
StopTaskFrame()
images["book2"],
StopTaskFrame(),
]
)
@@ -125,16 +126,18 @@ async def main(room_url, token=None):
# The main story pipeline is used to continue the story based on user
# input.
main_pipeline = Pipeline([
transport.input(),
user_responses,
llm_service,
story_processor,
image_processor,
tts_service,
transport.output(),
llm_responses
])
main_pipeline = Pipeline(
[
transport.input(),
user_responses,
llm_service,
story_processor,
image_processor,
tts_service,
transport.output(),
llm_responses,
]
)
main_task = PipelineTask(main_pipeline)
@@ -150,6 +153,7 @@ async def main(room_url, token=None):
await runner.run(main_task)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Daily Storyteller Bot")
parser.add_argument("-u", type=str, help="Room URL")


@@ -20,10 +20,15 @@ from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse, JSONResponse
from pipecat.transports.services.helpers.daily_rest import (
DailyRESTHelper, DailyRoomObject, DailyRoomProperties, DailyRoomParams)
DailyRESTHelper,
DailyRoomObject,
DailyRoomProperties,
DailyRoomParams,
)
from dotenv import load_dotenv
load_dotenv(override=True)
# ------------ Fast API Config ------------ #
@@ -38,12 +43,13 @@ async def lifespan(app: FastAPI):
aiohttp_session = aiohttp.ClientSession()
daily_helpers["rest"] = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'),
aiohttp_session=aiohttp_session
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
yield
await aiohttp_session.close()
app = FastAPI(lifespan=lifespan)
app.add_middleware(
@@ -85,55 +91,50 @@ async def start_bot(request: Request) -> JSONResponse:
room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", "")
if not room_url:
params = DailyRoomParams(
properties=DailyRoomProperties()
)
params = DailyRoomParams(properties=DailyRoomProperties())
try:
room: DailyRoomObject = await daily_helpers["rest"].create_room(params=params)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Unable to provision room {e}")
raise HTTPException(status_code=500, detail=f"Unable to provision room {e}")
else:
# Check passed room URL exists, we should assume that it already has a sip set up
try:
room: DailyRoomObject = await daily_helpers["rest"].get_room_from_url(room_url)
except Exception:
raise HTTPException(
status_code=500, detail=f"Room not found: {room_url}")
raise HTTPException(status_code=500, detail=f"Room not found: {room_url}")
# Give the agent a token to join the session
token = await daily_helpers["rest"].get_token(room.url, MAX_SESSION_TIME)
if not room or not token:
raise HTTPException(
status_code=500, detail=f"Failed to get token for room: {room_url}")
raise HTTPException(status_code=500, detail=f"Failed to get token for room: {room_url}")
# Launch a new VM, or run as a shell process (not recommended)
if os.getenv("RUN_AS_VM", False):
try:
await virtualize_bot(room.url, token)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to spawn VM: {e}")
raise HTTPException(status_code=500, detail=f"Failed to spawn VM: {e}")
else:
try:
subprocess.Popen(
[f"python3 -m bot -u {room.url} -t {token}"],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__)))
cwd=os.path.dirname(os.path.abspath(__file__)),
)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to start subprocess: {e}")
raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}")
# Grab a token for the user to join with
user_token = await daily_helpers["rest"].get_token(room.url, MAX_SESSION_TIME)
return JSONResponse({
"room_url": room.url,
"token": user_token,
})
return JSONResponse(
{
"room_url": room.url,
"token": user_token,
}
)
@app.get("/{path_name:path}", response_class=FileResponse)
@@ -155,6 +156,7 @@ async def catch_all(path_name: Optional[str] = ""):
# ------------ Virtualization ------------ #
async def virtualize_bot(room_url: str, token: str):
"""
This is an example of how to virtualize the bot using Fly.io
@@ -163,20 +165,19 @@ async def virtualize_bot(room_url: str, token: str):
FLY_API_HOST = os.getenv("FLY_API_HOST", "https://api.machines.dev/v1")
FLY_APP_NAME = os.getenv("FLY_APP_NAME", "storytelling-chatbot")
FLY_API_KEY = os.getenv("FLY_API_KEY", "")
FLY_HEADERS = {
'Authorization': f"Bearer {FLY_API_KEY}",
'Content-Type': 'application/json'
}
FLY_HEADERS = {"Authorization": f"Bearer {FLY_API_KEY}", "Content-Type": "application/json"}
async with aiohttp.ClientSession() as session:
# Use the same image as the bot runner
async with session.get(f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS) as r:
async with session.get(
f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS
) as r:
if r.status != 200:
text = await r.text()
raise Exception(f"Unable to get machine info from Fly: {text}")
data = await r.json()
image = data[0]['config']['image']
image = data[0]["config"]["image"]
# Machine configuration
cmd = f"python3 src/bot.py -u {room_url} -t {token}"
@@ -185,31 +186,28 @@ async def virtualize_bot(room_url: str, token: str):
"config": {
"image": image,
"auto_destroy": True,
"init": {
"cmd": cmd
},
"restart": {
"policy": "no"
},
"guest": {
"cpu_kind": "shared",
"cpus": 1,
"memory_mb": 512
}
"init": {"cmd": cmd},
"restart": {"policy": "no"},
"guest": {"cpu_kind": "shared", "cpus": 1, "memory_mb": 512},
},
}
# Spawn a new machine instance
async with session.post(f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS, json=worker_props) as r:
async with session.post(
f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS, json=worker_props
) as r:
if r.status != 200:
text = await r.text()
raise Exception(f"Problem starting a bot worker: {text}")
data = await r.json()
# Wait for the machine to enter the started state
vm_id = data['id']
vm_id = data["id"]
async with session.get(f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines/{vm_id}/wait?state=started", headers=FLY_HEADERS) as r:
async with session.get(
f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines/{vm_id}/wait?state=started",
headers=FLY_HEADERS,
) as r:
if r.status != 200:
text = await r.text()
raise Exception(f"Bot was unable to enter started state: {text}")
@@ -221,8 +219,13 @@ async def virtualize_bot(room_url: str, token: str):
if __name__ == "__main__":
# Check environment variables
required_env_vars = ['OPENAI_API_KEY', 'DAILY_API_KEY',
'FAL_KEY', 'ELEVENLABS_VOICE_ID', 'ELEVENLABS_API_KEY']
required_env_vars = [
"OPENAI_API_KEY",
"DAILY_API_KEY",
"FAL_KEY",
"ELEVENLABS_VOICE_ID",
"ELEVENLABS_API_KEY",
]
for env_var in required_env_vars:
if env_var not in os.environ:
raise Exception(f"Missing environment variable: {env_var}.")
@@ -232,20 +235,11 @@ if __name__ == "__main__":
default_host = os.getenv("HOST", "0.0.0.0")
default_port = int(os.getenv("FAST_API_PORT", "7860"))
parser = argparse.ArgumentParser(
description="Daily Storyteller FastAPI server")
parser.add_argument("--host", type=str,
default=default_host, help="Host address")
parser.add_argument("--port", type=int,
default=default_port, help="Port number")
parser.add_argument("--reload", action="store_true",
help="Reload code on change")
parser = argparse.ArgumentParser(description="Daily Storyteller FastAPI server")
parser.add_argument("--host", type=str, default=default_host, help="Host address")
parser.add_argument("--port", type=int, default=default_port, help="Port number")
parser.add_argument("--reload", action="store_true", help="Reload code on change")
config = parser.parse_args()
uvicorn.run(
"bot_runner:app",
host=config.host,
port=config.port,
reload=config.reload
)
uvicorn.run("bot_runner:app", host=config.host, port=config.port, reload=config.reload)
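
Review note on `os.getenv("RUN_AS_VM", False)` in `start_bot` above: `os.getenv` returns a string whenever the variable is set, so `RUN_AS_VM=0` or `RUN_AS_VM=false` would still be truthy and select the VM path. A small sketch of explicit flag parsing follows; `env_flag` and the accepted spellings are assumptions for illustration, not something this change introduces.

```python
import os

# Spellings treated as "enabled" (an assumption; adjust to taste).
_TRUE_VALUES = {"1", "true", "yes", "on"}


def env_flag(name: str, default: bool = False) -> bool:
    """Interpret an environment variable as a boolean (hypothetical helper)."""
    raw = os.getenv(name)
    if raw is None:
        # Variable not set at all: fall back to the default.
        return default
    return raw.strip().lower() in _TRUE_VALUES
```

With this helper the branch would read `if env_flag("RUN_AS_VM"):`.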


@@ -6,7 +6,8 @@ from pipecat.frames.frames import (
Frame,
LLMFullResponseEndFrame,
TextFrame,
UserStoppedSpeakingFrame)
UserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transports.services.daily import DailyTransportMessageFrame
@@ -35,6 +36,7 @@ class StoryPromptFrame(TextFrame):
# ------------ Frame Processors ----------- #
class StoryImageProcessor(FrameProcessor):
"""
Processor for image prompt frames that will be sent to the FAL service.
@@ -113,7 +115,7 @@ class StoryProcessor(FrameProcessor):
# Extract the image prompt from the text using regex
image_prompt = re.search(r"<(.*?)>", self._text).group(1)
# Remove the image prompt from the text
self._text = re.sub(r"<.*?>", '', self._text, count=1)
self._text = re.sub(r"<.*?>", "", self._text, count=1)
# Process the image prompt frame
await self.push_frame(StoryImageFrame(image_prompt))
@@ -124,8 +126,7 @@ class StoryProcessor(FrameProcessor):
if re.search(r".*\[[bB]reak\].*", self._text):
# Remove the [break] token from the text
# so it isn't spoken out loud by the TTS
self._text = re.sub(r'\[[bB]reak\]', '',
self._text, flags=re.IGNORECASE)
self._text = re.sub(r"\[[bB]reak\]", "", self._text, flags=re.IGNORECASE)
self._text = self._text.replace("\n", " ")
if len(self._text) > 2:
# Append the sentence to the story
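
Review note on the two `StoryProcessor` hunks above: together they strip one `<image prompt>` and any `[break]` token from the buffered text before it reaches the TTS. The sketch below reproduces that text handling as a standalone function so it can be checked in isolation. `split_story_chunk` is a hypothetical name, and the `if match` guard and final `strip()` are additions for standalone use. Because `re.IGNORECASE` is passed, the `[bB]` character class in the original pattern is redundant, so the sketch uses a plain `\[break\]`.

```python
import re
from typing import Optional, Tuple


def split_story_chunk(text: str) -> Tuple[Optional[str], str]:
    """Split one buffered chunk into (image_prompt, spoken_text)."""
    image_prompt = None
    # Extract the first <...> image prompt, if present, and remove it.
    match = re.search(r"<(.*?)>", text)
    if match:
        image_prompt = match.group(1)
        text = re.sub(r"<.*?>", "", text, count=1)
    # Drop the [break] marker so it is not spoken aloud.
    text = re.sub(r"\[break\]", "", text, flags=re.IGNORECASE)
    # Flatten newlines as the processor does, then trim the edges.
    text = text.replace("\n", " ").strip()
    return image_prompt, text
```

For example, `split_story_chunk("<a dragon over a castle> Once upon a time,\na dragon woke up. [Break]")` returns `("a dragon over a castle", "Once upon a time, a dragon woke up.")`.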


@@ -3,7 +3,7 @@ LLM_INTRO_PROMPT = {
"content": "You are a creative storyteller who loves to tell whimsical, fantastical stories. \
Your goal is to craft an engaging and fun story. \
Start by asking the user what kind of story they'd like to hear. Don't provide any examples. \
Keep your response to only a few sentences."
Keep your response to only a few sentences.",
}
@@ -25,7 +25,7 @@ LLM_BASE_PROMPT = {
Responses should use the format: <...> story sentence [break] <...> story sentence [break] ... \
After each response, ask me how I'd like the story to continue and wait for my input. \
Please ensure your responses are less than 3-4 sentences long. \
Please refrain from using any explicit language or content. Do not tell scary stories."
Please refrain from using any explicit language or content. Do not tell scary stories.",
}


@@ -17,7 +17,8 @@ def load_images(image_files):
# Open the image and convert it to bytes
with Image.open(full_path) as img:
images[filename] = OutputImageRawFrame(
image=img.tobytes(), size=img.size, format=img.format)
image=img.tobytes(), size=img.size, format=img.format
)
return images
@@ -31,8 +32,10 @@ def load_sounds(sound_files):
filename = os.path.splitext(os.path.basename(full_path))[0]
# Open the sound and convert it to bytes
with wave.open(full_path) as audio_file:
sounds[filename] = OutputAudioRawFrame(audio=audio_file.readframes(-1),
sample_rate=audio_file.getframerate(),
num_channels=audio_file.getnchannels())
sounds[filename] = OutputAudioRawFrame(
audio=audio_file.readframes(-1),
sample_rate=audio_file.getframerate(),
num_channels=audio_file.getnchannels(),
)
return sounds


@@ -17,16 +17,13 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
async def configure_with_args(
aiohttp_session: aiohttp.ClientSession,
parser: argparse.ArgumentParser | None = None):
aiohttp_session: aiohttp.ClientSession, parser: argparse.ArgumentParser | None = None
):
if not parser:
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument(
"-u",
"--url",
type=str,
required=False,
help="URL of the Daily room to join")
"-u", "--url", type=str, required=False, help="URL of the Daily room to join"
)
parser.add_argument(
"-k",
"--apikey",
@@ -42,15 +39,19 @@ async def configure_with_args(
if not url:
raise Exception(
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL.")
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL."
)
if not key:
raise Exception("No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers.")
raise Exception(
"No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
)
daily_rest_helper = DailyRESTHelper(
daily_api_key=key,
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session)
aiohttp_session=aiohttp_session,
)
# Create a meeting token for the given room with an expiration 1 hour in
# the future.


@@ -13,7 +13,9 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -24,6 +26,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
# Run this script directly from your command line.
@@ -45,15 +48,17 @@ def truncate_content(content, model_name):
return encoding.decode(truncated_tokens)
return content
# Main function to extract content from url
async def get_article_content(url: str, aiohttp_session: aiohttp.ClientSession):
if 'arxiv.org' in url:
if "arxiv.org" in url:
return await get_arxiv_content(url, aiohttp_session)
else:
return await get_wikipedia_content(url, aiohttp_session)
# Helper function to extract content from Wikipedia url (this is
# technically agnostic to URL type but will work best with Wikipedia
# articles)
@@ -65,23 +70,24 @@ async def get_wikipedia_content(url: str, aiohttp_session: aiohttp.ClientSession
return "Failed to download Wikipedia article."
text = await response.text()
soup = BeautifulSoup(text, 'html.parser')
soup = BeautifulSoup(text, "html.parser")
content = soup.find('div', {'class': 'mw-parser-output'})
content = soup.find("div", {"class": "mw-parser-output"})
if content:
return content.get_text()
else:
return "Failed to extract Wikipedia article content."
# Helper function to extract content from arXiv url
async def get_arxiv_content(url: str, aiohttp_session: aiohttp.ClientSession):
if '/abs/' in url:
url = url.replace('/abs/', '/pdf/')
if not url.endswith('.pdf'):
url += '.pdf'
if "/abs/" in url:
url = url.replace("/abs/", "/pdf/")
if not url.endswith(".pdf"):
url += ".pdf"
async with aiohttp_session.get(url) as response:
if response.status != 200:
@@ -95,6 +101,7 @@ async def get_arxiv_content(url: str, aiohttp_session: aiohttp.ClientSession):
text += page.extract_text()
return text
# This is the main function that handles STT -> LLM -> TTS
@@ -116,40 +123,46 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id=os.getenv("CARTESIA_VOICE_ID", "4d2fd738-3b3d-4368-957a-bb4805275bd9"),
# British Narration Lady: 4d2fd738-3b3d-4368-957a-bb4805275bd9
sample_rate=44100,
params=CartesiaTTSService.InputParams(
sample_rate=44100,
),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o-mini")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o-mini")
messages = [{
"role": "system", "content": f"""You are an AI study partner. You have been given the following article content:
messages = [
{
"role": "system",
"content": f"""You are an AI study partner. You have been given the following article content:
{article_content}
Your task is to help the user understand and learn from this article in 2 sentences. THESE RESPONSES SHOULD BE ONLY MAX 2 SENTENCES. THIS INSTRUCTION IS VERY IMPORTANT. RESPONSES SHOULDN'T BE LONG.
""", }, ]
""",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(),
tma_in,
llm,
tts,
transport.output(),
tma_out,
])
pipeline = Pipeline(
[
transport.input(),
tma_in,
llm,
tts,
transport.output(),
tma_out,
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
@@ -159,12 +172,15 @@ Your task is to help the user understand and learn from this article in 2 senten
messages.append(
{
"role": "system",
"content": "Hello! I'm ready to discuss the article with you. What would you like to learn about?"})
"content": "Hello! I'm ready to discuss the article with you. What would you like to learn about?",
}
)
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())
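
Review note on `get_arxiv_content` above: before downloading, it rewrites an arXiv abstract URL into a PDF URL. The same two steps are shown below as a pure function that can be tested without a network call; `arxiv_pdf_url` is a hypothetical name and is not part of this change.

```python
def arxiv_pdf_url(url: str) -> str:
    """Map an arXiv abstract URL to its PDF URL (hypothetical helper)."""
    # Abstract pages live under /abs/, PDFs under /pdf/.
    if "/abs/" in url:
        url = url.replace("/abs/", "/pdf/")
    # Append the extension only if it is missing.
    if not url.endswith(".pdf"):
        url += ".pdf"
    return url
```

Calling it on a URL that already ends in `.pdf` returns the URL unchanged, so it is safe to apply more than once.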


@@ -22,13 +22,15 @@ from pipecat.transports.services.daily import (
DailyParams,
DailyTranscriptionSettings,
DailyTransport,
DailyTransportMessageFrame)
DailyTransportMessageFrame,
)
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -44,7 +46,6 @@ It also isn't saving what the user or bot says into the context object for use i
# We need to use a custom service here to yield LLM frames without saving
# any context
class TranslationProcessor(FrameProcessor):
def __init__(self, language):
super().__init__()
self._language = language
@@ -80,10 +81,7 @@ class TranslationSubtitles(FrameProcessor):
await super().process_frame(frame, direction)
if isinstance(frame, TextFrame):
message = {
"language": self._language,
"text": frame.text
}
message = {"language": self._language, "text": frame.text}
await self.push_frame(DailyTransportMessageFrame(message))
await self.push_frame(frame)
@@ -100,10 +98,8 @@ async def main():
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
transcription_settings=DailyTranscriptionSettings(extra={
"interim_results": False
})
)
transcription_settings=DailyTranscriptionSettings(extra={"interim_results": False}),
),
)
tts = AzureTTSService(
@@ -112,26 +108,14 @@ async def main():
voice="es-ES-AlvaroNeural",
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o"
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
sa = SentenceAggregator()
tp = TranslationProcessor("Spanish")
lfra = LLMFullResponseAggregator()
ts = TranslationSubtitles("spanish")
pipeline = Pipeline([
transport.input(),
sa,
tp,
llm,
lfra,
ts,
tts,
transport.output()
])
pipeline = Pipeline([transport.input(), sa, tp, llm, lfra, ts, tts, transport.output()])
task = PipelineTask(pipeline)


@@ -15,11 +15,8 @@ from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper
async def configure(aiohttp_session: aiohttp.ClientSession):
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument(
"-u",
"--url",
type=str,
required=False,
help="URL of the Daily room to join")
"-u", "--url", type=str, required=False, help="URL of the Daily room to join"
)
parser.add_argument(
"-k",
"--apikey",
@@ -35,15 +32,18 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
if not url:
raise Exception(
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL.")
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL."
)
if not key:
raise Exception("No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers.")
raise Exception(
"No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
)
daily_rest_helper = DailyRESTHelper(
daily_api_key=key,
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session
aiohttp_session=aiohttp_session,
)
# Create a meeting token for the given room with an expiration 1 hour in


@@ -38,13 +38,14 @@ async def lifespan(app: FastAPI):
aiohttp_session = aiohttp.ClientSession()
daily_helpers["rest"] = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'),
aiohttp_session=aiohttp_session
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
yield
await aiohttp_session.close()
cleanup()
app = FastAPI(lifespan=lifespan)
app.add_middleware(
@@ -65,37 +66,34 @@ async def start_agent(request: Request):
if not room.url:
raise HTTPException(
status_code=500,
detail="Missing 'room' property in request data. Cannot start agent without a target room!")
detail="Missing 'room' property in request data. Cannot start agent without a target room!",
)
# Check if there is already an existing process running in this room
num_bots_in_room = sum(
1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None)
1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None
)
if num_bots_in_room >= MAX_BOTS_PER_ROOM:
raise HTTPException(
status_code=500, detail=f"Max bot limited reach for room: {room.url}")
raise HTTPException(status_code=500, detail=f"Max bot limited reach for room: {room.url}")
# Get the token for the room
token = await daily_helpers["rest"].get_token(room.url)
if not token:
raise HTTPException(
status_code=500, detail=f"Failed to get token for room: {room.url}")
raise HTTPException(status_code=500, detail=f"Failed to get token for room: {room.url}")
# Spawn a new agent, and join the user session
# Note: this is mostly for demonstration purposes (refer to 'deployment' in README)
try:
proc = subprocess.Popen(
[
f"python3 -m bot -u {room.url} -t {token}"
],
[f"python3 -m bot -u {room.url} -t {token}"],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__))
cwd=os.path.dirname(os.path.abspath(__file__)),
)
bot_procs[proc.pid] = (proc, room.url)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to start subprocess: {e}")
raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}")
return RedirectResponse(room.url)
@@ -107,8 +105,7 @@ def get_status(pid: int):
# If the subprocess doesn't exist, return an error
if not proc:
raise HTTPException(
status_code=404, detail=f"Bot with process id: {pid} not found")
raise HTTPException(status_code=404, detail=f"Bot with process id: {pid} not found")
# Check the status of the subprocess
if proc[0].poll() is None:
@@ -125,14 +122,10 @@ if __name__ == "__main__":
default_host = os.getenv("HOST", "0.0.0.0")
default_port = int(os.getenv("FAST_API_PORT", "7860"))
parser = argparse.ArgumentParser(
description="Daily Storyteller FastAPI server")
parser.add_argument("--host", type=str,
default=default_host, help="Host address")
parser.add_argument("--port", type=int,
default=default_port, help="Port number")
parser.add_argument("--reload", action="store_true",
help="Reload code on change")
parser = argparse.ArgumentParser(description="Daily Storyteller FastAPI server")
parser.add_argument("--host", type=str, default=default_host, help="Host address")
parser.add_argument("--port", type=int, default=default_port, help="Port number")
parser.add_argument("--reload", action="store_true", help="Reload code on change")
config = parser.parse_args()


@@ -7,18 +7,22 @@ from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator,
LLMUserResponseAggregator
LLMUserResponseAggregator,
)
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketTransport, FastAPIWebsocketParams
from pipecat.transports.network.fastapi_websocket import (
FastAPIWebsocketTransport,
FastAPIWebsocketParams,
)
from pipecat.vad.silero import SileroVADAnalyzer
from pipecat.serializers.twilio import TwilioFrameSerializer
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -34,15 +38,13 @@ async def run_bot(websocket_client, stream_sid):
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
serializer=TwilioFrameSerializer(stream_sid)
)
serializer=TwilioFrameSerializer(stream_sid),
),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
stt = DeepgramSTTService(api_key=os.getenv('DEEPGRAM_API_KEY'))
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
@@ -59,23 +61,24 @@ async def run_bot(websocket_client, stream_sid):
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Websocket input from client
stt, # Speech-To-Text
tma_in, # User responses
llm, # LLM
tts, # Text-To-Speech
transport.output(), # Websocket output to client
tma_out # LLM responses
])
pipeline = Pipeline(
[
transport.input(), # Websocket input from client
stt, # Speech-To-Text
tma_in, # User responses
llm, # LLM
tts, # Text-To-Speech
transport.output(), # Websocket output to client
tma_out, # LLM responses
]
)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_client_disconnected")

View File

@@ -19,7 +19,7 @@ app.add_middleware(
)
@app.post('/start_call')
@app.post("/start_call")
async def start_call():
print("POST TwiML")
return HTMLResponse(content=open("templates/streams.xml").read(), media_type="application/xml")
@@ -32,7 +32,7 @@ async def websocket_endpoint(websocket: WebSocket):
await start_data.__anext__()
call_data = json.loads(await start_data.__anext__())
print(call_data, flush=True)
stream_sid = call_data['start']['streamSid']
stream_sid = call_data["start"]["streamSid"]
print("WebSocket connection accepted")
await run_bot(websocket, stream_sid)

View File

@@ -14,17 +14,21 @@ from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator,
LLMUserResponseAggregator
LLMUserResponseAggregator,
)
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.network.websocket_server import WebsocketServerParams, WebsocketServerTransport
from pipecat.transports.network.websocket_server import (
WebsocketServerParams,
WebsocketServerTransport,
)
from pipecat.vad.silero import SileroVADAnalyzer
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -38,13 +42,11 @@ async def main():
add_wav_header=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True
vad_audio_passthrough=True,
)
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
@@ -63,28 +65,30 @@ async def main():
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Websocket input from client
stt, # Speech-To-Text
tma_in, # User responses
llm, # LLM
tts, # Text-To-Speech
transport.output(), # Websocket output to client
tma_out # LLM responses
])
pipeline = Pipeline(
[
transport.input(), # Websocket input from client
stt, # Speech-To-Text
tma_in, # User responses
llm, # LLM
tts, # Text-To-Speech
transport.output(), # Websocket output to client
tma_out, # LLM responses
]
)
task = PipelineTask(pipeline)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -35,6 +35,7 @@ Website = "https://pipecat.ai"
[project.optional-dependencies]
anthropic = [ "anthropic~=0.34.0" ]
aws = [ "boto3~=1.35.27" ]
azure = [ "azure-cognitiveservices-speech~=1.40.0" ]
cartesia = [ "cartesia~=1.0.13", "websockets~=12.0" ]
daily = [ "daily-python~=0.10.1" ]
@@ -43,20 +44,20 @@ elevenlabs = [ "websockets~=12.0" ]
examples = [ "python-dotenv~=1.0.1", "flask~=3.0.3", "flask_cors~=4.0.1" ]
fal = [ "fal-client~=0.4.1" ]
gladia = [ "websockets~=12.0" ]
google = [ "google-generativeai~=0.7.2" ]
google = [ "google-generativeai~=0.7.2", "google-cloud-texttospeech~=2.17.2" ]
gstreamer = [ "pygobject~=3.48.2" ]
fireworks = [ "openai~=1.37.2" ]
langchain = [ "langchain~=0.2.14", "langchain-community~=0.2.12", "langchain-openai~=0.1.20" ]
livekit = [ "livekit~=0.13.1" ]
livekit = [ "livekit~=0.13.1", "tenacity~=9.0.0" ]
lmnt = [ "lmnt~=1.1.4" ]
local = [ "pyaudio~=0.2.14" ]
moondream = [ "einops~=0.8.0", "timm~=1.0.8", "transformers~=4.44.0" ]
openai = [ "openai~=1.37.2" ]
openpipe = [ "openpipe~=4.24.0" ]
playht = [ "pyht~=0.0.28" ]
silero = [ "silero-vad~=5.1" ]
silero = [ "onnxruntime>=1.16.1" ]
together = [ "together~=1.2.7" ]
websocket = [ "websockets~=12.0", "fastapi~=0.112.1" ]
websocket = [ "websockets~=12.0", "fastapi~=0.115.0" ]
whisper = [ "faster-whisper~=1.0.3" ]
xtts = [ "resampy~=0.4.3" ]

View File

@@ -8,7 +8,6 @@ from abc import ABC, abstractmethod
class BaseClock(ABC):
@abstractmethod
def get_time(self) -> int:
pass

View File

@@ -10,7 +10,6 @@ from pipecat.clocks.base_clock import BaseClock
class SystemClock(BaseClock):
def __init__(self):
self._time = 0

View File

@@ -4,9 +4,8 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
from typing import Any, List, Optional, Tuple
from dataclasses import dataclass, field
from typing import Any, List, Optional, Tuple, Union
from pipecat.clocks.base_clock import BaseClock
from pipecat.metrics.metrics import MetricsData
@@ -43,6 +42,7 @@ class DataFrame(Frame):
@dataclass
class AudioRawFrame(DataFrame):
"""A chunk of audio."""
audio: bytes
sample_rate: int
num_channels: int
@@ -58,9 +58,8 @@ class AudioRawFrame(DataFrame):
@dataclass
class InputAudioRawFrame(AudioRawFrame):
"""A chunk of audio usually coming from an input transport.
"""A chunk of audio usually coming from an input transport."""
"""
pass
@@ -70,14 +69,14 @@ class OutputAudioRawFrame(AudioRawFrame):
transport's microphone has been enabled.
"""
pass
@dataclass
class TTSAudioRawFrame(OutputAudioRawFrame):
"""A chunk of output audio generated by a TTS service.
"""A chunk of output audio generated by a TTS service."""
"""
pass
@@ -87,6 +86,7 @@ class ImageRawFrame(DataFrame):
enabled.
"""
image: bytes
size: Tuple[int, int]
format: str | None
@@ -112,6 +112,7 @@ class UserImageRawFrame(InputImageRawFrame):
transport's camera is enabled.
"""
user_id: str
def __str__(self):
@@ -125,11 +126,12 @@ class VisionImageRawFrame(InputImageRawFrame):
shown by the transport if the transport's camera is enabled.
"""
text: str | None
def __str__(self):
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, text: {self.text}, size: {self.size}, format: {self.format})"
return f"{self.name}(pts: {pts}, text: [{self.text}], size: {self.size}, format: {self.format})"
@dataclass
@@ -138,6 +140,7 @@ class URLImageRawFrame(OutputImageRawFrame):
transport's camera is enabled.
"""
url: str | None
def __str__(self):
@@ -152,6 +155,7 @@ class SpriteFrame(Frame):
`camera_out_framerate` constructor parameter.
"""
images: List[ImageRawFrame]
def __str__(self):
@@ -165,11 +169,12 @@ class TextFrame(DataFrame):
be used to send text through pipelines.
"""
text: str
def __str__(self):
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, text: {self.text})"
return f"{self.name}(pts: {pts}, text: [{self.text}])"
@dataclass
@@ -178,24 +183,26 @@ class TranscriptionFrame(TextFrame):
transport's receive queue when a participant speaks.
"""
user_id: str
timestamp: str
language: Language | None = None
def __str__(self):
return f"{self.name}(user: {self.user_id}, text: {self.text}, language: {self.language}, timestamp: {self.timestamp})"
return f"{self.name}(user: {self.user_id}, text: [{self.text}], language: {self.language}, timestamp: {self.timestamp})"
@dataclass
class InterimTranscriptionFrame(TextFrame):
"""A text frame with interim transcription-specific data. Will be placed in
the transport's receive queue when a participant speaks."""
user_id: str
timestamp: str
language: Language | None = None
def __str__(self):
return f"{self.name}(user: {self.user_id}, text: {self.text}, language: {self.language}, timestamp: {self.timestamp})"
return f"{self.name}(user: {self.user_id}, text: [{self.text}], language: {self.language}, timestamp: {self.timestamp})"
@dataclass
@@ -207,6 +214,7 @@ class LLMMessagesFrame(DataFrame):
processors.
"""
messages: List[dict]
@@ -216,6 +224,7 @@ class LLMMessagesAppendFrame(DataFrame):
current context.
"""
messages: List[dict]
@@ -226,6 +235,7 @@ class LLMMessagesUpdateFrame(DataFrame):
LLMMessagesFrame.
"""
messages: List[dict]
@@ -235,13 +245,14 @@ class LLMSetToolsFrame(DataFrame):
The specific format depends on the LLM being used, but it should typically
contain JSON Schema objects.
"""
tools: List[dict]
@dataclass
class LLMEnablePromptCachingFrame(DataFrame):
"""A frame to enable/disable prompt caching in certain LLMs.
"""
"""A frame to enable/disable prompt caching in certain LLMs."""
enable: bool
@@ -251,6 +262,7 @@ class TTSSpeakFrame(DataFrame):
pipeline (if any).
"""
text: str
@@ -262,6 +274,7 @@ class TransportMessageFrame(DataFrame):
def __str__(self):
return f"{self.name}(message: {self.message})"
#
# App frames. Application user-defined frames.
#
@@ -271,6 +284,7 @@ class TransportMessageFrame(DataFrame):
class AppFrame(Frame):
pass
#
# System frames
#
@@ -284,6 +298,7 @@ class SystemFrame(Frame):
@dataclass
class StartFrame(SystemFrame):
"""This is the first frame that should be pushed down a pipeline."""
clock: BaseClock
allow_interruptions: bool = False
enable_metrics: bool = False
@@ -294,6 +309,7 @@ class StartFrame(SystemFrame):
@dataclass
class CancelFrame(SystemFrame):
"""Indicates that a pipeline needs to stop right away."""
pass
@@ -304,6 +320,7 @@ class ErrorFrame(SystemFrame):
bot should exit.
"""
error: str
fatal: bool = False
@@ -317,9 +334,31 @@ class FatalErrorFrame(ErrorFrame):
that the bot should exit.
"""
fatal: bool = field(default=True, init=False)
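Note on the `field(default=True, init=False)` override above: a minimal sketch, using hypothetical stand-in classes rather than the real frames, of how a dataclass subclass can pin an inherited field to a fixed value and remove it from the constructor.

```python
from dataclasses import dataclass, field


@dataclass
class ErrorSketch:
    """Hypothetical stand-in for ErrorFrame."""

    error: str
    fatal: bool = False


@dataclass
class FatalErrorSketch(ErrorSketch):
    """Hypothetical stand-in for FatalErrorFrame."""

    # init=False drops `fatal` from __init__, so callers cannot override it.
    fatal: bool = field(default=True, init=False)


frame = FatalErrorSketch("disk full")
```

With this shape, `FatalErrorSketch("disk full", fatal=False)` raises a `TypeError`, because `fatal` is no longer a constructor parameter.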
@dataclass
class EndTaskFrame(SystemFrame):
"""This is used to notify the pipeline task that the pipeline should be
closed nicely (flushing all the queued frames) by pushing an EndFrame
downstream.
"""
pass
@dataclass
class CancelTaskFrame(SystemFrame):
"""This is used to notify the pipeline task that the pipeline should be
stopped immediately by pushing a CancelFrame downstream.
"""
pass
@dataclass
class StopTaskFrame(SystemFrame):
"""Indicates that a pipeline task should be stopped but that the pipeline
@@ -327,6 +366,7 @@ class StopTaskFrame(SystemFrame):
the pipeline task.
"""
pass
@@ -338,6 +378,7 @@ class StartInterruptionFrame(SystemFrame):
guaranteed).
"""
pass
@@ -349,6 +390,7 @@ class StopInterruptionFrame(SystemFrame):
guaranteed).
"""
pass
@@ -359,13 +401,14 @@ class BotInterruptionFrame(SystemFrame):
UserStartedSpeakingFrame and UserStoppedSpeakingFrame won't be generated.
"""
pass
@dataclass
class MetricsFrame(SystemFrame):
"""Emitted by processor that can compute metrics like latencies.
"""
"""Emitted by processor that can compute metrics like latencies."""
data: List[MetricsData]
@@ -388,6 +431,7 @@ class EndFrame(ControlFrame):
was sent (unlike system frames).
"""
pass
@@ -395,12 +439,14 @@ class EndFrame(ControlFrame):
class LLMFullResponseStartFrame(ControlFrame):
"""Used to indicate the beginning of an LLM response. Followed by one or
more TextFrame and a final LLMFullResponseEndFrame."""
pass
@dataclass
class LLMFullResponseEndFrame(ControlFrame):
"""Indicates the end of an LLM response."""
pass
@@ -412,28 +458,28 @@ class UserStartedSpeakingFrame(ControlFrame):
with a TranscriptionFrame)
"""
pass
@dataclass
class UserStoppedSpeakingFrame(ControlFrame):
"""Emitted by the VAD to indicate that a user stopped speaking."""
pass
@dataclass
class BotStartedSpeakingFrame(ControlFrame):
"""Emitted upstream by transport outputs to indicate the bot started speaking.
"""Emitted upstream by transport outputs to indicate the bot started speaking."""
"""
pass
@dataclass
class BotStoppedSpeakingFrame(ControlFrame):
"""Emitted upstream by transport outputs to indicate the bot stopped speaking.
"""Emitted upstream by transport outputs to indicate the bot stopped speaking."""
"""
pass
@@ -445,6 +491,7 @@ class BotSpeakingFrame(ControlFrame):
since the user might be listening.
"""
pass
@@ -457,18 +504,21 @@ class TTSStartedFrame(ControlFrame):
needing to control this in the TTS service.
"""
pass
@dataclass
class TTSStoppedFrame(ControlFrame):
"""Indicates the end of a TTS response."""
pass
@dataclass
class UserImageRequestFrame(ControlFrame):
"""A frame used to request an image from the given user."""
user_id: str
context: Optional[Any] = None
@@ -477,115 +527,51 @@ class UserImageRequestFrame(ControlFrame):
@dataclass
class LLMModelUpdateFrame(ControlFrame):
"""A control frame containing a request to update to a new LLM model.
"""
model: str
class LLMUpdateSettingsFrame(ControlFrame):
"""A control frame containing a request to update LLM settings."""
model: Optional[str] = None
temperature: Optional[float] = None
top_k: Optional[int] = None
top_p: Optional[float] = None
frequency_penalty: Optional[float] = None
presence_penalty: Optional[float] = None
max_tokens: Optional[int] = None
seed: Optional[int] = None
extra: dict = field(default_factory=dict)
@dataclass
class LLMTemperatureUpdateFrame(ControlFrame):
"""A control frame containing a request to update to a new LLM temperature.
"""
temperature: float
class TTSUpdateSettingsFrame(ControlFrame):
"""A control frame containing a request to update TTS settings."""
model: Optional[str] = None
voice: Optional[str] = None
language: Optional[Language] = None
speed: Optional[Union[str, float]] = None
emotion: Optional[List[str]] = None
engine: Optional[str] = None
pitch: Optional[str] = None
rate: Optional[str] = None
volume: Optional[str] = None
emphasis: Optional[str] = None
style: Optional[str] = None
style_degree: Optional[str] = None
role: Optional[str] = None
@dataclass
class LLMTopKUpdateFrame(ControlFrame):
"""A control frame containing a request to update to a new LLM top_k.
"""
top_k: int
class STTUpdateSettingsFrame(ControlFrame):
"""A control frame containing a request to update STT settings."""
@dataclass
class LLMTopPUpdateFrame(ControlFrame):
"""A control frame containing a request to update to a new LLM top_p.
"""
top_p: float
@dataclass
class LLMFrequencyPenaltyUpdateFrame(ControlFrame):
"""A control frame containing a request to update to a new LLM frequency
penalty.
"""
frequency_penalty: float
@dataclass
class LLMPresencePenaltyUpdateFrame(ControlFrame):
"""A control frame containing a request to update to a new LLM presence
penalty.
"""
presence_penalty: float
@dataclass
class LLMMaxTokensUpdateFrame(ControlFrame):
"""A control frame containing a request to update to a new LLM max tokens.
"""
max_tokens: int
@dataclass
class LLMSeedUpdateFrame(ControlFrame):
"""A control frame containing a request to update to a new LLM seed.
"""
seed: int
@dataclass
class LLMExtraUpdateFrame(ControlFrame):
"""A control frame containing a request to update to a new LLM extra params.
"""
extra: dict
@dataclass
class TTSModelUpdateFrame(ControlFrame):
"""A control frame containing a request to update the TTS model.
"""
model: str
@dataclass
class TTSVoiceUpdateFrame(ControlFrame):
"""A control frame containing a request to update to a new TTS voice.
"""
voice: str
@dataclass
class TTSLanguageUpdateFrame(ControlFrame):
"""A control frame containing a request to update to a new TTS language and
optional voice.
"""
language: Language
@dataclass
class STTModelUpdateFrame(ControlFrame):
"""A control frame containing a request to update the STT model and optional
language.
"""
model: str
@dataclass
class STTLanguageUpdateFrame(ControlFrame):
"""A control frame containing a request to update to STT language.
"""
language: Language
model: Optional[str] = None
language: Optional[Language] = None
@dataclass
class FunctionCallInProgressFrame(SystemFrame):
"""A frame signaling that a function call is in progress.
"""
"""A frame signaling that a function call is in progress."""
function_name: str
tool_call_id: str
arguments: str
@@ -593,12 +579,13 @@ class FunctionCallInProgressFrame(SystemFrame):
@dataclass
class FunctionCallResultFrame(DataFrame):
"""A frame containing the result of an LLM function (tool) call.
"""
"""A frame containing the result of an LLM function (tool) call."""
function_name: str
tool_call_id: str
arguments: str
result: Any
run_llm: bool = True
@dataclass
@@ -606,4 +593,5 @@ class VADParamsUpdateFrame(ControlFrame):
"""A control frame containing a request to update VAD params. Intended
to be pushed upstream from RTVI processor.
"""
params: VADParams
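Note on the consolidated `*UpdateSettingsFrame` classes in this file: every field defaults to `None` (or an empty dict), so a sender fills in only the settings it wants to change. How each service applies them is not shown in this diff. The sketch below uses a hypothetical stand-in class and a hypothetical helper, `changed_settings`, to illustrate one way a receiver could pick out the populated fields.

```python
from dataclasses import asdict, dataclass, field
from typing import Optional


@dataclass
class LLMUpdateSettingsSketch:
    """Hypothetical stand-in mirroring a few fields of LLMUpdateSettingsFrame."""

    model: Optional[str] = None
    temperature: Optional[float] = None
    max_tokens: Optional[int] = None
    extra: dict = field(default_factory=dict)


def changed_settings(frame) -> dict:
    """Return only the settings the sender actually filled in (assumed helper)."""
    return {k: v for k, v in asdict(frame).items() if v is not None and v != {}}


update = LLMUpdateSettingsSketch(model="gpt-4o", temperature=0.2)
settings = changed_settings(update)
```

One frame carrying several optional fields replaces the previous one-class-per-setting layout, at the cost of the receiver having to check which fields are set.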

View File

@@ -12,7 +12,6 @@ from pipecat.processors.frame_processor import FrameProcessor
class BasePipeline(FrameProcessor):
def __init__(self):
super().__init__()

View File

@@ -18,7 +18,6 @@ from loguru import logger
class Source(FrameProcessor):
def __init__(self, upstream_queue: asyncio.Queue):
super().__init__()
self._up_queue = upstream_queue
@@ -34,7 +33,6 @@ class Source(FrameProcessor):
class Sink(FrameProcessor):
def __init__(self, downstream_queue: asyncio.Queue):
super().__init__()
self._down_queue = downstream_queue

View File

@@ -12,7 +12,6 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
class PipelineSource(FrameProcessor):
def __init__(self, upstream_push_frame: Callable[[Frame, FrameDirection], Coroutine]):
super().__init__()
self._upstream_push_frame = upstream_push_frame
@@ -28,7 +27,6 @@ class PipelineSource(FrameProcessor):
class PipelineSink(FrameProcessor):
def __init__(self, downstream_push_frame: Callable[[Frame, FrameDirection], Coroutine]):
super().__init__()
self._downstream_push_frame = downstream_push_frame
@@ -44,7 +42,6 @@ class PipelineSink(FrameProcessor):
class Pipeline(BasePipeline):
def __init__(self, processors: List[FrameProcessor]):
super().__init__()

View File

@@ -14,7 +14,6 @@ from loguru import logger
class PipelineRunner:
def __init__(self, *, name: str | None = None, handle_sigint: bool = True):
self.id: int = obj_id()
self.name: str = name or f"{self.__class__.__name__}#{obj_count(self)}"
@@ -42,12 +41,10 @@ class PipelineRunner:
def _setup_sigint(self):
loop = asyncio.get_running_loop()
loop.add_signal_handler(
signal.SIGINT,
lambda *args: asyncio.create_task(self._sig_handler())
signal.SIGINT, lambda *args: asyncio.create_task(self._sig_handler())
)
loop.add_signal_handler(
signal.SIGTERM,
lambda *args: asyncio.create_task(self._sig_handler())
signal.SIGTERM, lambda *args: asyncio.create_task(self._sig_handler())
)
async def _sig_handler(self):

View File

@@ -9,16 +9,21 @@ import asyncio
from itertools import chain
from typing import List
from pipecat.frames.frames import ControlFrame, Frame, SystemFrame
from pipecat.pipeline.base_pipeline import BasePipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.frames.frames import Frame
from loguru import logger
class Source(FrameProcessor):
class SyncFrame(ControlFrame):
"""This frame is used to know when the internal pipelines have finished."""
pass
class Source(FrameProcessor):
def __init__(self, upstream_queue: asyncio.Queue):
super().__init__()
self._up_queue = upstream_queue
@@ -34,7 +39,6 @@ class Source(FrameProcessor):
class Sink(FrameProcessor):
def __init__(self, downstream_queue: asyncio.Queue):
super().__init__()
self._down_queue = downstream_queue
@@ -69,13 +73,16 @@ class SyncParallelPipeline(BasePipeline):
raise TypeError(f"SyncParallelPipeline argument {processors} is not a list")
# We add a source at the beginning of the pipeline and a sink at the end.
source = Source(self._up_queue)
sink = Sink(self._down_queue)
up_queue = asyncio.Queue()
down_queue = asyncio.Queue()
source = Source(up_queue)
sink = Sink(down_queue)
processors: List[FrameProcessor] = [source] + processors + [sink]
# Keep track of sources and sinks.
self._sources.append(source)
self._sinks.append(sink)
# Keep track of sources and sinks. We also keep the output queue of
# the source and the sinks so we can use it later.
self._sources.append({"processor": source, "queue": down_queue})
self._sinks.append({"processor": sink, "queue": up_queue})
# Create pipeline
pipeline = Pipeline(processors)
@@ -96,17 +103,46 @@ class SyncParallelPipeline(BasePipeline):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# The last processor of each pipeline needs to be synchronous, otherwise
# this element won't work. Since we know it should be synchronous, we
# push a SyncFrame. Since frames are ordered, we know this frame will be
# pushed after the synchronous processor has pushed its data, allowing us
# to synchronize all the internal pipelines by waiting for the
# SyncFrame in all of them.
async def wait_for_sync(
obj, main_queue: asyncio.Queue, frame: Frame, direction: FrameDirection
):
processor = obj["processor"]
queue = obj["queue"]
await processor.process_frame(frame, direction)
# If we have a system frame we don't need to synchronize anything.
if isinstance(frame, SystemFrame):
await main_queue.put(frame)
else:
await processor.process_frame(SyncFrame(), direction)
frame = await queue.get()
while not isinstance(frame, SyncFrame):
await main_queue.put(frame)
queue.task_done()
frame = await queue.get()
if direction == FrameDirection.UPSTREAM:
# If we get an upstream frame we process it in each sink.
await asyncio.gather(*[s.process_frame(frame, direction) for s in self._sinks])
await asyncio.gather(
*[wait_for_sync(s, self._up_queue, frame, direction) for s in self._sinks]
)
elif direction == FrameDirection.DOWNSTREAM:
# If we get a downstream frame we process it in each source.
await asyncio.gather(*[s.process_frame(frame, direction) for s in self._sources])
await asyncio.gather(
*[wait_for_sync(s, self._down_queue, frame, direction) for s in self._sources]
)
seen_ids = set()
while not self._up_queue.empty():
frame = await self._up_queue.get()
if frame and frame.id not in seen_ids:
if frame.id not in seen_ids:
await self.push_frame(frame, FrameDirection.UPSTREAM)
seen_ids.add(frame.id)
self._up_queue.task_done()
@@ -114,7 +150,7 @@ class SyncParallelPipeline(BasePipeline):
seen_ids = set()
while not self._down_queue.empty():
frame = await self._down_queue.get()
if frame and frame.id not in seen_ids:
if frame.id not in seen_ids:
await self.push_frame(frame, FrameDirection.DOWNSTREAM)
seen_ids.add(frame.id)
self._down_queue.task_done()
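Note on the `SyncFrame` approach in this file: the idea is a sentinel pushed after the real frame, so that reading a branch's output queue until the sentinel appears yields everything that branch produced. A minimal sketch with plain `asyncio` queues follows. `SyncMarker`, `drain_until_marker` and `demo` are hypothetical names, and the branches are simulated by pre-filled queues rather than real pipelines.

```python
import asyncio


class SyncMarker:
    """Hypothetical sentinel playing the role of SyncFrame."""


async def drain_until_marker(branch_queue: asyncio.Queue, main_queue: asyncio.Queue) -> None:
    """Forward items from one branch to the main queue until its sentinel arrives."""
    item = await branch_queue.get()
    while not isinstance(item, SyncMarker):
        await main_queue.put(item)
        branch_queue.task_done()
        item = await branch_queue.get()
    branch_queue.task_done()


async def demo() -> list:
    main_queue: asyncio.Queue = asyncio.Queue()
    branches = [asyncio.Queue(), asyncio.Queue()]
    # Each simulated branch emits its output followed by the sentinel.
    for name, queue in zip(("a", "b"), branches):
        await queue.put(f"{name}-1")
        await queue.put(f"{name}-2")
        await queue.put(SyncMarker())
    # Wait for every branch to reach its sentinel before continuing.
    await asyncio.gather(*[drain_until_marker(q, main_queue) for q in branches])
    collected = []
    while not main_queue.empty():
        collected.append(main_queue.get_nowait())
    return collected
```

The sentinel only works if each branch preserves frame order, which is the assumption the comment in `process_frame` states.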

View File

@@ -14,12 +14,15 @@ from pipecat.clocks.base_clock import BaseClock
from pipecat.clocks.system_clock import SystemClock
from pipecat.frames.frames import (
CancelFrame,
CancelTaskFrame,
EndFrame,
EndTaskFrame,
ErrorFrame,
Frame,
MetricsFrame,
StartFrame,
StopTaskFrame)
StopTaskFrame,
)
from pipecat.metrics.metrics import TTFBMetricsData, ProcessingMetricsData
from pipecat.pipeline.base_pipeline import BasePipeline
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
@@ -37,7 +40,6 @@ class PipelineParams(BaseModel):
class Source(FrameProcessor):
def __init__(self, up_queue: asyncio.Queue):
super().__init__()
self._up_queue = up_queue
@@ -52,7 +54,13 @@ class Source(FrameProcessor):
await self.push_frame(frame, direction)
async def _handle_upstream_frame(self, frame: Frame):
if isinstance(frame, ErrorFrame):
if isinstance(frame, EndTaskFrame):
# Tell the task we should end nicely.
await self._up_queue.put(EndTaskFrame())
elif isinstance(frame, CancelTaskFrame):
# Tell the task we should end right away.
await self._up_queue.put(CancelTaskFrame())
elif isinstance(frame, ErrorFrame):
logger.error(f"Error running app: {frame}")
if frame.fatal:
# Cancel all tasks downstream.
@@ -61,13 +69,26 @@ class Source(FrameProcessor):
await self._up_queue.put(StopTaskFrame())
class PipelineTask:
class Sink(FrameProcessor):
def __init__(self, down_queue: asyncio.Queue):
super().__init__()
self._down_queue = down_queue
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# We really just want to know when the EndFrame reached the sink.
if isinstance(frame, EndFrame):
await self._down_queue.put(frame)
class PipelineTask:
def __init__(
self,
pipeline: BasePipeline,
params: PipelineParams = PipelineParams(),
clock: BaseClock = SystemClock()):
self,
pipeline: BasePipeline,
params: PipelineParams = PipelineParams(),
clock: BaseClock = SystemClock(),
):
self.id: int = obj_id()
self.name: str = f"{self.__class__.__name__}#{obj_count(self)}"
@@ -76,12 +97,16 @@ class PipelineTask:
self._params = params
self._finished = False
self._down_queue = asyncio.Queue()
self._up_queue = asyncio.Queue()
self._down_queue = asyncio.Queue()
self._push_queue = asyncio.Queue()
self._source = Source(self._up_queue)
self._source.link(pipeline)
self._sink = Sink(self._down_queue)
pipeline.link(self._sink)
def has_finished(self):
return self._finished
@@ -95,19 +120,19 @@ class PipelineTask:
# out-of-band from the main streaming task which is what we want since
# we want to cancel right away.
await self._source.push_frame(CancelFrame())
self._process_down_task.cancel()
self._process_push_task.cancel()
self._process_up_task.cancel()
await self._process_down_task
await self._process_push_task
await self._process_up_task
async def run(self):
self._process_up_task = asyncio.create_task(self._process_up_queue())
self._process_down_task = asyncio.create_task(self._process_down_queue())
await asyncio.gather(self._process_up_task, self._process_down_task)
self._process_push_task = asyncio.create_task(self._process_push_queue())
await asyncio.gather(self._process_up_task, self._process_push_task)
self._finished = True
async def queue_frame(self, frame: Frame):
await self._down_queue.put(frame)
await self._push_queue.put(frame)
async def queue_frames(self, frames: Iterable[Frame] | AsyncIterable[Frame]):
if isinstance(frames, AsyncIterable):
@@ -125,7 +150,7 @@ class PipelineTask:
data.append(ProcessingMetricsData(processor=p.name, value=0.0))
return MetricsFrame(data=data)
async def _process_down_queue(self):
async def _process_push_queue(self):
self._clock.start()
start_frame = StartFrame(
@@ -133,22 +158,26 @@ class PipelineTask:
enable_metrics=self._params.enable_metrics,
enable_usage_metrics=self._params.enable_metrics,
report_only_initial_ttfb=self._params.report_only_initial_ttfb,
clock=self._clock
clock=self._clock,
)
await self._source.process_frame(start_frame, FrameDirection.DOWNSTREAM)
if self._params.enable_metrics and self._params.send_initial_empty_metrics:
await self._source.process_frame(self._initial_metrics_frame(), FrameDirection.DOWNSTREAM)
await self._source.process_frame(
self._initial_metrics_frame(), FrameDirection.DOWNSTREAM
)
running = True
should_cleanup = True
while running:
try:
frame = await self._down_queue.get()
frame = await self._push_queue.get()
await self._source.process_frame(frame, FrameDirection.DOWNSTREAM)
if isinstance(frame, EndFrame):
await self._wait_for_endframe()
running = not (isinstance(frame, StopTaskFrame) or isinstance(frame, EndFrame))
should_cleanup = not isinstance(frame, StopTaskFrame)
self._down_queue.task_done()
self._push_queue.task_done()
except asyncio.CancelledError:
break
# Cleanup only if we need to.
@@ -159,11 +188,21 @@ class PipelineTask:
self._process_up_task.cancel()
await self._process_up_task
async def _wait_for_endframe(self):
# NOTE(aleix): the Sink element just pushes EndFrames to the down queue,
# so just wait for it. In the future we might do something else here,
# but for now this is fine.
await self._down_queue.get()
async def _process_up_queue(self):
while True:
try:
frame = await self._up_queue.get()
if isinstance(frame, StopTaskFrame):
if isinstance(frame, EndTaskFrame):
await self.queue_frame(EndFrame())
elif isinstance(frame, CancelTaskFrame):
await self.queue_frame(CancelFrame())
elif isinstance(frame, StopTaskFrame):
await self.queue_frame(StopTaskFrame())
self._up_queue.task_done()
except asyncio.CancelledError:
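Note on `_wait_for_endframe()`: once processors push frames from their own queues, handing an `EndFrame` to the first processor no longer means it has reached the end of the pipeline, so the task waits for the `Sink` to report it. The sketch below reproduces that handshake with plain `asyncio` primitives. `End`, `worker` and `run_task` are hypothetical names, not part of the library.

```python
import asyncio


class End:
    """Hypothetical stand-in for EndFrame."""


async def worker(inbox: asyncio.Queue, done: asyncio.Queue, log: list) -> None:
    """Plays the role of an asynchronous processor followed by the Sink."""
    while True:
        item = await inbox.get()
        if isinstance(item, End):
            await done.put(item)  # The sink reports that End arrived.
            return
        await asyncio.sleep(0)  # Simulate asynchronous processing.
        log.append(item)


async def run_task() -> list:
    inbox: asyncio.Queue = asyncio.Queue()
    done: asyncio.Queue = asyncio.Queue()
    log: list = []
    consumer = asyncio.create_task(worker(inbox, done, log))
    for item in ("hello", "world", End()):
        await inbox.put(item)
    # Same idea as _wait_for_endframe(): block until End reaches the sink.
    await done.get()
    await consumer
    return log
```

Without the wait on `done`, the caller could start cleanup while earlier items were still queued inside the worker.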

View File

@@ -15,9 +15,7 @@ class SequentialMergePipeline(Pipeline):
for idx, pipeline in enumerate(self.pipelines):
while True:
frame = await pipeline.sink.get()
if isinstance(
frame, EndFrame) or isinstance(
frame, EndPipeFrame):
if isinstance(frame, EndFrame) or isinstance(frame, EndPipeFrame):
break
await self.sink.put(frame)

View File

@@ -41,8 +41,13 @@ class GatedAggregator(FrameProcessor):
Goodbye.
"""
def __init__(self, gate_open_fn, gate_close_fn, start_open,
direction: FrameDirection = FrameDirection.DOWNSTREAM):
def __init__(
self,
gate_open_fn,
gate_close_fn,
start_open,
direction: FrameDirection = FrameDirection.DOWNSTREAM,
):
super().__init__()
self._gate_open_fn = gate_open_fn
self._gate_close_fn = gate_close_fn
@@ -75,7 +80,7 @@ class GatedAggregator(FrameProcessor):
if self._gate_open:
await self.push_frame(frame, direction)
for (f, d) in self._accumulator:
for f, d in self._accumulator:
await self.push_frame(f, d)
self._accumulator = []
else:

View File

@@ -6,7 +6,10 @@
from typing import List, Type
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame, OpenAILLMContext
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContextFrame,
OpenAILLMContext,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.frames.frames import (
@@ -22,11 +25,11 @@ from pipecat.frames.frames import (
TranscriptionFrame,
TextFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame)
UserStoppedSpeakingFrame,
)
class LLMResponseAggregator(FrameProcessor):
def __init__(
self,
*,
@@ -36,7 +39,7 @@ class LLMResponseAggregator(FrameProcessor):
end_frame,
accumulator_frame: Type[TextFrame],
interim_accumulator_frame: Type[TextFrame] | None = None,
handle_interruptions: bool = False
handle_interruptions: bool = False,
):
super().__init__()
@@ -175,7 +178,7 @@ class LLMAssistantResponseAggregator(LLMResponseAggregator):
start_frame=LLMFullResponseStartFrame,
end_frame=LLMFullResponseEndFrame,
accumulator_frame=TextFrame,
handle_interruptions=True
handle_interruptions=True,
)
@@ -187,7 +190,7 @@ class LLMUserResponseAggregator(LLMResponseAggregator):
start_frame=UserStartedSpeakingFrame,
end_frame=UserStoppedSpeakingFrame,
accumulator_frame=TranscriptionFrame,
interim_accumulator_frame=InterimTranscriptionFrame
interim_accumulator_frame=InterimTranscriptionFrame,
)
@@ -295,7 +298,7 @@ class LLMAssistantContextAggregator(LLMContextAggregator):
start_frame=LLMFullResponseStartFrame,
end_frame=LLMFullResponseEndFrame,
accumulator_frame=TextFrame,
handle_interruptions=True
handle_interruptions=True,
)
@@ -308,5 +311,5 @@ class LLMUserContextAggregator(LLMContextAggregator):
start_frame=UserStartedSpeakingFrame,
end_frame=UserStoppedSpeakingFrame,
accumulator_frame=TranscriptionFrame,
-interim_accumulator_frame=InterimTranscriptionFrame
+interim_accumulator_frame=InterimTranscriptionFrame,
)

View File

@@ -17,7 +17,8 @@ from pipecat.frames.frames import (
Frame,
VisionImageRawFrame,
FunctionCallInProgressFrame,
-FunctionCallResultFrame)
+FunctionCallResultFrame,
+)
from pipecat.processors.frame_processor import FrameProcessor
from loguru import logger
@@ -28,12 +29,13 @@ try:
from openai.types.chat import (
ChatCompletionToolParam,
ChatCompletionToolChoiceOptionParam,
-ChatCompletionMessageParam
+ChatCompletionMessageParam,
)
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
-"In order to use OpenAI, you need to `pip install pipecat-ai[openai]`. Also, set `OPENAI_API_KEY` environment variable.")
+"In order to use OpenAI, you need to `pip install pipecat-ai[openai]`. Also, set `OPENAI_API_KEY` environment variable."
+)
raise Exception(f"Missing module: {e}")
# JSON custom encoder to handle bytes arrays so that we can log contexts
@@ -44,20 +46,18 @@ class CustomEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, io.BytesIO):
# Convert the first 8 bytes to an ASCII hex string
-return (f"{obj.getbuffer()[0:8].hex()}...")
+return f"{obj.getbuffer()[0:8].hex()}..."
return super().default(obj)
class OpenAILLMContext:
def __init__(
self,
messages: List[ChatCompletionMessageParam] | None = None,
tools: List[ChatCompletionToolParam] | NotGiven = NOT_GIVEN,
-tool_choice: ChatCompletionToolChoiceOptionParam | NotGiven = NOT_GIVEN
+tool_choice: ChatCompletionToolChoiceOptionParam | NotGiven = NOT_GIVEN,
):
-self._messages: List[ChatCompletionMessageParam] = messages if messages else [
-]
+self._messages: List[ChatCompletionMessageParam] = messages if messages else []
self._tool_choice: ChatCompletionToolChoiceOptionParam | NotGiven = tool_choice
self._tools: List[ChatCompletionToolParam] | NotGiven = tools
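
Note on the `CustomEncoder` hunk above: the encoder exists so that contexts holding image buffers can still be logged as JSON. The following is a minimal, standalone sketch of that behavior. The sample message and its byte payload are made up for illustration and are not taken from the commit.

```python
import io
import json


class CustomEncoder(json.JSONEncoder):
    """Same shape as the encoder in the hunk above, reproduced standalone."""

    def default(self, obj):
        if isinstance(obj, io.BytesIO):
            # Emit only the first 8 bytes as hex, followed by an ellipsis,
            # so that logging a context does not dump a whole image.
            return f"{obj.getbuffer()[0:8].hex()}..."
        return super().default(obj)


# Hypothetical message: a JPEG-like header followed by padding bytes.
messages = [
    {
        "role": "user",
        "content": "describe this image",
        "data": io.BytesIO(b"\xff\xd8\xff\xe0" + b"\x00" * 32),
        "mime_type": "image/jpeg",
    }
]

encoded = json.dumps(messages, cls=CustomEncoder)
print(encoded)
```

Any type other than `io.BytesIO` still falls through to `json.JSONEncoder.default`, which raises `TypeError` for values that cannot be serialized.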
@@ -81,19 +81,10 @@ class OpenAILLMContext:
"""
context = OpenAILLMContext()
buffer = io.BytesIO()
-Image.frombytes(
-frame.format,
-frame.size,
-frame.image
-).save(
-buffer,
-format="JPEG")
-context.add_message({
-"content": frame.text,
-"role": "user",
-"data": buffer,
-"mime_type": "image/jpeg"
-})
+Image.frombytes(frame.format, frame.size, frame.image).save(buffer, format="JPEG")
+context.add_message(
+{"content": frame.text, "role": "user", "data": buffer, "mime_type": "image/jpeg"}
+)
return context
@property
@@ -123,9 +114,7 @@ class OpenAILLMContext:
def get_messages_json(self) -> str:
return json.dumps(self._messages, cls=CustomEncoder)
-def set_tool_choice(
-self, tool_choice: ChatCompletionToolChoiceOptionParam | NotGiven
-):
+def set_tool_choice(self, tool_choice: ChatCompletionToolChoiceOptionParam | NotGiven):
self._tool_choice = tool_choice
def set_tools(self, tools: List[ChatCompletionToolParam] | NotGiven = NOT_GIVEN):
@@ -133,37 +122,42 @@ class OpenAILLMContext:
tools = NOT_GIVEN
self._tools = tools
-async def call_function(self,
-f: Callable[[str,
-str,
-Any,
-FrameProcessor,
-'OpenAILLMContext',
-Callable[[Any],
-Awaitable[None]]],
-Awaitable[None]],
-*,
-function_name: str,
-tool_call_id: str,
-arguments: str,
-llm: FrameProcessor) -> None:
+async def call_function(
+self,
+f: Callable[
+[str, str, Any, FrameProcessor, "OpenAILLMContext", Callable[[Any], Awaitable[None]]],
+Awaitable[None],
+],
+*,
+function_name: str,
+tool_call_id: str,
+arguments: str,
+llm: FrameProcessor,
+run_llm: bool = True,
+) -> None:
# Push a SystemFrame downstream. This frame will let our assistant context aggregator
# know that we are in the middle of a function call. Some contexts/aggregators may
# not need this. But some definitely do (Anthropic, for example).
-await llm.push_frame(FunctionCallInProgressFrame(
-function_name=function_name,
-tool_call_id=tool_call_id,
-arguments=arguments,
-))
-# Define a callback function that pushes a FunctionCallResultFrame downstream.
-async def function_call_result_callback(result):
-await llm.push_frame(FunctionCallResultFrame(
+await llm.push_frame(
+FunctionCallInProgressFrame(
function_name=function_name,
tool_call_id=tool_call_id,
arguments=arguments,
-result=result))
+)
+)
+# Define a callback function that pushes a FunctionCallResultFrame downstream.
+async def function_call_result_callback(result):
+await llm.push_frame(
+FunctionCallResultFrame(
+function_name=function_name,
+tool_call_id=tool_call_id,
+arguments=arguments,
+result=result,
+run_llm=run_llm,
+)
+)
await f(function_name, tool_call_id, arguments, llm, self, function_call_result_callback)
@@ -174,4 +168,5 @@ class OpenAILLMContextFrame(Frame):
OpenAIContextAggregator frame processor.
"""
context: OpenAILLMContext
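
Note on the `call_function` change in this file: the new `run_llm` argument is captured by the result callback and attached to the `FunctionCallResultFrame`. The sketch below mimics that flow without importing pipecat. `InProgress`, `Result`, `FakeLLM` and `get_weather` are hypothetical stand-ins invented for illustration. How downstream processors act on `run_llm` is not shown in this diff, so the sketch only demonstrates that the flag travels with the result.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, List


@dataclass
class InProgress:  # hypothetical stand-in for FunctionCallInProgressFrame
    function_name: str
    tool_call_id: str
    arguments: str


@dataclass
class Result:  # hypothetical stand-in for FunctionCallResultFrame
    function_name: str
    tool_call_id: str
    arguments: str
    result: Any
    run_llm: bool


class FakeLLM:  # hypothetical stand-in for a FrameProcessor; records pushed frames
    def __init__(self) -> None:
        self.pushed: List[Any] = []

    async def push_frame(self, frame: Any) -> None:
        self.pushed.append(frame)


async def call_function(f, *, function_name, tool_call_id, arguments, llm, run_llm=True):
    # Announce that a function call has started.
    await llm.push_frame(InProgress(function_name, tool_call_id, arguments))

    # The callback closes over run_llm, so the flag rides along with the result.
    async def function_call_result_callback(result):
        await llm.push_frame(Result(function_name, tool_call_id, arguments, result, run_llm))

    # The context argument is passed as None because this sketch has no context object.
    await f(function_name, tool_call_id, arguments, llm, None, function_call_result_callback)


async def get_weather(function_name, tool_call_id, arguments, llm, context, result_callback):
    # Hypothetical handler: reports a fixed result through the callback.
    await result_callback({"conditions": "sunny"})


def run_demo() -> List[Any]:
    llm = FakeLLM()
    asyncio.run(
        call_function(
            get_weather,
            function_name="get_weather",
            tool_call_id="call_1",
            arguments='{"city": "Paris"}',
            llm=llm,
            run_llm=False,
        )
    )
    return llm.pushed


frames = run_demo()
```

Because the handler reports its result through the callback instead of returning it, it can finish its work at any later point and the in-progress frame is still pushed first.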

View File

@@ -12,7 +12,8 @@ from pipecat.frames.frames import (
TextFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
-UserStoppedSpeakingFrame)
+UserStoppedSpeakingFrame,
+)
class ResponseAggregator(FrameProcessor):
@@ -49,7 +50,7 @@ class ResponseAggregator(FrameProcessor):
start_frame,
end_frame,
accumulator_frame: TextFrame,
-interim_accumulator_frame: TextFrame | None = None
+interim_accumulator_frame: TextFrame | None = None,
):
super().__init__()

View File

@@ -4,12 +4,7 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
-from pipecat.frames.frames import (
-Frame,
-InputImageRawFrame,
-TextFrame,
-VisionImageRawFrame
-)
+from pipecat.frames.frames import Frame, InputImageRawFrame, TextFrame, VisionImageRawFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
@@ -46,7 +41,8 @@ class VisionImageFrameAggregator(FrameProcessor):
text=self._describe_text,
image=frame.image,
size=frame.size,
-format=frame.format)
+format=frame.format,
+)
await self.push_frame(frame)
self._describe_text = None
else:

View File

@@ -0,0 +1,44 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
from typing import Any, AsyncGenerator
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
)
from pipecat.processors.frame_processor import FrameProcessor, FrameDirection
from pipecat.serializers.base_serializer import FrameSerializer
class AsyncGeneratorProcessor(FrameProcessor):
def __init__(self, *, serializer: FrameSerializer, **kwargs):
super().__init__(**kwargs)
self._serializer = serializer
self._data_queue = asyncio.Queue()
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
await self.push_frame(frame, direction)
if isinstance(frame, (CancelFrame, EndFrame)):
await self._data_queue.put(None)
else:
data = self._serializer.serialize(frame)
if data:
await self._data_queue.put(data)
async def generator(self) -> AsyncGenerator[Any, None]:
running = True
while running:
data = await self._data_queue.get()
running = data is not None
if data:
yield data
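
Note on the new `AsyncGeneratorProcessor` above: serialized frames go into an `asyncio.Queue`, and a `None` sentinel (queued on `CancelFrame` or `EndFrame`) tells `generator()` to stop. The sketch below isolates that queue-plus-sentinel pattern. `QueueBackedGenerator` and its `put` method are hypothetical names used for illustration, with no pipecat dependency.

```python
import asyncio
from typing import Any, AsyncGenerator, List, Optional


class QueueBackedGenerator:
    """Hypothetical reduction of the processor to its queue and generator."""

    def __init__(self) -> None:
        self._data_queue: asyncio.Queue = asyncio.Queue()

    async def put(self, data: Optional[bytes]) -> None:
        # In the processor this happens inside process_frame();
        # None plays the role of the CancelFrame/EndFrame sentinel.
        await self._data_queue.put(data)

    async def generator(self) -> AsyncGenerator[Any, None]:
        running = True
        while running:
            data = await self._data_queue.get()
            # A None item ends the loop after this iteration.
            running = data is not None
            if data:
                yield data


async def demo() -> List[bytes]:
    source = QueueBackedGenerator()
    for chunk in (b"one", b"two", None):
        await source.put(chunk)
    return [chunk async for chunk in source.generator()]


collected = asyncio.run(demo())
print(collected)
```

One consequence of the `if data:` check is that falsy payloads such as an empty `bytes` object are skipped without ending the stream, which matches the `if data:` guard applied to the serializer output in `process_frame`.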

View File

@@ -11,7 +11,6 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
class FrameFilter(FrameProcessor):
def __init__(self, types: List[type]):
super().__init__()
self._types = types
@@ -25,9 +24,11 @@ class FrameFilter(FrameProcessor):
if isinstance(frame, t):
return True
-return (isinstance(frame, AppFrame)
-or isinstance(frame, ControlFrame)
-or isinstance(frame, SystemFrame))
+return (
+isinstance(frame, AppFrame)
+or isinstance(frame, ControlFrame)
+or isinstance(frame, SystemFrame)
+)
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)

Some files were not shown because too many files have changed in this diff.