Compare commits

...

127 Commits

Author SHA1 Message Date
Kwindla Hultman Kramer
9cd7c82e77 testing pushing a frame from function call start hook 2024-09-30 14:52:18 -07:00
Kwindla Hultman Kramer
43161c816e get rid of some debug log lines used during development 2024-09-30 14:48:44 -07:00
Kwindla Hultman Kramer
6644c06af1 throw error if the llm tries to call a function that's not registered 2024-09-30 14:48:44 -07:00
Kwindla Hultman Kramer
ed47212e07 handle openai multiple function calls 2024-09-30 14:48:40 -07:00
JeevanReddy
db9cb74364 openai can give multiple tool calls, current implementation assumes only one function call at a time. Fixed this to handle multiple function calls. 2024-09-30 14:47:31 -07:00
Aleix Conchillo Flaqué
f64902eb25 pipeline(task): since everything is async tasks should wait for EndFrame 2024-09-30 14:08:11 -07:00
Aleix Conchillo Flaqué
e115a274d6 tests: fix langchanin tests 2024-09-30 14:08:11 -07:00
Aleix Conchillo Flaqué
00239c2fd4 syncparallelpipeline: fix now that all frames are asynchronous 2024-09-30 14:08:11 -07:00
Aleix Conchillo Flaqué
c0f9ad19fe all frame processors are asynchrnous
In this commit we make all frame processors asynchronous, that is, they have an
internal queue and they push frames using a task from that queue.
2024-09-30 13:17:50 -07:00
Mark Backman
46ac76701e Merge pull request #517 from pipecat-ai/mb/update-settings-frame
Consolidate update frames classes into a single UpdateSettingsFrame class
2024-09-30 12:56:45 -04:00
Mark Backman
1f77863aef Code review feedback 2024-09-30 12:50:40 -04:00
Mark Backman
d7555609fd Add TTS update settings options 2024-09-30 12:50:40 -04:00
Mark Backman
7fe118ce63 Align use of language param across TTS services 2024-09-30 12:50:40 -04:00
Mark Backman
44a349386c Consolidate update frames classes into a single UpdateSettingsFrame class 2024-09-30 12:50:39 -04:00
Mark Backman
97cba92fa5 Merge pull request #516 from pipecat-ai/mb/google-tts
Add Google TTS
2024-09-30 12:25:16 -04:00
Aleix Conchillo Flaqué
d9b16d4f73 services: import cosmetics 2024-09-27 13:32:27 -07:00
Aleix Conchillo Flaqué
50b6580fbb livekit: add license notice 2024-09-27 13:28:33 -07:00
Mark Backman
e7548f9494 Code review feedback 2024-09-27 08:02:44 -04:00
Mark Backman
830d2df671 Add Google TTS 2024-09-27 07:36:20 -04:00
Aleix Conchillo Flaqué
13b50a07db Merge pull request #515 from pipecat-ai/aleix/rtvi-frame-processors
RTVI frame processors
2024-09-27 00:48:09 -07:00
Aleix Conchillo Flaqué
4501dca133 Merge pull request #467 from joachimchauvet/main
Add LiveKit audio transport
2024-09-26 22:58:25 -07:00
Aleix Conchillo Flaqué
2c8e566507 rtvi: update version to 0.2 2024-09-26 22:42:36 -07:00
Aleix Conchillo Flaqué
6e8a202107 rtvi: fix handling transport messages 2024-09-26 22:42:19 -07:00
Aleix Conchillo Flaqué
2a05cd35b0 rtvi: add multiple RTVI frame processors 2024-09-26 22:42:08 -07:00
Mark Backman
55a70cde8f Merge pull request #514 from pipecat-ai/mb/aws-polly-tts
Add AWS Polly TTS support
2024-09-26 22:20:13 -04:00
Mark Backman
706c00d897 Code review feedback 2024-09-26 22:13:37 -04:00
Aleix Conchillo Flaqué
d323ea9e95 async_generator: keep pushing frames downstream 2024-09-26 16:44:49 -07:00
Aleix Conchillo Flaqué
b8ece84c6e services: super should be super() 2024-09-26 10:39:26 -07:00
Mark Backman
a018112a13 Merge pull request #510 from pipecat-ai/mb/deepgram-tts-http
Improve usability of Deepgram TTS: use Deepgram client, remove aiohttp
2024-09-26 13:38:42 -04:00
Mark Backman
d3a477902b Add changelog entry 2024-09-26 13:35:59 -04:00
Mark Backman
298b151486 Add setter methods 2024-09-26 13:35:59 -04:00
Mark Backman
6a6ea251ae Add AWS Polly TTS support 2024-09-26 13:35:59 -04:00
Aleix Conchillo Flaqué
c7c709a0a7 github: cache venv when running tests 2024-09-26 10:32:22 -07:00
Aleix Conchillo Flaqué
6ac57b4854 Merge pull request #494 from badbye/full-width-punctuations
add full-width punctuations as end of the sentence
2024-09-26 10:17:10 -07:00
Aleix Conchillo Flaqué
f5e0b946c7 services(cartesia): fix string formatting 2024-09-26 09:08:37 -07:00
Mark Backman
b1818cc370 Merge pull request #435 from golbin/main
Add speed and emotion options for Cartesia.
2024-09-26 07:14:59 -04:00
Jin Kim
d05717a1bd Apply Ruff formater 2024-09-26 19:52:25 +09:00
Aleix Conchillo Flaqué
d11daee31a Merge pull request #509 from pipecat-ai/aleix/frameprocessor-event-handlers
frame processor event handlers
2024-09-25 19:50:30 -07:00
Mark Backman
73da8c1910 Improve usability of Deepgram TTS: use Deepgram client, remove aiohttp 2024-09-25 22:43:10 -04:00
Aleix Conchillo Flaqué
f06aa300d0 rtvi: add on_bot_ready event 2024-09-25 16:52:18 -07:00
Aleix Conchillo Flaqué
c4e94e280e processors: add support for event handlers 2024-09-25 16:35:33 -07:00
Kwindla Hultman Kramer
8f2941c575 Merge pull request #492 from pipecat-ai/khk/flush-more-audio
add calls to flush_audio for say() and rtvi action
2024-09-25 12:35:50 -07:00
joachimchauvet
447baad5c3 update send_metrics() to support changes introduced in #474 2024-09-25 21:38:55 +03:00
Mark Backman
2703813e8a Merge pull request #496 from pipecat-ai/mb/azure-tts-inputs
Add Azure TTS input params
2024-09-25 14:38:01 -04:00
Mark Backman
521e152150 Merge pull request #495 from pipecat-ai/mb/elevenlabs-input-lang
Add language_code support for ElevenLabs TTS
2024-09-25 14:37:44 -04:00
Kwindla Hultman Kramer
3d43ad0f4d actually save the file 2024-09-25 10:59:00 -07:00
Kwindla Hultman Kramer
3621fceae2 fixes as noted by aleix 2024-09-25 09:19:58 -07:00
Aleix Conchillo Flaqué
e123f33c03 Merge pull request #506 from pipecat-ai/aleix/async-generator-processor
processors: add AsyncGeneratorProcessor
2024-09-25 00:04:09 -07:00
Aleix Conchillo Flaqué
b8713666c2 processors: add AsyncGeneratorProcessor 2024-09-25 00:01:04 -07:00
Aleix Conchillo Flaqué
cf0ab85e2c Merge pull request #505 from pipecat-ai/aleix/init-task-variables
initialize task variables and add minor description
2024-09-24 23:59:38 -07:00
Aleix Conchillo Flaqué
8502c7c801 Merge pull request #504 from pipecat-ai/aleix/rtvi-handle-frame
rtvi: add RTVIProcessor.handle_message()
2024-09-24 23:59:26 -07:00
Aleix Conchillo Flaqué
e89814dc6b Merge pull request #503 from pipecat-ai/aleix/end-cancel-task-frames
frames: add EndTaskFrame and CancelTaskFrame
2024-09-24 23:59:10 -07:00
Aleix Conchillo Flaqué
9461bacf0d pyproject: update fastapi to 0.115.0 2024-09-24 19:24:37 -07:00
Aleix Conchillo Flaqué
e276dcbab7 initialize task variables and add minor description 2024-09-24 19:19:00 -07:00
Aleix Conchillo Flaqué
1a3de0e819 rtvi: add RTVIProcessor.handle_message() 2024-09-24 19:12:06 -07:00
Aleix Conchillo Flaqué
ee3786fe15 frames: add EndTaskFrame and CancelTaskFrame 2024-09-24 19:10:22 -07:00
Aleix Conchillo Flaqué
31b5667cee frames: log text with [] so we can distinguish spaces better 2024-09-24 13:10:40 -07:00
Aleix Conchillo Flaqué
a483f1a083 rtvi: handle all actions from the action task 2024-09-24 10:48:15 -07:00
Aleix Conchillo Flaqué
2ecec1c9f8 Merge pull request #500 from pipecat-ai/aleix/rtvi-action-frames-task
RTVI action frames task
2024-09-24 10:13:43 -07:00
Aleix Conchillo Flaqué
08ac311971 rtvi: use task to process incoming action frames 2024-09-24 09:36:53 -07:00
Aleix Conchillo Flaqué
cb49b6a0d6 rtvi: add llm-text and tts-text server messages 2024-09-24 09:36:43 -07:00
Aleix Conchillo Flaqué
016da177db Merge pull request #499 from mercuryyy/main
Fix syntax error in deepgram.py
2024-09-24 09:10:05 -07:00
joachimchauvet
ec5998bc36 remove _internal_push_frame from LiveKitInputTransport 2024-09-24 14:54:37 +03:00
mercuryyy
b1e17ee347 Fix syntax error in deepgram.py 2024-09-24 07:45:29 -04:00
joachimchauvet
b6e1d6e6ae format with ruff 2024-09-24 10:21:02 +03:00
joachimchauvet
fa609f1afc adjust output sample rate and create user token 2024-09-24 10:16:54 +03:00
joachimchauvet
470b5eafe7 move tenacity imports inside try block 2024-09-24 10:16:54 +03:00
joachimchauvet
2e5b0c1d6b add tenacity dependency 2024-09-24 10:16:54 +03:00
joachimchauvet
a9390d96a1 add LiveKit audio transport 2024-09-24 10:16:54 +03:00
Mark Backman
8ee9621d66 Add setter functions 2024-09-23 21:12:01 -04:00
Jin Kim
49f2123893 Apply and Fix upstream changes for Cartesia 2024-09-24 07:59:26 +09:00
Jin Kim
cf72129852 Merge remote-tracking branch 'upstream/main' 2024-09-24 07:18:22 +09:00
Mark Backman
8edee8155d Add input params to Azure TTS 2024-09-23 17:52:23 -04:00
chadbailey59
c262b272fa Added RTVIActionFrame (#464)
* added RTVIActionFrame

* server-sent events

* reverted log changes

* fixup
2024-09-23 14:51:17 -05:00
Aleix Conchillo Flaqué
9ef9c1c58a Merge pull request #497 from pipecat-ai/aleix/ruff-formater
introduce Ruff formatting
2024-09-23 10:42:54 -07:00
Aleix Conchillo Flaqué
c7ff79a652 processors: fix formatting string 2024-09-23 09:53:37 -07:00
Aleix Conchillo Flaqué
da81df5284 github: install dev-requirements when running tests 2024-09-23 09:53:37 -07:00
Aleix Conchillo Flaqué
a4420dc88b README: add vscode and emacs ruff instructions 2024-09-23 09:53:37 -07:00
Aleix Conchillo Flaqué
eeb8338dce introduce Ruff formatting 2024-09-23 09:53:37 -07:00
Cyril S.
dfa4ac81fd Implement Sentry instrumentation for performance and error tracking (#470)
* feat: Add Sentry support in FrameProcessor

This update add optional Sentry integration for performance tracking and error monitoring.

Key changes include:

- Add conditional Sentry import and initialization check
- Implement Sentry spans in FrameProcessorMetrics to measure TTFB (Time To First Byte) and processing time when Sentry is available
- Maintain existing metrics functionality with MetricsFrame regardless of Sentry availability

* feat: Enable metrics in DeepgramSTTService for Sentry

This commit enhances the DeepgramSTTService class to enable metrics generation for use with Sentry.

Key changes include:

1. Enable general metrics generation:
   - Implement `can_generate_metrics` method, returning True when VAD is enabled
   - This allows metrics to be collected and used by both Sentry and the metrics system in frame_processor.py

2. Integrate Sentry-compatible performance tracking:
   - Add start_ttfb_metrics and start_processing_metrics calls in the VAD speech detection handler
   - Implement stop_ttfb_metrics call when receiving transcripts
   - Add stop_processing_metrics for final transcripts

3. Enhance VAD support for metrics:
   - Add `vad_enabled` property to check VAD event availability
   - Implement VAD-based speech detection handler for precise metric timing

These changes enable detailed performance tracking via both Sentry and the general metrics system when VAD is active. This allows for better monitoring and analysis of the speech-to-text process, providing valuable insights through Sentry and any other metrics consumers in the pipeline.

* Update frame_processor.py

* Refactor to support flexible metrics implementation

- Modified the __init__ method to accept a metrics parameter that is either FrameProcessorMetrics or one of its subclasses
- Updated the metrics initialization to create an instance with the processor's name
- Moved all FrameProcessorMetrics-related logic to a new processors\metrics\base.py file

* Implement flexible metrics system with Sentry integration

1. Created a new metrics module in processors/metrics/

2. Implemented FrameProcessorMetrics base class in base.py:

3. Implemented SentryMetrics class in sentry.py:
   - Inherits from FrameProcessorMetrics
   - Integrates with Sentry SDK for advanced metrics tracking
   - Implements Sentry-specific span creation and management for TTFB and processing metrics
   - Handles cases where Sentry is not available or initialized
2024-09-23 08:44:14 -07:00
Lewis Wolfgang
ea16dca8aa Merge pull request #469 from pipecat-ai/lewis/remove_torch_dependency
Remove torch dependency for using silero_vad
2024-09-23 09:59:40 -04:00
Mark Backman
306632b29a Add language_code support for ElevenLabs TTS 2024-09-23 09:01:02 -04:00
duyalei
4533ed014f add full-width punctuations as end of the sentence 2024-09-23 16:35:00 +08:00
Jin Kim
68cc4186ad Merge remote-tracking branch 'upstream/main' 2024-09-23 16:34:31 +09:00
Mark Backman
9a4e749c7c Merge pull request #491 from pipecat-ai/mb/elevenlabs-inputs
Add voice_settings and optimize_streaming_latency to ElevenLabs
2024-09-22 21:54:21 -04:00
Mark Backman
55c645c614 Add voice_settings and optimize_streaming_latency to ElevenLabs 2024-09-22 13:58:50 -04:00
Mark Backman
a1024bb365 Merge pull request #490 from pipecat-ai/mb/llm-rtvi-service-option
Add control frames for LLM param updates
2024-09-21 20:10:17 -04:00
Mark Backman
dfc82c3ba4 Merge pull request #486 from pipecat-ai/mb/llm-extra-params
Add extra input param to LLMs
2024-09-21 18:25:47 -04:00
Mark Backman
9e27a8aad0 Add control frames for LLM param updates 2024-09-21 00:02:58 -04:00
Mark Backman
c73111afea Add extra input param to LLMs 2024-09-21 00:01:25 -04:00
Kwindla Hultman Kramer
26a64afd8d Merge pull request #485 from pipecat-ai/khk/metrics-model-exclude-none
fixup for serialization issue
2024-09-20 18:24:19 -07:00
Kwindla Hultman Kramer
78a3f081de fixup for serialization issue 2024-09-20 18:21:06 -07:00
Mark Backman
e8f8a49646 Merge pull request #484 from pipecat-ai/mb/llm-input-params
Add input params for OpenAI, Anthropic, Together AI LLMs
2024-09-20 20:35:49 -04:00
Mark Backman
219304c5ee Added Changelog entries 2024-09-20 20:31:42 -04:00
Mark Backman
f3fd312b83 Add Together AI interruptible example 2024-09-20 20:21:19 -04:00
Mark Backman
357e66d64d Input params for Together AI LLM 2024-09-20 20:21:19 -04:00
Mark Backman
4fa1ea8c4b Input params for Anthropic LLM 2024-09-20 20:21:19 -04:00
Mark Backman
3b81cd462d Input params to OpenAI LLM 2024-09-20 20:21:19 -04:00
Aleix Conchillo Flaqué
14acf05a26 Merge pull request #480 from pipecat-ai/aleix/input-output-frames
introduce input/output audio and image frames
2024-09-20 14:44:37 -07:00
Mattie Ruth
58d9c84bc9 Merge pull request #474 from pipecat-ai/ruthless/improve-metrics-types-2
Ruthless/improve metrics types 2
2024-09-20 09:47:24 -04:00
Aleix Conchillo Flaqué
7e39d9ad3d introduce input/output audio and image frames
We now distinguish between input and output audio and image frames. We introduce
`InputAudioRawFrame`, `OutputAudioRawFrame`, `InputImageRawFrame` and
`OutputImageRawFrame` (and other subclasses of those). The input frames usually
come from an input transport and are meant to be processed inside the pipeline
to generate new frames. However, the input frames will not be sent through an
output transport. The output frames can also be processed by any frame processor
in the pipeline and they are allowed to be sent by the output transport.
2024-09-19 23:11:03 -07:00
mattie ruth backman
a4edb3dab1 Cleanup on aisle METRICS. Note: See below, this is a breaking change
1. Fleshed out MetricsFrames and broke it into a proper set of types
2. Add model_name as a property to the AIService so that it can be
   automatically included in metrics and also remove that
   overhead from all the various services themselves

Breaking change!

Because of the types improvements, the MetricsFrame type has
changed. Each frame will have a list of metrics simlilar to before
except each item in the list will only contain one type of metric:
"ttfb", "tokens", "characters", or "processing". Previously these
fields would be in every entry but set to None if they didn't apply.

While this changes internal handling of the MetricsFrame, it does NOT
break the RTVI/daily messaging of metrics. That format remains the same.

Also. Remember to use model_name for accessing a service's current
model and set_model_name for setting it.
2024-09-19 21:30:34 -04:00
Mattie Ruth
ed409d0460 Merge pull request #478 from pipecat-ai/ruthless/get-tests-running
Ruthless/get tests running
2024-09-19 21:01:27 -04:00
mattie ruth backman
50b45ac2da get the test infrastructure running again
disable broken tests for now
2024-09-19 20:58:17 -04:00
Kwindla Hultman Kramer
29bcbc68c5 Merge pull request #479 from pipecat-ai/khk/small-fixes
fix small issues that crept into main
2024-09-19 17:25:27 -07:00
Kwindla Hultman Kramer
affbe9ac7d fix small issues that crept into main 2024-09-19 17:17:33 -07:00
Aleix Conchillo Flaqué
1790fa452f Merge pull request #436 from pipecat-ai/aleix/frameprocessor-single-task
introduce synchronous and asynchronous frame processors
2024-09-19 11:22:56 -07:00
Aleix Conchillo Flaqué
607a246572 updated CHANGELOG with sync/async frame processors 2024-09-19 01:32:17 -07:00
Aleix Conchillo Flaqué
4f1b06e6b2 pipeline: renamed ParallelTask to SyncParallelPipeline 2024-09-19 01:32:17 -07:00
Aleix Conchillo Flaqué
62e9a33a70 examples: use CartesiaHttpTTSService to synchronize frames 2024-09-19 01:32:17 -07:00
Aleix Conchillo Flaqué
3298f935ef services(fal,moondream): add missing **kwargs 2024-09-19 01:32:17 -07:00
Aleix Conchillo Flaqué
0e8f56c752 services: move TTSService push_stop_frames to AsyncTTSService 2024-09-19 01:32:15 -07:00
Aleix Conchillo Flaqué
8224538372 services(cartesia): added CartesiaHttpTTSService 2024-09-19 01:31:12 -07:00
Aleix Conchillo Flaqué
fbf6eef68f transports(base_output): wait for sink tasks before canceling audio/video tasks 2024-09-19 01:31:12 -07:00
Aleix Conchillo Flaqué
f078d156de frames: StartFrame is now a SystemFrame 2024-09-19 01:31:12 -07:00
Aleix Conchillo Flaqué
23d6eed5ea transports: input()/output() return subclass instead of base class 2024-09-19 01:31:12 -07:00
Aleix Conchillo Flaqué
0ed3d118d6 services(moondream); update revision to 2024-08-26 2024-09-19 01:31:12 -07:00
Aleix Conchillo Flaqué
337f048864 introduce synchronous and asynchronous frame processors
Pipecat has a pipeline-based architecture. The pipeline consists of frame
processors linked to each other. The elements travelling across the pipeline are
called frames.

To have a deterministic behavior the frames travelling through the pipeline
should always be ordered, except system frames which are out-of-band frames. To
achieve that, each frame processor should only output frames from a single task.

There are synchronous and asynchronous frame processors. The synchronous
processors push output frames from the same task that they receive input frames,
and therefore only pushing frames from one task. Asynchrnous frame processors
can have internal tasks to perform things asynchrnously (e.g. receiving data
from a websocket) but they also have a single task where they push frames from.
2024-09-19 01:31:10 -07:00
Mark Backman
6f3c421621 Merge pull request #475 from pipecat-ai/mb/tts-sample-rate
Add sample_rate setting to TTS services
2024-09-18 14:59:09 -04:00
Mark Backman
eadd68d40b Add sample_rate setting to TTS services 2024-09-18 14:50:20 -04:00
Lewis Wolfgang
71202e3cd5 Remove torch dependency for using silero_vad 2024-09-17 16:48:52 -04:00
Jin Kim
75008d8f11 Add speed and emotion setting method to Cartesia TTS service 2024-09-18 00:51:45 +09:00
Jin Kim
2da0ecbe3c Revert "model_id" as a main argument 2024-09-18 00:38:12 +09:00
Jin Kim
c7f814b2dc Merge remote-tracking branch 'upstream/main' 2024-09-18 00:33:29 +09:00
Aleix Conchillo Flaqué
13a4a05388 Merge pull request #466 from pipecat-ai/aleix/elevenlabs-cartesia-close-websocket-first
services(cartesia,elevenlabs): close websocket before the receiving task
2024-09-16 23:55:28 -07:00
Aleix Conchillo Flaqué
20c019ae16 services(cartesia,elevenlabs): close websocket before the receiving task 2024-09-16 23:54:21 -07:00
Jin Kim
fa0deededa Add voice options and make to use InputParams for Cartesia. 2024-09-09 10:53:23 +09:00
182 changed files with 6779 additions and 3741 deletions

View File

@@ -1,4 +1,4 @@
name: lint
name: format
on:
workflow_dispatch:
@@ -12,12 +12,12 @@ on:
- "docs/**"
concurrency:
group: build-lint-${{ github.event.pull_request.number || github.ref }}
group: build-format-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true
jobs:
autopep8:
name: "Formatting lints"
ruff-format:
name: "Formatting checker"
runs-on: ubuntu-latest
steps:
- name: Checkout repo
@@ -25,7 +25,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.10'
python-version: "3.10"
- name: Setup virtual environment
run: |
python -m venv .venv
@@ -34,11 +34,8 @@ jobs:
source .venv/bin/activate
python -m pip install --upgrade pip
pip install -r dev-requirements.txt
- name: autopep8
id: autopep8
- name: Ruff formatter
id: ruff
run: |
source .venv/bin/activate
autopep8 --max-line-length 100 --exit-code -r -d --exclude "*_pb2.py" -a -a src/
- name: Fail if autopep8 requires changes
if: steps.autopep8.outputs.exit-code == 2
run: exit 1
ruff format --config line-length=100 --diff --exclude "*_pb2.py"

View File

@@ -20,14 +20,24 @@ jobs:
name: "Unit and Integration Tests"
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Checkout repo
uses: actions/checkout@v4
- name: Set up Python
id: setup_python
uses: actions/setup-python@v4
with:
python-version: "3.10"
- name: Cache virtual environment
uses: actions/cache@v3
with:
# We are hashing dev-requirements.txt and test-requirements.txt which
# contain all dependencies needed to run the tests.
key: venv-${{ runner.os }}-${{ steps.setup_python.outputs.python-version}}-${{ hashFiles('dev-requirements.txt') }}-${{ hashFiles('test-requirements.txt') }}
path: .venv
- name: Install system packages
run: sudo apt-get install -y portaudio19-dev
id: install_system_packages
run: |
sudo apt-get install -y portaudio19-dev
- name: Setup virtual environment
run: |
python -m venv .venv
@@ -35,8 +45,8 @@ jobs:
run: |
source .venv/bin/activate
python -m pip install --upgrade pip
pip install -r dev-requirements.txt
pip install -r dev-requirements.txt -r test-requirements.txt
- name: Test with pytest
run: |
source .venv/bin/activate
pytest --doctest-modules --ignore-glob="*to_be_updated*" src tests
pytest --ignore-glob="*to_be_updated*" --ignore-glob=*pipeline_source* src tests

View File

@@ -9,9 +9,49 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- A clock can now be specified to `PipelineTask` (defaults to
`SystemClock`). This clock will be passed to each frame processor via the
`StartFrame`.
- Added Google TTS service and corresponding foundational example `07n-interruptible-google.py`
- Added AWS Polly TTS support and `07m-interruptible-aws.py` as an example.
- Added InputParams to Azure TTS service.
- All `FrameProcessors` can now register event handlers.
```
tts = SomeTTSService(...)
@tts.event_handler("on_connected"):
async def on_connected(processor):
...
```
- Added `AsyncGeneratorProcessor`. This processor can be used together with a
`FrameSerializer` as an async generator. It provides a `generator()` function
that returns an `AsyncGenerator` and that yields serialized frames.
- Added `EndTaskFrame` and `CancelTaskFrame`. These are new frames that are
meant to be pushed upstream to tell the pipeline task to stop nicely or
immediately respectively.
- Added configurable LLM parameters (e.g., temperature, top_p, max_tokens, seed)
for OpenAI, Anthropic, and Together AI services along with corresponding
setter functions.
- Added `sample_rate` as a constructor parameter for TTS services.
- Pipecat has a pipeline-based architecture. The pipeline consists of frame
processors linked to each other. The elements traveling across the pipeline
are called frames.
To have a deterministic behavior the frames traveling through the pipeline
should always be ordered, except system frames which are out-of-band
frames. To achieve that, each frame processor should only output frames from a
single task.
In this version all the frame processors have their own task to push
frames. That is, when `push_frame()` is called the given frame will be put
into an internal queue (with the exception of system frames) and a frame
processor task will push it out.
- Added pipeline clocks. A pipeline clock is used by the output transport to
know when a frame needs to be presented. For that, all frames now have an
@@ -19,6 +59,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
clock implementation `SystemClock` and the `pts` field is currently only used
for `TextFrame`s (audio and image frames will be next).
- A clock can now be specified to `PipelineTask` (defaults to
`SystemClock`). This clock will be passed to each frame processor via the
`StartFrame`.
- Added `CartesiaHttpTTSService`.
- `DailyTransport` now supports setting the audio bitrate to improve audio
quality through the `DailyParams.audio_out_bitrate` parameter. The new
default is 96kbps.
@@ -40,6 +86,33 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- Updated individual update settings frame classes into a single UpdateSettingsFrame
class for STT, LLM, and TTS.
- We now distinguish between input and output audio and image frames. We
introduce `InputAudioRawFrame`, `OutputAudioRawFrame`, `InputImageRawFrame`
and `OutputImageRawFrame` (and other subclasses of those). The input frames
usually come from an input transport and are meant to be processed inside the
pipeline to generate new frames. However, the input frames will not be sent
through an output transport. The output frames can also be processed by any
frame processor in the pipeline and they are allowed to be sent by the output
transport.
- `ParallelTask` has been renamed to `SyncParallelPipeline`. A
`SyncParallelPipeline` is a frame processor that contains a list of different
pipelines to be executed concurrently. The difference between a
`SyncParallelPipeline` and a `ParallelPipeline` is that, given an input frame,
the `SyncParallelPipeline` will wait for all the internal pipelines to
complete. This is achieved by making sure the last processor in each of the
pipelines is synchronous (e.g. an HTTP-based service that waits for the
response).
- `StartFrame` is back a system frame so we make sure it's processed immediately
by all processors. `EndFrame` stays a control frame since it needs to be
ordered allowing the frames in the pipeline to be processed.
- Updated `MoondreamService` revision to `2024-08-26`.
- `CartesiaTTSService` and `ElevenLabsTTSService` now add presentation
timestamps to their text output. This allows the output transport to push the
text frames downstream at almost the same time the words are spoken. We say
@@ -60,6 +133,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fixed a `BaseOutputTransport` issue that would stop audio and video rendering
tasks (after receiving and `EndFrame`) before the internal queue was emptied,
causing the pipeline to finish prematurely.
- `StartFrame` should be the first frame every processor receives to avoid
situations where things are not initialized (because initialization happens on
`StartFrame`) and other frames come in resulting in undesired behavior.
@@ -293,7 +370,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- It is now possible to specify a Silero VAD version when using `SileroVADAnalyzer`
or `SileroVAD`.
- Added `AysncFrameProcessor` and `AsyncAIService`. Some services like
- Added `AysncFrameProcessor` and `AsyncAIService`. Some services like
`DeepgramSTTService` need to process things asynchronously. For example, audio
is sent to Deepgram but transcriptions are not returned immediately. In these
cases we still require all frames (except system frames) to be pushed
@@ -310,7 +387,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `WhisperSTTService` model can now also be a string.
- Added missing * keyword separators in services.
- Added missing \* keyword separators in services.
### Fixed
@@ -387,7 +464,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added new `TwilioFrameSerializer`. This is a new serializer that knows how to
serialize and deserialize audio frames from Twilio.
- Added Daily transport event: `on_dialout_answered`. See
- Added Daily transport event: `on_dialout_answered`. See
https://reference-python.daily.co/api_reference.html#daily.EventHandler
- Added new `AzureSTTService`. This allows you to use Azure Speech-To-Text.
@@ -627,7 +704,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added Daily transport support for dial-in use cases.
- Added Daily transport events: `on_dialout_connected`, `on_dialout_stopped`,
`on_dialout_error` and `on_dialout_warning`. See
`on_dialout_error` and `on_dialout_warning`. See
https://reference-python.daily.co/api_reference.html#daily.EventHandler
## [0.0.21] - 2024-05-22

View File

@@ -38,7 +38,7 @@ pip install "pipecat-ai[option,...]"
Your project may or may not need these, so they're made available as optional requirements. Here is a list:
- **AI services**: `anthropic`, `azure`, `deepgram`, `gladia`, `google`, `fal`, `lmnt`, `moondream`, `openai`, `openpipe`, `playht`, `silero`, `whisper`, `xtts`
- **AI services**: `anthropic`, `aws`, `azure`, `deepgram`, `gladia`, `google`, `fal`, `lmnt`, `moondream`, `openai`, `openpipe`, `playht`, `silero`, `whisper`, `xtts`
- **Transports**: `local`, `websocket`, `daily`
## Code examples
@@ -110,7 +110,6 @@ python app.py
Daily provides a prebuilt WebRTC user interface. Whilst the app is running, you can visit at `https://<yourdomain>.daily.co/<room_url>` and listen to the bot say hello!
## WebRTC for production use
WebSockets are fine for server-to-server communication or for initial development. But for production use, youll need client-server audio to use a protocol designed for real-time media transport. (For an explanation of the difference between WebSockets and WebRTC, see [this post.](https://www.daily.co/blog/how-to-talk-to-an-llm-with-your-voice/#webrtc))
@@ -131,7 +130,6 @@ pip install pipecat-ai[silero]
The first time your run your bot with Silero, startup may take a while whilst it downloads and caches the model in the background. You can check the progress of this in the console.
## Hacking on the framework itself
_Note that you may need to set up a virtual environment before following the instructions below. For instance, you might need to run the following from the root of the repo:_
@@ -165,27 +163,29 @@ pip install "path_to_this_repo[option,...]"
From the root directory, run:
```shell
pytest --doctest-modules --ignore-glob="*to_be_updated*" src tests
pytest --doctest-modules --ignore-glob="*to_be_updated*" --ignore-glob=*pipeline_source* src tests
```
## Setting up your editor
This project uses strict [PEP 8](https://peps.python.org/pep-0008/) formatting.
This project uses strict [PEP 8](https://peps.python.org/pep-0008/) formatting via [Ruff](https://github.com/astral-sh/ruff).
### Emacs
You can use [use-package](https://github.com/jwiegley/use-package) to install [py-autopep8](https://codeberg.org/ideasman42/emacs-py-autopep8) package and configure `autopep8` arguments:
You can use [use-package](https://github.com/jwiegley/use-package) to install [emacs-lazy-ruff](https://github.com/christophermadsen/emacs-lazy-ruff) package and configure `ruff` arguments:
```elisp
(use-package py-autopep8
(use-package lazy-ruff
:ensure t
:defer t
:hook ((python-mode . py-autopep8-mode))
:hook ((python-mode . lazy-ruff-mode))
:config
(setq py-autopep8-options '("-a" "-a", "--max-line-length=100")))
(setq lazy-ruff-format-command "ruff format --config line-length=100")
(setq lazy-ruff-only-format-block t)
(setq lazy-ruff-only-format-region t)
(setq lazy-ruff-only-format-buffer t))
```
`autopep8` was installed in the `venv` environment described before, so you should be able to use [pyvenv-auto](https://github.com/ryotaro612/pyvenv-auto) to automatically load that environment inside Emacs.
`ruff` was installed in the `venv` environment described before, so you should be able to use [pyvenv-auto](https://github.com/ryotaro612/pyvenv-auto) to automatically load that environment inside Emacs.
```elisp
(use-package pyvenv-auto
@@ -198,18 +198,14 @@ You can use [use-package](https://github.com/jwiegley/use-package) to install [p
### Visual Studio Code
Install the
[autopep8](https://marketplace.visualstudio.com/items?itemName=ms-python.autopep8) extension. Then edit the user settings (_Ctrl-Shift-P_ `Open User Settings (JSON)`) and set it as the default Python formatter, enable formatting on save and configure `autopep8` arguments:
[Ruff](https://marketplace.visualstudio.com/items?itemName=charliermarsh.ruff) extension. Then edit the user settings (_Ctrl-Shift-P_ `Open User Settings (JSON)`) and set it as the default Python formatter, enable formatting on save and configure `ruff` arguments:
```json
"[python]": {
"editor.defaultFormatter": "ms-python.autopep8",
"editor.defaultFormatter": "charliermarsh.ruff",
"editor.formatOnSave": true
},
"autopep8.args": [
"-a",
"-a",
"--max-line-length=100"
],
"ruff.format.args": ["--config", "line-length=100"]
```
## Getting help

View File

@@ -1,8 +1,8 @@
autopep8~=2.3.1
build~=1.2.1
grpcio-tools~=1.62.2
pip-tools~=7.4.1
pyright~=1.1.376
pytest~=8.3.2
ruff~=0.6.7
setuptools~=72.2.0
setuptools_scm~=8.1.0

View File

@@ -1,6 +1,11 @@
# Anthropic
ANTHROPIC_API_KEY=...
# AWS
AWS_SECRET_ACCESS_KEY=...
AWS_ACCESS_KEY_ID=...
AWS_REGION=...
# Azure
AZURE_SPEECH_REGION=...
AZURE_SPEECH_API_KEY=...

View File

@@ -6,7 +6,10 @@ import argparse
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import LLMAssistantResponseAggregator, LLMUserResponseAggregator
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.frames.frames import LLMMessagesFrame, EndFrame
from pipecat.services.openai import OpenAILLMService
from pipecat.services.elevenlabs import ElevenLabsTTSService
@@ -16,6 +19,7 @@ from pipecat.vad.silero import SileroVADAnalyzer
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -39,7 +43,7 @@ async def main(room_url: str, token: str):
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
transcription_enabled=True,
)
),
)
tts = ElevenLabsTTSService(
@@ -47,9 +51,7 @@ async def main(room_url: str, token: str):
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
@@ -61,14 +63,16 @@ async def main(room_url: str, token: str):
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(),
tma_in,
llm,
tts,
transport.output(),
tma_out,
])
pipeline = Pipeline(
[
transport.input(),
tma_in,
llm,
tts,
transport.output(),
tma_out,
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

View File

@@ -16,9 +16,14 @@ from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pipecat.transports.services.helpers.daily_rest import (
DailyRESTHelper, DailyRoomObject, DailyRoomProperties, DailyRoomParams)
DailyRESTHelper,
DailyRoomObject,
DailyRoomProperties,
DailyRoomParams,
)
from dotenv import load_dotenv
load_dotenv(override=True)
@@ -26,37 +31,37 @@ load_dotenv(override=True)
MAX_SESSION_TIME = 5 * 60 # 5 minutes
REQUIRED_ENV_VARS = [
'DAILY_API_KEY',
'OPENAI_API_KEY',
'ELEVENLABS_API_KEY',
'ELEVENLABS_VOICE_ID',
'FLY_API_KEY',
'FLY_APP_NAME',]
"DAILY_API_KEY",
"OPENAI_API_KEY",
"ELEVENLABS_API_KEY",
"ELEVENLABS_VOICE_ID",
"FLY_API_KEY",
"FLY_APP_NAME",
]
FLY_API_HOST = os.getenv("FLY_API_HOST", "https://api.machines.dev/v1")
FLY_APP_NAME = os.getenv("FLY_APP_NAME", "pipecat-fly-example")
FLY_API_KEY = os.getenv("FLY_API_KEY", "")
FLY_HEADERS = {
'Authorization': f"Bearer {FLY_API_KEY}",
'Content-Type': 'application/json'
}
FLY_HEADERS = {"Authorization": f"Bearer {FLY_API_KEY}", "Content-Type": "application/json"}
daily_helpers = {}
# ----------------- API ----------------- #
@asynccontextmanager
async def lifespan(app: FastAPI):
aiohttp_session = aiohttp.ClientSession()
daily_helpers["rest"] = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'),
aiohttp_session=aiohttp_session
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
yield
await aiohttp_session.close()
app = FastAPI(lifespan=lifespan)
app.add_middleware(
@@ -64,7 +69,7 @@ app.add_middleware(
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"]
allow_headers=["*"],
)
# ----------------- Main ----------------- #
@@ -73,13 +78,15 @@ app.add_middleware(
async def spawn_fly_machine(room_url: str, token: str):
async with aiohttp.ClientSession() as session:
# Use the same image as the bot runner
async with session.get(f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS) as r:
async with session.get(
f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS
) as r:
if r.status != 200:
text = await r.text()
raise Exception(f"Unable to get machine info from Fly: {text}")
data = await r.json()
image = data[0]['config']['image']
image = data[0]["config"]["image"]
# Machine configuration
cmd = f"python3 bot.py -u {room_url} -t {token}"
@@ -88,31 +95,28 @@ async def spawn_fly_machine(room_url: str, token: str):
"config": {
"image": image,
"auto_destroy": True,
"init": {
"cmd": cmd
},
"restart": {
"policy": "no"
},
"guest": {
"cpu_kind": "shared",
"cpus": 1,
"memory_mb": 1024
}
"init": {"cmd": cmd},
"restart": {"policy": "no"},
"guest": {"cpu_kind": "shared", "cpus": 1, "memory_mb": 1024},
},
}
# Spawn a new machine instance
async with session.post(f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS, json=worker_props) as r:
async with session.post(
f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS, json=worker_props
) as r:
if r.status != 200:
text = await r.text()
raise Exception(f"Problem starting a bot worker: {text}")
data = await r.json()
# Wait for the machine to enter the started state
vm_id = data['id']
vm_id = data["id"]
async with session.get(f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines/{vm_id}/wait?state=started", headers=FLY_HEADERS) as r:
async with session.get(
f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines/{vm_id}/wait?state=started",
headers=FLY_HEADERS,
) as r:
if r.status != 200:
text = await r.text()
raise Exception(f"Bot was unable to enter started state: {text}")
@@ -134,29 +138,23 @@ async def start_bot(request: Request) -> JSONResponse:
room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", "")
if not room_url:
params = DailyRoomParams(
properties=DailyRoomProperties()
)
params = DailyRoomParams(properties=DailyRoomProperties())
try:
room: DailyRoomObject = await daily_helpers["rest"].create_room(params=params)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Unable to provision room {e}")
raise HTTPException(status_code=500, detail=f"Unable to provision room {e}")
else:
# Check passed room URL exists, we should assume that it already has a sip set up
try:
room: DailyRoomObject = await daily_helpers["rest"].get_room_from_url(room_url)
except Exception:
raise HTTPException(
status_code=500, detail=f"Room not found: {room_url}")
raise HTTPException(status_code=500, detail=f"Room not found: {room_url}")
# Give the agent a token to join the session
token = await daily_helpers["rest"].get_token(room.url, MAX_SESSION_TIME)
if not room or not token:
raise HTTPException(
status_code=500, detail=f"Failed to get token for room: {room_url}")
raise HTTPException(status_code=500, detail=f"Failed to get token for room: {room_url}")
# Launch a new fly.io machine, or run as a shell process (not recommended)
run_as_process = os.getenv("RUN_AS_PROCESS", False)
@@ -167,24 +165,26 @@ async def start_bot(request: Request) -> JSONResponse:
[f"python3 -m bot -u {room.url} -t {token}"],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__)))
cwd=os.path.dirname(os.path.abspath(__file__)),
)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to start subprocess: {e}")
raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}")
else:
try:
await spawn_fly_machine(room.url, token)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to spawn VM: {e}")
raise HTTPException(status_code=500, detail=f"Failed to spawn VM: {e}")
# Grab a token for the user to join with
user_token = await daily_helpers["rest"].get_token(room.url, MAX_SESSION_TIME)
return JSONResponse({
"room_url": room.url,
"token": user_token,
})
return JSONResponse(
{
"room_url": room.url,
"token": user_token,
}
)
if __name__ == "__main__":
# Check environment variables
@@ -193,23 +193,19 @@ if __name__ == "__main__":
raise Exception(f"Missing environment variable: {env_var}.")
parser = argparse.ArgumentParser(description="Pipecat Bot Runner")
parser.add_argument("--host", type=str,
default=os.getenv("HOST", "0.0.0.0"), help="Host address")
parser.add_argument("--port", type=int,
default=os.getenv("PORT", 7860), help="Port number")
parser.add_argument("--reload", action="store_true",
default=False, help="Reload code on change")
parser.add_argument(
"--host", type=str, default=os.getenv("HOST", "0.0.0.0"), help="Host address"
)
parser.add_argument("--port", type=int, default=os.getenv("PORT", 7860), help="Port number")
parser.add_argument(
"--reload", action="store_true", default=False, help="Reload code on change"
)
config = parser.parse_args()
try:
import uvicorn
uvicorn.run(
"bot_runner:app",
host=config.host,
port=config.port,
reload=config.reload
)
uvicorn.run("bot_runner:app", host=config.host, port=config.port, reload=config.reload)
except KeyboardInterrupt:
print("Pipecat runner shutting down...")

View File

@@ -6,11 +6,11 @@ import argparse
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import LLMAssistantResponseAggregator, LLMUserResponseAggregator
from pipecat.frames.frames import (
LLMMessagesFrame,
EndFrame
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.frames.frames import LLMMessagesFrame, EndFrame
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport, DailyDialinSettings
@@ -18,6 +18,7 @@ from pipecat.vad.silero import SileroVADAnalyzer
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -31,10 +32,7 @@ async def main(room_url: str, token: str, callId: str, callDomain: str):
# diallin_settings are only needed if Daily's SIP URI is used
# If you are handling this via Twilio, Telnyx, set this to None
# and handle call-forwarding when on_dialin_ready fires.
diallin_settings = DailyDialinSettings(
call_id=callId,
call_domain=callDomain
)
diallin_settings = DailyDialinSettings(call_id=callId, call_domain=callDomain)
transport = DailyTransport(
room_url,
@@ -50,7 +48,7 @@ async def main(room_url: str, token: str, callId: str, callDomain: str):
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
transcription_enabled=True,
)
),
)
tts = ElevenLabsTTSService(
@@ -58,10 +56,7 @@ async def main(room_url: str, token: str, callId: str, callDomain: str):
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o"
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
@@ -73,14 +68,16 @@ async def main(room_url: str, token: str, callId: str, callDomain: str):
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(),
tma_in,
llm,
tts,
transport.output(),
tma_out,
])
pipeline = Pipeline(
[
transport.input(),
tma_in,
llm,
tts,
transport.output(),
tma_out,
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

View File

@@ -7,7 +7,6 @@ provisioning a room and starting a Pipecat bot in response.
Refer to README for more information.
"""
import aiohttp
import os
import argparse
@@ -25,17 +24,18 @@ from pipecat.transports.services.helpers.daily_rest import (
DailyRoomObject,
DailyRoomProperties,
DailyRoomSipParams,
DailyRoomParams)
DailyRoomParams,
)
from dotenv import load_dotenv
load_dotenv(override=True)
# ------------ Configuration ------------ #
MAX_SESSION_TIME = 5 * 60 # 5 minutes
REQUIRED_ENV_VARS = ['OPENAI_API_KEY', 'DAILY_API_KEY',
'ELEVENLABS_API_KEY', 'ELEVENLABS_VOICE_ID']
REQUIRED_ENV_VARS = ["OPENAI_API_KEY", "DAILY_API_KEY", "ELEVENLABS_API_KEY", "ELEVENLABS_VOICE_ID"]
daily_helpers = {}
@@ -47,12 +47,13 @@ async def lifespan(app: FastAPI):
aiohttp_session = aiohttp.ClientSession()
daily_helpers["rest"] = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'),
aiohttp_session=aiohttp_session
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
yield
await aiohttp_session.close()
app = FastAPI(lifespan=lifespan)
app.add_middleware(
@@ -60,7 +61,7 @@ app.add_middleware(
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"]
allow_headers=["*"],
)
"""
@@ -80,10 +81,7 @@ async def _create_daily_room(room_url, callId, callDomain=None, vendor="daily"):
properties=DailyRoomProperties(
# Note: these are the default values, except for the display name
sip=DailyRoomSipParams(
display_name="dialin-user",
video=False,
sip_mode="dial-in",
num_endpoints=1
display_name="dialin-user", video=False, sip_mode="dial-in", num_endpoints=1
)
)
)
@@ -97,8 +95,7 @@ async def _create_daily_room(room_url, callId, callDomain=None, vendor="daily"):
print(f"Joining existing room: {room_url}")
room: DailyRoomObject = await daily_helpers["rest"].get_room_from_url(room_url)
except Exception:
raise HTTPException(
status_code=500, detail=f"Room not found: {room_url}")
raise HTTPException(status_code=500, detail=f"Room not found: {room_url}")
print(f"Daily room: {room.url} {room.config.sip_endpoint}")
@@ -106,8 +103,7 @@ async def _create_daily_room(room_url, callId, callDomain=None, vendor="daily"):
token = await daily_helpers["rest"].get_token(room.url, MAX_SESSION_TIME)
if not room or not token:
raise HTTPException(
status_code=500, detail=f"Failed to get room or token token")
raise HTTPException(status_code=500, detail=f"Failed to get room or token token")
# Spawn a new agent, and join the user session
# Note: this is mostly for demonstration purposes (refer to 'deployment' in docs)
@@ -120,14 +116,10 @@ async def _create_daily_room(room_url, callId, callDomain=None, vendor="daily"):
try:
subprocess.Popen(
[bot_proc],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__))
[bot_proc], shell=True, bufsize=1, cwd=os.path.dirname(os.path.abspath(__file__))
)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to start subprocess: {e}")
raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}")
return room
@@ -150,11 +142,10 @@ async def twilio_start_bot(request: Request):
pass
room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", None)
callId = data.get('CallSid')
callId = data.get("CallSid")
if not callId:
raise HTTPException(
status_code=500, detail="Missing 'CallSid' in request")
raise HTTPException(status_code=500, detail="Missing 'CallSid' in request")
print("CallId: %s" % callId)
@@ -170,7 +161,8 @@ async def twilio_start_bot(request: Request):
# http://com.twilio.music.classical.s3.amazonaws.com/BusyStrings.mp3
resp = VoiceResponse()
resp.play(
url="http://com.twilio.sounds.music.s3.amazonaws.com/MARKOVICHAMP-Borghestral.mp3", loop=10)
url="http://com.twilio.sounds.music.s3.amazonaws.com/MARKOVICHAMP-Borghestral.mp3", loop=10
)
return str(resp)
@@ -192,18 +184,14 @@ async def daily_start_bot(request: Request) -> JSONResponse:
callId = data.get("callId", None)
callDomain = data.get("callDomain", None)
except Exception:
raise HTTPException(
status_code=500,
detail="Missing properties 'callId' or 'callDomain'")
raise HTTPException(status_code=500, detail="Missing properties 'callId' or 'callDomain'")
print(f"CallId: {callId}, CallDomain: {callDomain}")
room: DailyRoomObject = await _create_daily_room(room_url, callId, callDomain, "daily")
# Grab a token for the user to join with
return JSONResponse({
"room_url": room.url,
"sipUri": room.config.sip_endpoint
})
return JSONResponse({"room_url": room.url, "sipUri": room.config.sip_endpoint})
# ----------------- Main ----------------- #
@@ -215,24 +203,18 @@ if __name__ == "__main__":
raise Exception(f"Missing environment variable: {env_var}.")
parser = argparse.ArgumentParser(description="Pipecat Bot Runner")
parser.add_argument("--host", type=str,
default=os.getenv("HOST", "0.0.0.0"), help="Host address")
parser.add_argument("--port", type=int,
default=os.getenv("PORT", 7860), help="Port number")
parser.add_argument("--reload", action="store_true",
default=True, help="Reload code on change")
parser.add_argument(
"--host", type=str, default=os.getenv("HOST", "0.0.0.0"), help="Host address"
)
parser.add_argument("--port", type=int, default=os.getenv("PORT", 7860), help="Port number")
parser.add_argument("--reload", action="store_true", default=True, help="Reload code on change")
config = parser.parse_args()
try:
import uvicorn
uvicorn.run(
"bot_runner:app",
host=config.host,
port=config.port,
reload=config.reload
)
uvicorn.run("bot_runner:app", host=config.host, port=config.port, reload=config.reload)
except KeyboardInterrupt:
print("Pipecat runner shutting down...")

View File

@@ -6,11 +6,11 @@ import argparse
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import LLMAssistantResponseAggregator, LLMUserResponseAggregator
from pipecat.frames.frames import (
LLMMessagesFrame,
EndFrame
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.frames.frames import LLMMessagesFrame, EndFrame
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -21,14 +21,15 @@ from twilio.rest import Client
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
twilio_account_sid = os.getenv('TWILIO_ACCOUNT_SID')
twilio_auth_token = os.getenv('TWILIO_AUTH_TOKEN')
twilio_account_sid = os.getenv("TWILIO_ACCOUNT_SID")
twilio_auth_token = os.getenv("TWILIO_AUTH_TOKEN")
twilioclient = Client(twilio_account_sid, twilio_auth_token)
daily_api_key = os.getenv("DAILY_API_KEY", "")
@@ -51,7 +52,7 @@ async def main(room_url: str, token: str, callId: str, sipUri: str):
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
transcription_enabled=True,
)
),
)
tts = ElevenLabsTTSService(
@@ -59,10 +60,7 @@ async def main(room_url: str, token: str, callId: str, sipUri: str):
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o"
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
@@ -74,14 +72,16 @@ async def main(room_url: str, token: str, callId: str, sipUri: str):
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(),
tma_in,
llm,
tts,
transport.output(),
tma_out,
])
pipeline = Pipeline(
[
transport.input(),
tma_in,
llm,
tts,
transport.output(),
tma_out,
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@@ -103,7 +103,7 @@ async def main(room_url: str, token: str, callId: str, sipUri: str):
try:
# The TwiML is updated using Twilio's client library
call = twilioclient.calls(callId).update(
twiml=f'<Response><Dial><Sip>{sipUri}</Sip></Dial></Response>'
twiml=f"<Response><Dial><Sip>{sipUri}</Sip></Dial></Response>"
)
except Exception as e:
raise Exception(f"Failed to forward call: {str(e)}")

View File

@@ -1,4 +1,4 @@
pipecat-ai[daily,openai,silero]
pipecat-ai[daily,elevenlabs,openai,silero]
fastapi
uvicorn
python-dotenv

View File

@@ -9,11 +9,11 @@ import aiohttp
import os
import sys
from pipecat.frames.frames import TextFrame
from pipecat.frames.frames import EndFrame, TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.runner import PipelineRunner
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.cartesia import CartesiaHttpTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from runner import configure
@@ -21,6 +21,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -32,9 +33,10 @@ async def main():
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url, None, "Say One Thing", DailyParams(audio_out_enabled=True))
room_url, None, "Say One Thing", DailyParams(audio_out_enabled=True)
)
tts = CartesiaTTSService(
tts = CartesiaHttpTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
@@ -47,10 +49,11 @@ async def main():
# participant joins.
@transport.event_handler("on_participant_joined")
async def on_new_participant_joined(transport, participant):
participant_name = participant["info"]["userName"] or ''
await task.queue_frame(TextFrame(f"Hello there, {participant_name}!"))
participant_name = participant["info"]["userName"] or ""
await task.queue_frames([TextFrame(f"Hello there, {participant_name}!"), EndFrame()])
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -20,6 +20,7 @@ from pipecat.transports.local.audio import LocalAudioTransport
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)

View File

@@ -0,0 +1,108 @@
import argparse
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from livekit import api # pip install livekit-api
from loguru import logger
from pipecat.frames.frames import TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.transports.services.livekit import LiveKitParams, LiveKitTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
def generate_token(room_name: str, participant_name: str, api_key: str, api_secret: str) -> str:
token = api.AccessToken(api_key, api_secret)
token.with_identity(participant_name).with_name(participant_name).with_grants(
api.VideoGrants(
room_join=True,
room=room_name,
)
)
return token.to_jwt()
async def configure_livekit():
parser = argparse.ArgumentParser(description="LiveKit AI SDK Bot Sample")
parser.add_argument(
"-r", "--room", type=str, required=False, help="Name of the LiveKit room to join"
)
parser.add_argument("-u", "--url", type=str, required=False, help="URL of the LiveKit server")
args, unknown = parser.parse_known_args()
room_name = args.room or os.getenv("LIVEKIT_ROOM_NAME")
url = args.url or os.getenv("LIVEKIT_URL")
api_key = os.getenv("LIVEKIT_API_KEY")
api_secret = os.getenv("LIVEKIT_API_SECRET")
if not room_name:
raise Exception(
"No LiveKit room specified. Use the -r/--room option from the command line, or set LIVEKIT_ROOM_NAME in your environment."
)
if not url:
raise Exception(
"No LiveKit server URL specified. Use the -u/--url option from the command line, or set LIVEKIT_URL in your environment."
)
if not api_key or not api_secret:
raise Exception(
"LIVEKIT_API_KEY and LIVEKIT_API_SECRET must be set in environment variables."
)
token = generate_token(room_name, "Say One Thing", api_key, api_secret)
user_token = generate_token(room_name, "User", api_key, api_secret)
logger.info(f"User token: {user_token}")
return (url, token, room_name)
async def main():
async with aiohttp.ClientSession() as session:
(url, token, room_name) = await configure_livekit()
transport = LiveKitTransport(
url=url,
token=token,
room_name=room_name,
params=LiveKitParams(audio_out_enabled=True, audio_out_sample_rate=16000),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
runner = PipelineRunner()
task = PipelineTask(Pipeline([tts, transport.output()]))
# Register an event handler so we can play the audio when the
# participant joins.
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant_id):
await asyncio.sleep(1)
await task.queue_frame(
TextFrame(
"Hello there! How are you doing today? Would you like to talk about the weather?"
)
)
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -9,11 +9,11 @@ import aiohttp
import os
import sys
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.frames.frames import EndFrame, LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.cartesia import CartesiaHttpTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -22,6 +22,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -33,25 +34,22 @@ async def main():
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url,
None,
"Say One Thing From an LLM",
DailyParams(audio_out_enabled=True))
room_url, None, "Say One Thing From an LLM", DailyParams(audio_out_enabled=True)
)
tts = CartesiaTTSService(
tts = CartesiaHttpTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are an LLM in a WebRTC session, and this is a 'hello world' demo. Say hello to the world.",
}]
}
]
runner = PipelineRunner()
@@ -59,7 +57,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await task.queue_frame(LLMMessagesFrame(messages))
await task.queue_frames([LLMMessagesFrame(messages), EndFrame()])
await runner.run(task)

View File

@@ -21,6 +21,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -35,17 +36,11 @@ async def main():
room_url,
None,
"Show a still frame image",
DailyParams(
camera_out_enabled=True,
camera_out_width=1024,
camera_out_height=1024
)
DailyParams(camera_out_enabled=True, camera_out_width=1024, camera_out_height=1024),
)
imagegen = FalImageGenService(
params=FalImageGenService.InputParams(
image_size="square_hd"
),
params=FalImageGenService.InputParams(image_size="square_hd"),
aiohttp_session=session,
key=os.getenv("FAL_KEY"),
)

View File

@@ -22,6 +22,7 @@ from pipecat.transports.local.tk import TkLocalTransport
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -35,15 +36,11 @@ async def main():
transport = TkLocalTransport(
tk_root,
TransportParams(
camera_out_enabled=True,
camera_out_width=1024,
camera_out_height=1024))
TransportParams(camera_out_enabled=True, camera_out_width=1024, camera_out_height=1024),
)
imagegen = FalImageGenService(
params=FalImageGenService.InputParams(
image_size="square_hd"
),
params=FalImageGenService.InputParams(image_size="square_hd"),
aiohttp_session=session,
key=os.getenv("FAL_KEY"),
)

View File

@@ -4,6 +4,10 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
#
# This example broken on latest pipecat and needs updating.
#
import aiohttp
import asyncio
import os
@@ -24,6 +28,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -54,8 +59,7 @@ async def main():
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
messages = [{"role": "system",
"content": "tell the user a joke about llamas"}]
messages = [{"role": "system", "content": "tell the user a joke about llamas"}]
# Start a task to run the LLM to create a joke, and convert the LLM
# output to audio frames. This task will run in parallel with generating
@@ -73,8 +77,7 @@ async def main():
]
)
merge_pipeline = SequentialMergePipeline(
[simple_tts_pipeline, llm_pipeline])
merge_pipeline = SequentialMergePipeline([simple_tts_pipeline, llm_pipeline])
await asyncio.gather(
transport.run(merge_pipeline),

View File

@@ -14,21 +14,18 @@ from dataclasses import dataclass
from pipecat.frames.frames import (
AppFrame,
Frame,
ImageRawFrame,
LLMFullResponseStartFrame,
LLMMessagesFrame,
TextFrame
TextFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.parallel_task import ParallelTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.aggregators.gated import GatedAggregator
from pipecat.processors.aggregators.llm_response import LLMFullResponseAggregator
from pipecat.processors.aggregators.sentence import SentenceAggregator
from pipecat.services.cartesia import CartesiaHttpTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.fal import FalImageGenService
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -37,6 +34,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -84,47 +82,46 @@ async def main():
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_width=1024,
camera_out_height=1024
)
camera_out_height=1024,
),
)
tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
tts = CartesiaHttpTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
imagegen = FalImageGenService(
params=FalImageGenService.InputParams(
image_size="square_hd"
),
params=FalImageGenService.InputParams(image_size="square_hd"),
aiohttp_session=session,
key=os.getenv("FAL_KEY"),
)
gated_aggregator = GatedAggregator(
gate_open_fn=lambda frame: isinstance(frame, ImageRawFrame),
gate_close_fn=lambda frame: isinstance(frame, LLMFullResponseStartFrame),
start_open=False
)
sentence_aggregator = SentenceAggregator()
month_prepender = MonthPrepender()
llm_full_response_aggregator = LLMFullResponseAggregator()
pipeline = Pipeline([
llm, # LLM
sentence_aggregator, # Aggregates LLM output into full sentences
ParallelTask( # Run pipelines in parallel aggregating the result
[month_prepender, tts], # Create "Month: sentence" and output audio
[llm_full_response_aggregator, imagegen] # Aggregate full LLM response
),
gated_aggregator, # Queues everything until an image is available
transport.output() # Transport output
])
# With `SyncParallelPipeline` we synchronize audio and images by pushing
# them basically in order (e.g. I1 A1 A1 A1 I2 A2 A2 A2 A2 I3 A3). To do
# that, each pipeline runs concurrently and `SyncParallelPipeline` will
# wait for the input frame to be processed.
#
# Note that `SyncParallelPipeline` requires the last processor in each
# of the pipelines to be synchronous. In this case, we use
# `CartesiaHttpTTSService` and `FalImageGenService` which make HTTP
# requests and wait for the response.
pipeline = Pipeline(
[
llm, # LLM
sentence_aggregator, # Aggregates LLM output into full sentences
SyncParallelPipeline( # Run pipelines in parallel aggregating the result
[month_prepender, tts], # Create "Month: sentence" and output audio
[imagegen], # Generate image
),
transport.output(), # Transport output
]
)
frames = []
for month in [

View File

@@ -11,18 +11,25 @@ import sys
import tkinter as tk
from pipecat.frames.frames import AudioRawFrame, Frame, URLImageRawFrame, LLMMessagesFrame, TextFrame
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.frames.frames import (
Frame,
OutputAudioRawFrame,
TTSAudioRawFrame,
URLImageRawFrame,
LLMMessagesFrame,
TextFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.llm_response import LLMFullResponseAggregator
from pipecat.processors.aggregators.sentence import SentenceAggregator
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.cartesia import CartesiaHttpTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.fal import FalImageGenService
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.local.tk import TkLocalTransport
from pipecat.transports.local.tk import TkLocalTransport, TkOutputTransport
from loguru import logger
@@ -42,7 +49,12 @@ async def main():
runner = PipelineRunner()
async def get_month_data(month):
messages = [{"role": "system", "content": f"Describe a nature photograph suitable for use in a calendar, for the month of {month}. Include only the image description with no preamble. Limit the description to one sentence, please.", }]
messages = [
{
"role": "system",
"content": f"Describe a nature photograph suitable for use in a calendar, for the month of {month}. Include only the image description with no preamble. Limit the description to one sentence, please.",
}
]
class ImageDescription(FrameProcessor):
def __init__(self):
@@ -60,14 +72,16 @@ async def main():
def __init__(self):
super().__init__()
self.audio = bytearray()
self.frame = None
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, AudioRawFrame):
if isinstance(frame, TTSAudioRawFrame):
self.audio.extend(frame.audio)
self.frame = AudioRawFrame(
bytes(self.audio), frame.sample_rate, frame.num_channels)
self.frame = OutputAudioRawFrame(
bytes(self.audio), frame.sample_rate, frame.num_channels
)
class ImageGrabber(FrameProcessor):
def __init__(self):
@@ -80,22 +94,20 @@ async def main():
if isinstance(frame, URLImageRawFrame):
self.frame = frame
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"))
tts = CartesiaHttpTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
imagegen = FalImageGenService(
params=FalImageGenService.InputParams(
image_size="square_hd"
),
params=FalImageGenService.InputParams(image_size="square_hd"),
aiohttp_session=session,
key=os.getenv("FAL_KEY"))
key=os.getenv("FAL_KEY"),
)
aggregator = LLMFullResponseAggregator()
sentence_aggregator = SentenceAggregator()
description = ImageDescription()
@@ -103,13 +115,27 @@ async def main():
image_grabber = ImageGrabber()
pipeline = Pipeline([
llm,
aggregator,
description,
ParallelPipeline([tts, audio_grabber],
[imagegen, image_grabber])
])
# With `SyncParallelPipeline` we synchronize audio and images by
# pushing them basically in order (e.g. I1 A1 A1 A1 I2 A2 A2 A2 A2
# I3 A3). To do that, each pipeline runs concurrently and
# `SyncParallelPipeline` will wait for the input frame to be
# processed.
#
# Note that `SyncParallelPipeline` requires the last processor in
# each of the pipelines to be synchronous. In this case, we use
# `CartesiaHttpTTSService` and `FalImageGenService` which make HTTP
# requests and wait for the response.
pipeline = Pipeline(
[
llm, # LLM
sentence_aggregator, # Aggregates LLM output into full sentences
description, # Store sentence
SyncParallelPipeline(
[tts, audio_grabber], # Generate and store audio for the given sentence
[imagegen, image_grabber], # Generate and storeimage for the given sentence
),
]
)
task = PipelineTask(pipeline)
await task.queue_frame(LLMMessagesFrame(messages))
@@ -130,7 +156,9 @@ async def main():
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_width=1024,
camera_out_height=1024))
camera_out_height=1024,
),
)
pipeline = Pipeline([transport.output()])

View File

@@ -10,6 +10,12 @@ import os
import sys
from pipecat.frames.frames import Frame, LLMMessagesFrame, MetricsFrame
from pipecat.metrics.metrics import (
TTFBMetricsData,
ProcessingMetricsData,
LLMUsageMetricsData,
TTSUsageMetricsData,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -28,6 +34,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -37,8 +44,20 @@ logger.add(sys.stderr, level="DEBUG")
class MetricsLogger(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, MetricsFrame):
print(
f"!!! MetricsFrame: {frame}, ttfb: {frame.ttfb}, processing: {frame.processing}, tokens: {frame.tokens}, characters: {frame.characters}")
for d in frame.data:
if isinstance(d, TTFBMetricsData):
print(f"!!! MetricsFrame: {frame}, ttfb: {d.value}")
elif isinstance(d, ProcessingMetricsData):
print(f"!!! MetricsFrame: {frame}, processing: {d.value}")
elif isinstance(d, LLMUsageMetricsData):
tokens = d.value
print(
f"!!! MetricsFrame: {frame}, tokens: {
tokens.prompt_tokens}, characters: {
tokens.completion_tokens}"
)
elif isinstance(d, TTSUsageMetricsData):
print(f"!!! MetricsFrame: {frame}, characters: {d.value}")
await self.push_frame(frame, direction)
@@ -54,8 +73,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
@@ -63,10 +82,7 @@ async def main():
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o"
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
ml = MetricsLogger()
@@ -79,29 +95,25 @@ async def main():
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(),
tma_in,
llm,
tts,
ml,
transport.output(),
tma_out,
])
pipeline = Pipeline(
[
transport.input(),
tma_in,
llm,
tts,
ml,
transport.output(),
tma_out,
]
)
task = PipelineTask(pipeline)
task = PipelineTask(pipeline, PipelineParams(
allow_interruptions=True,
enable_metrics=True,
report_only_initial_ttfb=False,
))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

View File

@@ -11,7 +11,7 @@ import sys
from PIL import Image
from pipecat.frames.frames import ImageRawFrame, Frame, SystemFrame, TextFrame
from pipecat.frames.frames import Frame, OutputImageRawFrame, SystemFrame, TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
@@ -20,8 +20,8 @@ from pipecat.processors.aggregators.llm_response import (
LLMUserResponseAggregator,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.cartesia import CartesiaHttpTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.transports.services.daily import DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
@@ -31,6 +31,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -52,9 +53,21 @@ class ImageSyncAggregator(FrameProcessor):
await super().process_frame(frame, direction)
if not isinstance(frame, SystemFrame) and direction == FrameDirection.DOWNSTREAM:
await self.push_frame(ImageRawFrame(image=self._speaking_image_bytes, size=(1024, 1024), format=self._speaking_image_format))
await self.push_frame(
OutputImageRawFrame(
image=self._speaking_image_bytes,
size=(1024, 1024),
format=self._speaking_image_format,
)
)
await self.push_frame(frame)
await self.push_frame(ImageRawFrame(image=self._waiting_image_bytes, size=(1024, 1024), format=self._waiting_image_format))
await self.push_frame(
OutputImageRawFrame(
image=self._waiting_image_bytes,
size=(1024, 1024),
format=self._waiting_image_format,
)
)
else:
await self.push_frame(frame)
@@ -75,17 +88,15 @@ async def main():
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
)
),
)
tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
tts = CartesiaHttpTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
@@ -102,21 +113,23 @@ async def main():
os.path.join(os.path.dirname(__file__), "assets", "waiting.png"),
)
pipeline = Pipeline([
transport.input(),
image_sync_aggregator,
tma_in,
llm,
tts,
transport.output(),
tma_out
])
pipeline = Pipeline(
[
transport.input(),
image_sync_aggregator,
tma_in,
llm,
tts,
transport.output(),
tma_out,
]
)
task = PipelineTask(pipeline)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
participant_name = participant["info"]["userName"] or ''
participant_name = participant["info"]["userName"] or ""
transport.capture_participant_transcription(participant["id"])
await task.queue_frames([TextFrame(f"Hi there {participant_name}!")])

View File

@@ -14,7 +14,9 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -25,6 +27,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -43,8 +46,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
@@ -52,9 +55,7 @@ async def main():
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
@@ -66,28 +67,32 @@ async def main():
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
))
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

View File

@@ -14,7 +14,9 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.anthropic import AnthropicLLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -25,6 +27,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -43,8 +46,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
@@ -53,8 +56,8 @@ async def main():
)
llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"),
model="claude-3-opus-20240229")
api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-opus-20240229"
)
# todo: think more about how to handle system prompts in a more general way. OpenAI,
# Google, and Anthropic all have slightly different approaches to providing a system
@@ -69,14 +72,16 @@ async def main():
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

View File

@@ -15,7 +15,9 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.processors.frameworks.langchain import LangchainProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -32,6 +34,7 @@ from loguru import logger
from runner import configure
from dotenv import load_dotenv
load_dotenv(override=True)
@@ -70,19 +73,22 @@ async def main():
prompt = ChatPromptTemplate.from_messages(
[
("system",
"Be nice and helpful. Answer very briefly and without special characters like `#` or `*`. "
"Your response will be synthesized to voice and those characters will create unnatural sounds.",
),
(
"system",
"Be nice and helpful. Answer very briefly and without special characters like `#` or `*`. "
"Your response will be synthesized to voice and those characters will create unnatural sounds.",
),
MessagesPlaceholder("chat_history"),
("human", "{input}"),
])
]
)
chain = prompt | ChatOpenAI(model="gpt-4o", temperature=0.7)
history_chain = RunnableWithMessageHistory(
chain,
get_session_history,
history_messages_key="chat_history",
input_messages_key="input")
input_messages_key="input",
)
lc = LangchainProcessor(history_chain)
tma_in = LLMUserResponseAggregator()
@@ -90,12 +96,12 @@ async def main():
pipeline = Pipeline(
[
transport.input(), # Transport user input
tma_in, # User responses
lc, # Langchain
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
transport.input(), # Transport user input
tma_in, # User responses
lc, # Langchain
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
@@ -109,11 +115,7 @@ async def main():
# the `LLMMessagesFrame` will be picked up by the LangchainProcessor using
# only the content of the last message to inject it in the prompt defined
# above. So no role is required here.
messages = [(
{
"content": "Please briefly introduce yourself to the user."
}
)]
messages = [({"content": "Please briefly introduce yourself to the user."})]
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

View File

@@ -5,26 +5,27 @@
#
import asyncio
import aiohttp
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.deepgram import DeepgramSTTService, DeepgramTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -43,21 +44,15 @@ async def main():
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True
)
vad_audio_passthrough=True,
),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = DeepgramTTSService(
aiohttp_session=session,
api_key=os.getenv("DEEPGRAM_API_KEY"),
voice="aura-helios-en"
)
tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-helios-en")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
@@ -69,15 +64,17 @@ async def main():
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
stt, # STT
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@@ -85,8 +82,7 @@ async def main():
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

View File

@@ -5,26 +5,27 @@
#
import asyncio
import aiohttp
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -43,8 +44,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = ElevenLabsTTSService(
@@ -52,9 +53,7 @@ async def main():
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
@@ -66,28 +65,32 @@ async def main():
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
))
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

View File

@@ -14,7 +14,9 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.playht import PlayHTTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -25,6 +27,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -44,8 +47,8 @@ async def main():
audio_out_sample_rate=16000,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = PlayHTTTSService(
@@ -54,9 +57,7 @@ async def main():
voice_url="s3://voice-cloning-zero-shot/801a663f-efd0-4254-98d0-5c175514c3e8/jennifer/manifest.json",
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
@@ -68,14 +69,16 @@ async def main():
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@@ -83,8 +86,7 @@ async def main():
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

View File

@@ -14,7 +14,9 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.azure import AzureLLMService, AzureSTTService, AzureTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
@@ -25,6 +27,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -45,7 +48,7 @@ async def main():
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
)
),
)
stt = AzureSTTService(
@@ -74,15 +77,17 @@ async def main():
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
stt, # STT
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@@ -90,8 +95,7 @@ async def main():
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

View File

@@ -14,7 +14,9 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.openai import OpenAITTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -25,6 +27,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -44,18 +47,13 @@ async def main():
audio_out_sample_rate=24000,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = OpenAITTSService(
api_key=os.getenv("OPENAI_API_KEY"),
voice="alloy"
)
tts = OpenAITTSService(api_key=os.getenv("OPENAI_API_KEY"), voice="alloy")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
@@ -67,14 +65,16 @@ async def main():
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@@ -82,8 +82,7 @@ async def main():
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

View File

@@ -28,6 +28,7 @@ from loguru import logger
import time
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -46,8 +47,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
@@ -60,9 +61,7 @@ async def main():
api_key=os.getenv("OPENAI_API_KEY"),
openpipe_api_key=os.getenv("OPENPIPE_API_KEY"),
model="gpt-4o",
tags={
"conversation_id": f"pipecat-{timestamp}"
}
tags={"conversation_id": f"pipecat-{timestamp}"},
)
messages = [
@@ -74,14 +73,16 @@ async def main():
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
@@ -89,8 +90,7 @@ async def main():
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

View File

@@ -14,7 +14,9 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.deepgram import DeepgramSTTService, DeepgramTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.xtts import XTTSService
@@ -26,6 +28,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -45,19 +48,17 @@ async def main():
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
)
),
)
tts = XTTSService(
aiohttp_session=session,
voice_id="Claribel Dervla",
language="en",
base_url="http://localhost:8000"
base_url="http://localhost:8000",
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
@@ -69,14 +70,16 @@ async def main():
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@@ -84,8 +87,7 @@ async def main():
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

View File

@@ -14,7 +14,9 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.gladia import GladiaSTTService
from pipecat.services.openai import OpenAILLMService
@@ -26,6 +28,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -45,7 +48,7 @@ async def main():
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
)
),
)
stt = GladiaSTTService(
@@ -57,9 +60,7 @@ async def main():
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
@@ -71,15 +72,17 @@ async def main():
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
stt, # STT
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@@ -87,8 +90,7 @@ async def main():
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

View File

@@ -14,7 +14,9 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.lmnt import LmntTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -25,6 +27,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -44,18 +47,13 @@ async def main():
audio_out_sample_rate=24000,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = LmntTTSService(
api_key=os.getenv("LMNT_API_KEY"),
voice_id="morgan"
)
tts = LmntTTSService(api_key=os.getenv("LMNT_API_KEY"), voice_id="morgan")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
@@ -67,14 +65,16 @@ async def main():
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@@ -82,8 +82,7 @@ async def main():
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

View File

@@ -0,0 +1,107 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import os
import sys
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.together import TogetherLLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = TogetherLLMService(
api_key=os.getenv("TOGETHER_API_KEY"),
model=os.getenv("TOGETHER_MODEL"),
params=TogetherLLMService.InputParams(
temperature=1.0,
top_p=0.9,
top_k=40,
extra={
"frequency_penalty": 2.0,
"presence_penalty": 0.0,
},
),
)
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline(
[
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,102 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.aws import AWSTTSService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=16000,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = AWSTTSService(
api_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
region=os.getenv("AWS_REGION"),
voice_id="Amy",
params=AWSTTSService.InputParams(engine="neural", language="en-GB", rate="1.05"),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,100 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.google import GoogleTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=24000,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = GoogleTTSService(
credentials=os.getenv("GOOGLE_CREDENTIALS"),
voice_id="en-US-Neural2-J",
params=GoogleTTSService.InputParams(language="en-US", rate="1.05"),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -3,18 +3,19 @@ import aiohttp
import asyncio
import logging
import os
from pipecat.pipeline.aggregators import SentenceAggregator
from pipecat.processors.aggregators import SentenceAggregator
from pipecat.pipeline.pipeline import Pipeline
from pipecat.transports.daily_transport import DailyTransport
from pipecat.services.azure_ai_services import AzureLLMService, AzureTTSService
from pipecat.services.elevenlabs_ai_services import ElevenLabsTTSService
from pipecat.services.fal_ai_services import FalImageGenService
from pipecat.pipeline.frames import AudioFrame, EndFrame, ImageFrame, LLMMessagesFrame, TextFrame
from pipecat.transports.services.daily import DailyTransport
from pipecat.services.azure import AzureLLMService, AzureTTSService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.fal import FalImageGenService
from pipecat.frames.frames import AudioFrame, EndFrame, ImageFrame, LLMMessagesFrame, TextFrame
from runner import configure
from dotenv import load_dotenv
load_dotenv(override=True)
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
@@ -53,9 +54,7 @@ async def main():
voice_id="jBpfuIE2acCO8z3wKNLl",
)
dalle = FalImageGenService(
params=FalImageGenService.InputParams(
image_size="1024x1024"
),
params=FalImageGenService.InputParams(image_size="1024x1024"),
aiohttp_session=session,
key=os.getenv("FAL_KEY"),
)
@@ -75,13 +74,11 @@ async def main():
async def get_text_and_audio(messages) -> Tuple[str, bytearray]:
"""This function streams text from the LLM and uses the TTS service to convert
that text to speech as it's received. """
that text to speech as it's received."""
source_queue = asyncio.Queue()
sink_queue = asyncio.Queue()
sentence_aggregator = SentenceAggregator()
pipeline = Pipeline(
[llm, sentence_aggregator, tts1], source_queue, sink_queue
)
pipeline = Pipeline([llm, sentence_aggregator, tts1], source_queue, sink_queue)
await source_queue.put(LLMMessagesFrame(messages))
await source_queue.put(EndFrame())

View File

@@ -8,9 +8,17 @@ import aiohttp
import asyncio
import sys
from pipecat.frames.frames import (
Frame,
InputAudioRawFrame,
InputImageRawFrame,
OutputAudioRawFrame,
OutputImageRawFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transports.services.daily import DailyTransport, DailyParams
from runner import configure
@@ -18,33 +26,56 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
class MirrorProcessor(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, InputAudioRawFrame):
await self.push_frame(
OutputAudioRawFrame(
audio=frame.audio,
sample_rate=frame.sample_rate,
num_channels=frame.num_channels,
)
)
elif isinstance(frame, InputImageRawFrame):
await self.push_frame(
OutputImageRawFrame(image=frame.image, size=frame.size, format=frame.format)
)
else:
await self.push_frame(frame, direction)
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url, token, "Test",
room_url,
token,
"Test",
DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_is_live=True,
camera_out_width=1280,
camera_out_height=720
)
camera_out_height=720,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_video(participant["id"])
pipeline = Pipeline([transport.input(), transport.output()])
pipeline = Pipeline([transport.input(), MirrorProcessor(), transport.output()])
runner = PipelineRunner()

View File

@@ -10,9 +10,17 @@ import sys
import tkinter as tk
from pipecat.frames.frames import (
Frame,
InputAudioRawFrame,
InputImageRawFrame,
OutputAudioRawFrame,
OutputImageRawFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.local.tk import TkLocalTransport
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -22,12 +30,33 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
class MirrorProcessor(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, InputAudioRawFrame):
await self.push_frame(
OutputAudioRawFrame(
audio=frame.audio,
sample_rate=frame.sample_rate,
num_channels=frame.num_channels,
)
)
elif isinstance(frame, InputImageRawFrame):
await self.push_frame(
OutputImageRawFrame(image=frame.image, size=frame.size, format=frame.format)
)
else:
await self.push_frame(frame, direction)
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
@@ -36,8 +65,8 @@ async def main():
tk_root.title("Local Mirror")
daily_transport = DailyTransport(
room_url, token, "Test", DailyParams(
audio_in_enabled=True))
room_url, token, "Test", DailyParams(audio_in_enabled=True)
)
tk_transport = TkLocalTransport(
tk_root,
@@ -46,13 +75,15 @@ async def main():
camera_out_enabled=True,
camera_out_is_live=True,
camera_out_width=1280,
camera_out_height=720))
camera_out_height=720,
),
)
@daily_transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_video(participant["id"])
pipeline = Pipeline([daily_transport.input(), tk_transport.output()])
pipeline = Pipeline([daily_transport.input(), MirrorProcessor(), tk_transport.output()])
task = PipelineTask(pipeline)

View File

@@ -14,7 +14,9 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -25,6 +27,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -43,8 +46,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
@@ -52,9 +55,7 @@ async def main():
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
@@ -67,15 +68,17 @@ async def main():
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
hey_robot_filter, # Filter out speech not directed at the robot
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
hey_robot_filter, # Filter out speech not directed at the robot
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

View File

@@ -12,9 +12,9 @@ import wave
from pipecat.frames.frames import (
Frame,
AudioRawFrame,
LLMFullResponseEndFrame,
LLMMessagesFrame,
OutputAudioRawFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
@@ -25,7 +25,7 @@ from pipecat.processors.aggregators.llm_response import (
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.logger import FrameLogger
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.cartesia import CartesiaHttpTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
@@ -35,6 +35,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -53,12 +54,12 @@ for file in sound_files:
filename = os.path.splitext(os.path.basename(full_path))[0]
# Open the image and convert it to bytes
with wave.open(full_path) as audio_file:
sounds[file] = AudioRawFrame(audio_file.readframes(-1),
audio_file.getframerate(), audio_file.getnchannels())
sounds[file] = OutputAudioRawFrame(
audio_file.readframes(-1), audio_file.getframerate(), audio_file.getnchannels()
)
class OutboundSoundEffectWrapper(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
@@ -71,7 +72,6 @@ class OutboundSoundEffectWrapper(FrameProcessor):
class InboundSoundEffectWrapper(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
@@ -95,17 +95,15 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id="ErXwobaYiN019PkySvjV",
tts = CartesiaHttpTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
messages = [
@@ -122,18 +120,20 @@ async def main():
fl = FrameLogger("LLM Out")
fl2 = FrameLogger("Transcription In")
pipeline = Pipeline([
transport.input(),
tma_in,
in_sound,
fl2,
llm,
fl,
tts,
out_sound,
transport.output(),
tma_out
])
pipeline = Pipeline(
[
transport.input(),
tma_in,
in_sound,
fl2,
llm,
fl,
tts,
out_sound,
transport.output(),
tma_out,
]
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -26,6 +26,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -33,7 +34,6 @@ logger.add(sys.stderr, level="DEBUG")
class UserImageRequester(FrameProcessor):
def __init__(self, participant_id: str | None = None):
super().__init__()
self._participant_id = participant_id
@@ -45,7 +45,9 @@ class UserImageRequester(FrameProcessor):
await super().process_frame(frame, direction)
if self._participant_id and isinstance(frame, TextFrame):
await self.push_frame(UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM)
await self.push_frame(
UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM
)
await self.push_frame(frame, direction)
@@ -61,8 +63,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
user_response = UserResponseAggregator()
@@ -86,15 +88,17 @@ async def main():
transport.capture_participant_transcription(participant["id"])
image_requester.set_participant_id(participant["id"])
pipeline = Pipeline([
transport.input(),
user_response,
image_requester,
vision_aggregator,
moondream,
tts,
transport.output()
])
pipeline = Pipeline(
[
transport.input(),
user_response,
image_requester,
vision_aggregator,
moondream,
tts,
transport.output(),
]
)
task = PipelineTask(pipeline)
@@ -102,5 +106,6 @@ async def main():
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -26,6 +26,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -33,7 +34,6 @@ logger.add(sys.stderr, level="DEBUG")
class UserImageRequester(FrameProcessor):
def __init__(self, participant_id: str | None = None):
super().__init__()
self._participant_id = participant_id
@@ -45,7 +45,9 @@ class UserImageRequester(FrameProcessor):
await super().process_frame(frame, direction)
if self._participant_id and isinstance(frame, TextFrame):
await self.push_frame(UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM)
await self.push_frame(
UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM
)
await self.push_frame(frame, direction)
@@ -62,8 +64,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
user_response = UserResponseAggregator()
@@ -73,8 +75,8 @@ async def main():
vision_aggregator = VisionImageFrameAggregator()
google = GoogleLLMService(
model="gemini-1.5-flash-latest",
api_key=os.getenv("GOOGLE_API_KEY"))
model="gemini-1.5-flash-latest", api_key=os.getenv("GOOGLE_API_KEY")
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
@@ -88,15 +90,17 @@ async def main():
transport.capture_participant_transcription(participant["id"])
image_requester.set_participant_id(participant["id"])
pipeline = Pipeline([
transport.input(),
user_response,
image_requester,
vision_aggregator,
google,
tts,
transport.output()
])
pipeline = Pipeline(
[
transport.input(),
user_response,
image_requester,
vision_aggregator,
google,
tts,
transport.output(),
]
)
task = PipelineTask(pipeline)
@@ -104,5 +108,6 @@ async def main():
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -26,6 +26,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -33,7 +34,6 @@ logger.add(sys.stderr, level="DEBUG")
class UserImageRequester(FrameProcessor):
def __init__(self, participant_id: str | None = None):
super().__init__()
self._participant_id = participant_id
@@ -45,7 +45,9 @@ class UserImageRequester(FrameProcessor):
await super().process_frame(frame, direction)
if self._participant_id and isinstance(frame, TextFrame):
await self.push_frame(UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM)
await self.push_frame(
UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM
)
await self.push_frame(frame, direction)
@@ -61,8 +63,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
user_response = UserResponseAggregator()
@@ -71,10 +73,7 @@ async def main():
vision_aggregator = VisionImageFrameAggregator()
openai = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o"
)
openai = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
@@ -88,15 +87,17 @@ async def main():
transport.capture_participant_transcription(participant["id"])
image_requester.set_participant_id(participant["id"])
pipeline = Pipeline([
transport.input(),
user_response,
image_requester,
vision_aggregator,
openai,
tts,
transport.output()
])
pipeline = Pipeline(
[
transport.input(),
user_response,
image_requester,
vision_aggregator,
openai,
tts,
transport.output(),
]
)
task = PipelineTask(pipeline)
@@ -104,5 +105,6 @@ async def main():
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -26,6 +26,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -33,7 +34,6 @@ logger.add(sys.stderr, level="DEBUG")
class UserImageRequester(FrameProcessor):
def __init__(self, participant_id: str | None = None):
super().__init__()
self._participant_id = participant_id
@@ -45,7 +45,9 @@ class UserImageRequester(FrameProcessor):
await super().process_frame(frame, direction)
if self._participant_id and isinstance(frame, TextFrame):
await self.push_frame(UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM)
await self.push_frame(
UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM
)
await self.push_frame(frame, direction)
@@ -61,8 +63,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
user_response = UserResponseAggregator()
@@ -71,14 +73,14 @@ async def main():
vision_aggregator = VisionImageFrameAggregator()
anthropic = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY")
)
anthropic = AnthropicLLMService(api_key=os.getenv("ANTHROPIC_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
sample_rate=16000,
params=CartesiaTTSService.InputParams(
sample_rate=16000,
),
)
@transport.event_handler("on_first_participant_joined")
@@ -88,15 +90,17 @@ async def main():
transport.capture_participant_transcription(participant["id"])
image_requester.set_participant_id(participant["id"])
pipeline = Pipeline([
transport.input(),
user_response,
image_requester,
vision_aggregator,
anthropic,
tts,
transport.output()
])
pipeline = Pipeline(
[
transport.input(),
user_response,
image_requester,
vision_aggregator,
anthropic,
tts,
transport.output(),
]
)
task = PipelineTask(pipeline)
@@ -104,5 +108,6 @@ async def main():
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -21,6 +21,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -28,7 +29,6 @@ logger.add(sys.stderr, level="DEBUG")
class TranscriptionLogger(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
@@ -40,8 +40,9 @@ async def main():
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
transport = DailyTransport(room_url, None, "Transcription bot",
DailyParams(audio_in_enabled=True))
transport = DailyTransport(
room_url, None, "Transcription bot", DailyParams(audio_in_enabled=True)
)
stt = WhisperSTTService()

View File

@@ -19,6 +19,7 @@ from pipecat.transports.local.audio import LocalAudioTransport
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -26,7 +27,6 @@ logger.add(sys.stderr, level="DEBUG")
class TranscriptionLogger(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)

View File

@@ -22,6 +22,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -29,7 +30,6 @@ logger.add(sys.stderr, level="DEBUG")
class TranscriptionLogger(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
@@ -41,8 +41,9 @@ async def main():
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
transport = DailyTransport(room_url, None, "Transcription bot",
DailyParams(audio_in_enabled=True))
transport = DailyTransport(
room_url, None, "Transcription bot", DailyParams(audio_in_enabled=True)
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

View File

@@ -5,10 +5,15 @@
#
import asyncio
import aiohttp
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from openai.types.chat import ChatCompletionToolParam
from runner import configure
from pipecat.frames.frames import TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
@@ -19,13 +24,6 @@ from pipecat.services.openai import OpenAILLMContext, OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from openai.types.chat import ChatCompletionToolParam
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -33,7 +31,12 @@ logger.add(sys.stderr, level="DEBUG")
async def start_fetch_weather(function_name, llm, context):
await llm.push_frame(TextFrame("Let me check on that."))
# note: we can't push a frame to the LLM here. the bot
# can interrupt itself and/or cause audio overlapping glitches.
# possible question for Aleix and Chad about what the right way
# to trigger speech is, now, with the new queues/async/sync refactors.
await llm.push_frame(TextFrame("Let me check on that. "))
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
@@ -52,8 +55,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
@@ -61,15 +64,10 @@ async def main():
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
# Register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.
llm.register_function(
None,
fetch_weather_from_api,
start_callback=start_fetch_weather)
llm.register_function(None, fetch_weather_from_api, start_callback=start_fetch_weather)
fl_in = FrameLogger("Inner")
fl_out = FrameLogger("Outer")
@@ -89,17 +87,15 @@ async def main():
},
"format": {
"type": "string",
"enum": [
"celsius",
"fahrenheit"],
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": [
"location",
"format"],
"required": ["location", "format"],
},
})]
},
)
]
messages = [
{
"role": "system",
@@ -110,16 +106,18 @@ async def main():
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline([
fl_in,
transport.input(),
context_aggregator.user(),
llm,
fl_out,
tts,
transport.output(),
context_aggregator.assistant(),
])
pipeline = Pipeline(
[
# fl_in,
transport.input(),
context_aggregator.user(),
llm,
# fl_out,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(pipeline)
@@ -133,5 +131,6 @@ async def main():
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -28,6 +28,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -39,7 +40,11 @@ current_voice = "News Lady"
async def switch_voice(function_name, tool_call_id, args, llm, context, result_callback):
global current_voice
current_voice = args["voice"]
await result_callback({"voice": f"You are now using your {current_voice} voice. Your responses should now be as if you were a {current_voice}."})
await result_callback(
{
"voice": f"You are now using your {current_voice} voice. Your responses should now be as if you were a {current_voice}."
}
)
async def news_lady_filter(frame) -> bool:
@@ -66,8 +71,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
news_lady = CartesiaTTSService(
@@ -85,9 +90,7 @@ async def main():
voice_id="a0e99841-438c-4a64-b679-ae501e7d6091", # Barbershop Man
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm.register_function("switch_voice", switch_voice)
tools = [
@@ -106,7 +109,9 @@ async def main():
},
"required": ["voice"],
},
})]
},
)
]
messages = [
{
"role": "system",
@@ -117,18 +122,20 @@ async def main():
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline([
transport.input(), # Transport user input
context_aggregator.user(), # User responses
llm, # LLM
ParallelPipeline( # TTS (one of the following vocies)
[FunctionFilter(news_lady_filter), news_lady], # News Lady voice
[FunctionFilter(british_lady_filter), british_lady], # British Lady voice
[FunctionFilter(barbershop_man_filter), barbershop_man], # Barbershop Man voice
),
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User responses
llm, # LLM
ParallelPipeline( # TTS (one of the following vocies)
[FunctionFilter(news_lady_filter), news_lady], # News Lady voice
[FunctionFilter(british_lady_filter), british_lady], # British Lady voice
[FunctionFilter(barbershop_man_filter), barbershop_man], # Barbershop Man voice
),
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@@ -139,7 +146,9 @@ async def main():
messages.append(
{
"role": "system",
"content": f"Please introduce yourself to the user and let them know the voices you can do. Your initial responses should be as if you were a {current_voice}."})
"content": f"Please introduce yourself to the user and let them know the voices you can do. Your initial responses should be as if you were a {current_voice}.",
}
)
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

View File

@@ -29,6 +29,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -64,8 +65,8 @@ async def main():
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True
)
vad_audio_passthrough=True,
),
)
stt = WhisperSTTService(model=Model.LARGE)
@@ -80,9 +81,7 @@ async def main():
voice_id="846d6cb0-2301-48b6-9683-48f5618ea2f6", # Spanish-speaking Lady
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm.register_function("switch_language", switch_language)
tools = [
@@ -101,7 +100,9 @@ async def main():
},
"required": ["language"],
},
})]
},
)
]
messages = [
{
"role": "system",
@@ -112,18 +113,20 @@ async def main():
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline([
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
ParallelPipeline( # TTS (bot will speak the chosen language)
[FunctionFilter(english_filter), english_tts], # English
[FunctionFilter(spanish_filter), spanish_tts], # Spanish
),
transport.output(), # Transport bot output
context_aggregator.assistant() # Assistant spoken responses
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
ParallelPipeline( # TTS (bot will speak the chosen language)
[FunctionFilter(english_filter), english_tts], # English
[FunctionFilter(spanish_filter), spanish_tts], # Spanish
),
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@@ -134,7 +137,9 @@ async def main():
messages.append(
{
"role": "system",
"content": f"Please introduce yourself to the user and let them know the languages you speak. Your initial responses should be in {current_language}."})
"content": f"Please introduce yourself to the user and let them know the languages you speak. Your initial responses should be in {current_language}.",
}
)
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

View File

@@ -14,10 +14,16 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.deepgram import DeepgramTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport, DailyTransportMessageFrame
from pipecat.transports.services.daily import (
DailyParams,
DailyTransport,
DailyTransportMessageFrame,
)
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
@@ -25,6 +31,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -43,15 +50,15 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = DeepgramTTSService(
aiohttp_session=session,
api_key=os.getenv("DEEPGRAM_API_KEY"),
voice="aura-asteria-en",
base_url="http://0.0.0.0:8080/v1/speak"
base_url="http://0.0.0.0:8080/v1/speak",
)
llm = OpenAILLMService(
@@ -60,7 +67,7 @@ async def main():
# model="gpt-4o"
# Or, to use a local vLLM (or similar) api server
model="meta-llama/Meta-Llama-3-8B-Instruct",
base_url="http://0.0.0.0:8000/v1"
base_url="http://0.0.0.0:8000/v1",
)
messages = [
@@ -73,14 +80,16 @@ async def main():
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
@@ -93,8 +102,7 @@ async def main():
# When the first participant joins, the bot should introduce itself.
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
# Handle "latency-ping" messages. The client will send app messages that look like
@@ -111,14 +119,18 @@ async def main():
logger.debug(f"Received latency ping app message: {message}")
ts = message["latency-ping"]["ts"]
# Send immediately
transport.output().send_message(DailyTransportMessageFrame(
message={"latency-pong-msg-handler": {"ts": ts}},
participant_id=sender))
transport.output().send_message(
DailyTransportMessageFrame(
message={"latency-pong-msg-handler": {"ts": ts}}, participant_id=sender
)
)
# And push to the pipeline for the Daily transport.output to send
await tma_in.push_frame(
DailyTransportMessageFrame(
message={"latency-pong-pipeline-delivery": {"ts": ts}},
participant_id=sender))
participant_id=sender,
)
)
except Exception as e:
logger.debug(f"message handling error: {e} - {message}")

View File

@@ -14,7 +14,9 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.processors.user_idle_processor import UserIdleProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
@@ -26,6 +28,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -44,8 +47,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
@@ -53,9 +56,7 @@ async def main():
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
@@ -69,33 +70,41 @@ async def main():
async def user_idle_callback(user_idle: UserIdleProcessor):
messages.append(
{"role": "system", "content": "Ask the user if they are still there and try to prompt for some input, but be short."})
await user_idle.queue_frame(LLMMessagesFrame(messages))
{
"role": "system",
"content": "Ask the user if they are still there and try to prompt for some input, but be short.",
}
)
await user_idle.push_frame(LLMMessagesFrame(messages))
user_idle = UserIdleProcessor(callback=user_idle_callback, timeout=5.0)
pipeline = Pipeline([
transport.input(), # Transport user input
user_idle, # Idle user check-in
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
user_idle, # Idle user check-in
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(
allow_interruptions=True,
enable_metrics=True,
report_only_initial_ttfb=True,
))
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

View File

@@ -20,6 +20,7 @@ from runner import configure_with_args
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -29,12 +30,7 @@ logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument(
"-i",
"--input",
type=str,
required=True,
help="Input video file")
parser.add_argument("-i", "--input", type=str, required=True, help="Input video file")
(room_url, _, args) = await configure_with_args(session, parser)
@@ -49,7 +45,7 @@ async def main():
camera_out_width=1280,
camera_out_height=720,
camera_out_is_live=True,
)
),
)
gst = GStreamerPipelineSource(
@@ -59,13 +55,15 @@ async def main():
video_height=720,
audio_sample_rate=16000,
audio_channels=1,
)
),
)
pipeline = Pipeline([
gst, # GStreamer file source
transport.output(), # Transport bot output
])
pipeline = Pipeline(
[
gst, # GStreamer file source
transport.output(), # Transport bot output
]
)
task = PipelineTask(pipeline)

View File

@@ -19,6 +19,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -38,20 +39,22 @@ async def main():
camera_out_width=1280,
camera_out_height=720,
camera_out_is_live=True,
)
),
)
gst = GStreamerPipelineSource(
pipeline="videotestsrc ! capsfilter caps=\"video/x-raw,width=1280,height=720,framerate=30/1\"",
pipeline='videotestsrc ! capsfilter caps="video/x-raw,width=1280,height=720,framerate=30/1"',
out_params=GStreamerPipelineSource.OutputParams(
video_width=1280,
video_height=720,
clock_sync=False))
video_width=1280, video_height=720, clock_sync=False
),
)
pipeline = Pipeline([
gst, # GStreamer file source
transport.output(), # Transport bot output
])
pipeline = Pipeline(
[
gst, # GStreamer file source
transport.output(), # Transport bot output
]
)
task = PipelineTask(pipeline)

View File

@@ -23,6 +23,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -46,8 +47,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
@@ -56,8 +57,7 @@ async def main():
)
llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"),
model="claude-3-5-sonnet-20240620"
api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-5-sonnet-20240620"
)
llm.register_function("get_weather", get_weather)
@@ -90,18 +90,20 @@ async def main():
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline([
transport.input(), # Transport user input
context_aggregator.user(), # User spoken responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses and tool context
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User spoken responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses and tool context
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
@ transport.event_handler("on_first_participant_joined")
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.

View File

@@ -23,6 +23,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -55,8 +56,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
@@ -67,7 +68,7 @@ async def main():
llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"),
model="claude-3-5-sonnet-20240620",
enable_prompt_caching_beta=True
enable_prompt_caching_beta=True,
)
llm.register_function("get_weather", get_weather)
llm.register_function("get_image", get_image)
@@ -100,7 +101,7 @@ async def main():
},
"required": ["question"],
},
}
},
]
# todo: test with very short initial user message
@@ -134,28 +135,28 @@ If you need to use a tool, simply use the tool. Do not tell the user the tool yo
"type": "text",
"text": system_prompt,
}
]
],
},
{
"role": "user",
"content": "Start the conversation by introducing yourself."
}]
{"role": "user", "content": "Start the conversation by introducing yourself."},
]
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline([
transport.input(), # Transport user input
context_aggregator.user(), # User speech to text
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses and tool context
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User speech to text
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses and tool context
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
@ transport.event_handler("on_first_participant_joined")
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
global video_participant_id
video_participant_id = participant["id"]

View File

@@ -25,6 +25,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -32,12 +33,8 @@ logger.add(sys.stderr, level="DEBUG")
async def get_current_weather(
function_name,
tool_call_id,
arguments,
llm,
context,
result_callback):
function_name, tool_call_id, arguments, llm, context, result_callback
):
logger.debug("IN get_current_weather")
location = arguments["location"]
await result_callback(f"The weather in {location} is currently 72 degrees and sunny.")
@@ -55,8 +52,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
@@ -104,26 +101,28 @@ Reminder:
"""
messages = [{"role": "system",
"content": system_prompt},
{"role": "user",
"content": "Wait for the user to say something."}]
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": "Wait for the user to say something."},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline([
transport.input(), # Transport user input
context_aggregator.user(), # User speech to text
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses and tool context
])
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User speech to text
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses and tool context
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
@ transport.event_handler("on_first_participant_joined")
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.

View File

@@ -17,16 +17,13 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
async def configure_with_args(
aiohttp_session: aiohttp.ClientSession,
parser: argparse.ArgumentParser | None = None):
aiohttp_session: aiohttp.ClientSession, parser: argparse.ArgumentParser | None = None
):
if not parser:
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument(
"-u",
"--url",
type=str,
required=False,
help="URL of the Daily room to join")
"-u", "--url", type=str, required=False, help="URL of the Daily room to join"
)
parser.add_argument(
"-k",
"--apikey",
@@ -42,15 +39,19 @@ async def configure_with_args(
if not url:
raise Exception(
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL.")
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL."
)
if not key:
raise Exception("No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers.")
raise Exception(
"No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
)
daily_rest_helper = DailyRESTHelper(
daily_api_key=key,
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session)
aiohttp_session=aiohttp_session,
)
# Create a meeting token for the given room with an expiration 1 hour in
# the future.

View File

@@ -13,10 +13,11 @@ from PIL import Image
from pipecat.frames.frames import (
ImageRawFrame,
OutputImageRawFrame,
SpriteFrame,
Frame,
LLMMessagesFrame,
AudioRawFrame,
TTSAudioRawFrame,
TTSStoppedFrame,
TextFrame,
UserImageRawFrame,
@@ -42,6 +43,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -59,7 +61,7 @@ for i in range(1, 26):
# Get the filename without the extension to use as the dictionary key
# Open the image and convert it to bytes
with Image.open(full_path) as img:
sprites.append(ImageRawFrame(image=img.tobytes(), size=img.size, format=img.format))
sprites.append(OutputImageRawFrame(image=img.tobytes(), size=img.size, format=img.format))
flipped = sprites[::-1]
sprites.extend(flipped)
@@ -82,7 +84,7 @@ class TalkingAnimation(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, AudioRawFrame):
if isinstance(frame, TTSAudioRawFrame):
if not self._is_talking:
await self.push_frame(talking_frame)
self._is_talking = True
@@ -105,7 +107,9 @@ class UserImageRequester(FrameProcessor):
if self.participant_id and isinstance(frame, TextFrame):
if frame.text == user_request_answer:
await self.push_frame(UserImageRequestFrame(self.participant_id), FrameDirection.UPSTREAM)
await self.push_frame(
UserImageRequestFrame(self.participant_id), FrameDirection.UPSTREAM
)
await self.push_frame(TextFrame("Describe the image in a short sentence."))
elif isinstance(frame, UserImageRawFrame):
await self.push_frame(frame)
@@ -149,8 +153,8 @@ async def main():
camera_out_height=576,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
@@ -158,9 +162,7 @@ async def main():
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
ta = TalkingAnimation()
@@ -183,17 +185,17 @@ async def main():
ura = LLMUserResponseAggregator(messages)
pipeline = Pipeline([
transport.input(),
ura,
llm,
ParallelPipeline(
[sa, ir, va, moondream],
[tf, imgf]),
tts,
ta,
transport.output()
])
pipeline = Pipeline(
[
transport.input(),
ura,
llm,
ParallelPipeline([sa, ir, va, moondream], [tf, imgf]),
tts,
ta,
transport.output(),
]
)
task = PipelineTask(pipeline)
await task.queue_frame(quiet_frame)

View File

@@ -1,4 +1,4 @@
python-dotenv
fastapi[all]
uvicorn
pipecat-ai[daily,moondream,openai,silero]
pipecat-ai[daily,cartesia,moondream,openai,silero]

View File

@@ -14,11 +14,8 @@ from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper
async def configure(aiohttp_session: aiohttp.ClientSession):
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument(
"-u",
"--url",
type=str,
required=False,
help="URL of the Daily room to join")
"-u", "--url", type=str, required=False, help="URL of the Daily room to join"
)
parser.add_argument(
"-k",
"--apikey",
@@ -34,15 +31,18 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
if not url:
raise Exception(
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL.")
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL."
)
if not key:
raise Exception("No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers.")
raise Exception(
"No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
)
daily_rest_helper = DailyRESTHelper(
daily_api_key=key,
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session
aiohttp_session=aiohttp_session,
)
# Create a meeting token for the given room with an expiration 1 hour in

View File

@@ -38,13 +38,14 @@ async def lifespan(app: FastAPI):
aiohttp_session = aiohttp.ClientSession()
daily_helpers["rest"] = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'),
aiohttp_session=aiohttp_session
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
yield
await aiohttp_session.close()
cleanup()
app = FastAPI(lifespan=lifespan)
app.add_middleware(
@@ -65,37 +66,34 @@ async def start_agent(request: Request):
if not room.url:
raise HTTPException(
status_code=500,
detail="Missing 'room' property in request data. Cannot start agent without a target room!")
detail="Missing 'room' property in request data. Cannot start agent without a target room!",
)
# Check if there is already an existing process running in this room
num_bots_in_room = sum(
1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None)
1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None
)
if num_bots_in_room >= MAX_BOTS_PER_ROOM:
raise HTTPException(
status_code=500, detail=f"Max bot limited reach for room: {room.url}")
raise HTTPException(status_code=500, detail=f"Max bot limited reach for room: {room.url}")
# Get the token for the room
token = await daily_helpers["rest"].get_token(room.url)
if not token:
raise HTTPException(
status_code=500, detail=f"Failed to get token for room: {room.url}")
raise HTTPException(status_code=500, detail=f"Failed to get token for room: {room.url}")
# Spawn a new agent, and join the user session
# Note: this is mostly for demonstration purposes (refer to 'deployment' in README)
try:
proc = subprocess.Popen(
[
f"python3 -m bot -u {room.url} -t {token}"
],
[f"python3 -m bot -u {room.url} -t {token}"],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__))
cwd=os.path.dirname(os.path.abspath(__file__)),
)
bot_procs[proc.pid] = (proc, room.url)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to start subprocess: {e}")
raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}")
return RedirectResponse(room.url)
@@ -107,8 +105,7 @@ def get_status(pid: int):
# If the subprocess doesn't exist, return an error
if not proc:
raise HTTPException(
status_code=404, detail=f"Bot with process id: {pid} not found")
raise HTTPException(status_code=404, detail=f"Bot with process id: {pid} not found")
# Check the status of the subprocess
if proc[0].poll() is None:
@@ -125,14 +122,10 @@ if __name__ == "__main__":
default_host = os.getenv("HOST", "0.0.0.0")
default_port = int(os.getenv("FAST_API_PORT", "7860"))
parser = argparse.ArgumentParser(
description="Daily Moondream FastAPI server")
parser.add_argument("--host", type=str,
default=default_host, help="Host address")
parser.add_argument("--port", type=int,
default=default_port, help="Port number")
parser.add_argument("--reload", action="store_true",
help="Reload code on change")
parser = argparse.ArgumentParser(description="Daily Moondream FastAPI server")
parser.add_argument("--host", type=str, default=default_host, help="Host address")
parser.add_argument("--port", type=int, default=default_port, help="Port number")
parser.add_argument("--reload", action="store_true", help="Reload code on change")
config = parser.parse_args()

View File

@@ -10,7 +10,7 @@ import os
import sys
import wave
from pipecat.frames.frames import AudioRawFrame
from pipecat.frames.frames import OutputAudioRawFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -26,6 +26,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -49,40 +50,44 @@ for file in sound_files:
filename = os.path.splitext(os.path.basename(full_path))[0]
# Open the sound and convert it to bytes
with wave.open(full_path) as audio_file:
sounds[file] = AudioRawFrame(audio_file.readframes(-1),
audio_file.getframerate(), audio_file.getnchannels())
sounds[file] = OutputAudioRawFrame(
audio_file.readframes(-1), audio_file.getframerate(), audio_file.getnchannels()
)
class IntakeProcessor:
def __init__(self, context: OpenAILLMContext):
print(f"Initializing context from IntakeProcessor")
context.add_message({"role": "system", "content": "You are Jessica, an agent for a company called Tri-County Health Services. Your job is to collect important information from the user before their doctor visit. You're talking to Chad Bailey. You should address the user by their first name and be polite and professional. You're not a medical professional, so you shouldn't provide any advice. Keep your responses short. Your job is to collect information to give to a doctor. Don't make assumptions about what values to plug into functions. Ask for clarification if a user response is ambiguous. Start by introducing yourself. Then, ask the user to confirm their identity by telling you their birthday, including the year. When they answer with their birthday, call the verify_birthday function."})
context.set_tools([
context.add_message(
{
"type": "function",
"function": {
"name": "verify_birthday",
"description": "Use this function to verify the user has provided their correct birthday.",
"parameters": {
"type": "object",
"properties": {
"birthday": {
"type": "string",
"description": "The user's birthdate, including the year. The user can provide it in any format, but convert it to YYYY-MM-DD format to call this function.",
}},
"role": "system",
"content": "You are Jessica, an agent for a company called Tri-County Health Services. Your job is to collect important information from the user before their doctor visit. You're talking to Chad Bailey. You should address the user by their first name and be polite and professional. You're not a medical professional, so you shouldn't provide any advice. Keep your responses short. Your job is to collect information to give to a doctor. Don't make assumptions about what values to plug into functions. Ask for clarification if a user response is ambiguous. Start by introducing yourself. Then, ask the user to confirm their identity by telling you their birthday, including the year. When they answer with their birthday, call the verify_birthday function.",
}
)
context.set_tools(
[
{
"type": "function",
"function": {
"name": "verify_birthday",
"description": "Use this function to verify the user has provided their correct birthday.",
"parameters": {
"type": "object",
"properties": {
"birthday": {
"type": "string",
"description": "The user's birthdate, including the year. The user can provide it in any format, but convert it to YYYY-MM-DD format to call this function.",
}
},
},
},
},
}])
}
]
)
async def verify_birthday(
self,
function_name,
tool_call_id,
args,
llm,
context,
result_callback):
self, function_name, tool_call_id, args, llm, context, result_callback
):
if args["birthday"] == "1983-01-01":
context.set_tools(
[
@@ -109,18 +114,35 @@ class IntakeProcessor:
},
},
},
}},
}
},
},
},
}])
}
]
)
# It's a bit weird to push this to the LLM, but it gets it into the pipeline
# await llm.push_frame(sounds["ding2.wav"], FrameDirection.DOWNSTREAM)
# We don't need the function call in the context, so just return a new
# system message and let the framework re-prompt
await result_callback([{"role": "system", "content": "Next, thank the user for confirming their identity, then ask the user to list their current prescriptions. Each prescription needs to have a medication name and a dosage. Do not call the list_prescriptions function with any unknown dosages."}])
await result_callback(
[
{
"role": "system",
"content": "Next, thank the user for confirming their identity, then ask the user to list their current prescriptions. Each prescription needs to have a medication name and a dosage. Do not call the list_prescriptions function with any unknown dosages.",
}
]
)
else:
# The user provided an incorrect birthday; ask them to try again
await result_callback([{"role": "system", "content": "The user provided an incorrect birthday. Ask them for their birthday again. When they answer, call the verify_birthday function."}])
await result_callback(
[
{
"role": "system",
"content": "The user provided an incorrect birthday. Ask them for their birthday again. When they answer, call the verify_birthday function.",
}
]
)
async def start_prescriptions(self, function_name, llm, context):
print(f"!!! doing start prescriptions")
@@ -143,16 +165,22 @@ class IntakeProcessor:
"name": {
"type": "string",
"description": "What the user is allergic to",
}},
}
},
},
}},
}
},
},
},
}])
}
]
)
context.add_message(
{
"role": "system",
"content": "Next, ask the user if they have any allergies. Once they have listed their allergies or confirmed they don't have any, call the list_allergies function."})
"content": "Next, ask the user if they have any allergies. Once they have listed their allergies or confirmed they don't have any, call the list_allergies function.",
}
)
print(f"!!! about to await llm process frame in start prescrpitions")
await llm.process_frame(OpenAILLMContextFrame(context), FrameDirection.DOWNSTREAM)
print(f"!!! past await process frame in start prescriptions")
@@ -178,17 +206,22 @@ class IntakeProcessor:
"name": {
"type": "string",
"description": "The user's medical condition",
}},
}
},
},
}},
}
},
},
},
},
])
]
)
context.add_message(
{
"role": "system",
"content": "Now ask the user if they have any medical conditions the doctor should know about. Once they've answered the question, call the list_conditions function."})
"content": "Now ask the user if they have any medical conditions the doctor should know about. Once they've answered the question, call the list_conditions function.",
}
)
await llm.process_frame(OpenAILLMContextFrame(context), FrameDirection.DOWNSTREAM)
async def start_conditions(self, function_name, llm, context):
@@ -212,24 +245,31 @@ class IntakeProcessor:
"name": {
"type": "string",
"description": "The user's reason for visiting the doctor",
}},
}
},
},
}},
}
},
},
},
}])
}
]
)
context.add_message(
{
"role": "system",
"content": "Finally, ask the user the reason for their doctor visit today. Once they answer, call the list_visit_reasons function."})
"content": "Finally, ask the user the reason for their doctor visit today. Once they answer, call the list_visit_reasons function.",
}
)
await llm.process_frame(OpenAILLMContextFrame(context), FrameDirection.DOWNSTREAM)
async def start_visit_reasons(self, function_name, llm, context):
print("!!! doing start visit reasons")
# move to finish call
context.set_tools([])
context.add_message({"role": "system",
"content": "Now, thank the user and end the conversation."})
context.add_message(
{"role": "system", "content": "Now, thank the user and end the conversation."}
)
await llm.process_frame(OpenAILLMContextFrame(context), FrameDirection.DOWNSTREAM)
async def save_data(self, function_name, tool_call_id, args, llm, context, result_callback):
@@ -260,7 +300,7 @@ async def main():
# tier="nova",
# model="2-general"
# )
)
),
)
tts = CartesiaTTSService(
@@ -273,9 +313,7 @@ async def main():
# voice_id="846d6cb0-2301-48b6-9683-48f5618ea2f6", # Spanish-speaking Lady
# )
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = []
context = OpenAILLMContext(messages=messages)
@@ -284,33 +322,31 @@ async def main():
intake = IntakeProcessor(context)
llm.register_function("verify_birthday", intake.verify_birthday)
llm.register_function(
"list_prescriptions",
intake.save_data,
start_callback=intake.start_prescriptions)
"list_prescriptions", intake.save_data, start_callback=intake.start_prescriptions
)
llm.register_function(
"list_allergies",
intake.save_data,
start_callback=intake.start_allergies)
"list_allergies", intake.save_data, start_callback=intake.start_allergies
)
llm.register_function(
"list_conditions",
intake.save_data,
start_callback=intake.start_conditions)
"list_conditions", intake.save_data, start_callback=intake.start_conditions
)
llm.register_function(
"list_visit_reasons",
intake.save_data,
start_callback=intake.start_visit_reasons)
"list_visit_reasons", intake.save_data, start_callback=intake.start_visit_reasons
)
fl = FrameLogger("LLM Output")
pipeline = Pipeline([
transport.input(), # Transport input
context_aggregator.user(), # User responses
llm, # LLM
fl, # Frame logger
tts, # TTS
transport.output(), # Transport output
context_aggregator.assistant(), # Assistant responses
])
pipeline = Pipeline(
[
transport.input(), # Transport input
context_aggregator.user(), # User responses
llm, # LLM
fl, # Frame logger
tts, # TTS
transport.output(), # Transport output
context_aggregator.assistant(), # Assistant responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=False))

View File

@@ -1,4 +1,4 @@
python-dotenv
fastapi[all]
uvicorn
pipecat-ai[daily,openai,silero]
pipecat-ai[daily,cartesia,openai,silero]

View File

@@ -14,11 +14,8 @@ from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper
async def configure(aiohttp_session: aiohttp.ClientSession):
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument(
"-u",
"--url",
type=str,
required=False,
help="URL of the Daily room to join")
"-u", "--url", type=str, required=False, help="URL of the Daily room to join"
)
parser.add_argument(
"-k",
"--apikey",
@@ -34,15 +31,19 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
if not url:
raise Exception(
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL.")
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL."
)
if not key:
raise Exception("No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers.")
raise Exception(
"No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
)
daily_rest_helper = DailyRESTHelper(
daily_api_key=key,
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session)
aiohttp_session=aiohttp_session,
)
# Create a meeting token for the given room with an expiration 1 hour in
# the future.

View File

@@ -38,13 +38,14 @@ async def lifespan(app: FastAPI):
aiohttp_session = aiohttp.ClientSession()
daily_helpers["rest"] = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'),
aiohttp_session=aiohttp_session
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
yield
await aiohttp_session.close()
cleanup()
app = FastAPI(lifespan=lifespan)
app.add_middleware(
@@ -65,37 +66,34 @@ async def start_agent(request: Request):
if not room.url:
raise HTTPException(
status_code=500,
detail="Missing 'room' property in request data. Cannot start agent without a target room!")
detail="Missing 'room' property in request data. Cannot start agent without a target room!",
)
# Check if there is already an existing process running in this room
num_bots_in_room = sum(
1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None)
1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None
)
if num_bots_in_room >= MAX_BOTS_PER_ROOM:
raise HTTPException(
status_code=500, detail=f"Max bot limited reach for room: {room.url}")
raise HTTPException(status_code=500, detail=f"Max bot limited reach for room: {room.url}")
# Get the token for the room
token = await daily_helpers["rest"].get_token(room.url)
if not token:
raise HTTPException(
status_code=500, detail=f"Failed to get token for room: {room.url}")
raise HTTPException(status_code=500, detail=f"Failed to get token for room: {room.url}")
# Spawn a new agent, and join the user session
# Note: this is mostly for demonstration purposes (refer to 'deployment' in README)
try:
proc = subprocess.Popen(
[
f"python3 -m bot -u {room.url} -t {token}"
],
[f"python3 -m bot -u {room.url} -t {token}"],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__))
cwd=os.path.dirname(os.path.abspath(__file__)),
)
bot_procs[proc.pid] = (proc, room.url)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to start subprocess: {e}")
raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}")
return RedirectResponse(room.url)
@@ -107,8 +105,7 @@ def get_status(pid: int):
# If the subprocess doesn't exist, return an error
if not proc:
raise HTTPException(
status_code=404, detail=f"Bot with process id: {pid} not found")
raise HTTPException(status_code=404, detail=f"Bot with process id: {pid} not found")
# Check the status of the subprocess
if proc[0].poll() is None:
@@ -125,14 +122,10 @@ if __name__ == "__main__":
default_host = os.getenv("HOST", "0.0.0.0")
default_port = int(os.getenv("FAST_API_PORT", "7860"))
parser = argparse.ArgumentParser(
description="Daily Storyteller FastAPI server")
parser.add_argument("--host", type=str,
default=default_host, help="Host address")
parser.add_argument("--port", type=int,
default=default_port, help="Port number")
parser.add_argument("--reload", action="store_true",
help="Reload code on change")
parser = argparse.ArgumentParser(description="Daily Storyteller FastAPI server")
parser.add_argument("--host", type=str, default=default_host, help="Host address")
parser.add_argument("--port", type=int, default=default_port, help="Port number")
parser.add_argument("--reload", action="store_true", help="Reload code on change")
config = parser.parse_args()
print(f"to join a test room, visit http://localhost:{config.port}/start")

View File

@@ -14,14 +14,17 @@ from PIL import Image
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import LLMAssistantResponseAggregator, LLMUserResponseAggregator
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.frames.frames import (
AudioRawFrame,
ImageRawFrame,
OutputImageRawFrame,
SpriteFrame,
Frame,
LLMMessagesFrame,
TTSStoppedFrame
TTSAudioRawFrame,
TTSStoppedFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.elevenlabs import ElevenLabsTTSService
@@ -34,6 +37,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -49,7 +53,7 @@ for i in range(1, 26):
# Get the filename without the extension to use as the dictionary key
# Open the image and convert it to bytes
with Image.open(full_path) as img:
sprites.append(ImageRawFrame(image=img.tobytes(), size=img.size, format=img.format))
sprites.append(OutputImageRawFrame(image=img.tobytes(), size=img.size, format=img.format))
flipped = sprites[::-1]
sprites.extend(flipped)
@@ -72,7 +76,7 @@ class TalkingAnimation(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, AudioRawFrame):
if isinstance(frame, TTSAudioRawFrame):
if not self._is_talking:
await self.push_frame(talking_frame)
self._is_talking = True
@@ -107,7 +111,7 @@ async def main():
# tier="nova",
# model="2-general"
# )
)
),
)
tts = ElevenLabsTTSService(
@@ -116,7 +120,6 @@ async def main():
# English
#
voice_id="pNInz6obpgDQGcFmaJgB",
#
# Spanish
#
@@ -124,9 +127,7 @@ async def main():
# voice_id="gD1IexrzCvsXPHUuT0s3",
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
@@ -135,7 +136,6 @@ async def main():
# English
#
"content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by introducing yourself.",
#
# Spanish
#
@@ -148,15 +148,17 @@ async def main():
ta = TalkingAnimation()
pipeline = Pipeline([
transport.input(),
user_response,
llm,
tts,
ta,
transport.output(),
assistant_response,
])
pipeline = Pipeline(
[
transport.input(),
user_response,
llm,
tts,
ta,
transport.output(),
assistant_response,
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
await task.queue_frame(quiet_frame)

View File

@@ -1,4 +1,4 @@
python-dotenv
fastapi[all]
uvicorn
pipecat-ai[daily,openai,silero]
pipecat-ai[daily,elevenlabs,openai,silero]

View File

@@ -14,11 +14,8 @@ from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper
async def configure(aiohttp_session: aiohttp.ClientSession):
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument(
"-u",
"--url",
type=str,
required=False,
help="URL of the Daily room to join")
"-u", "--url", type=str, required=False, help="URL of the Daily room to join"
)
parser.add_argument(
"-k",
"--apikey",
@@ -34,15 +31,18 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
if not url:
raise Exception(
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL.")
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL."
)
if not key:
raise Exception("No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers.")
raise Exception(
"No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
)
daily_rest_helper = DailyRESTHelper(
daily_api_key=key,
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session
aiohttp_session=aiohttp_session,
)
# Create a meeting token for the given room with an expiration 1 hour in

View File

@@ -38,13 +38,14 @@ async def lifespan(app: FastAPI):
aiohttp_session = aiohttp.ClientSession()
daily_helpers["rest"] = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'),
aiohttp_session=aiohttp_session
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
yield
await aiohttp_session.close()
cleanup()
app = FastAPI(lifespan=lifespan)
app.add_middleware(
@@ -65,37 +66,34 @@ async def start_agent(request: Request):
if not room.url:
raise HTTPException(
status_code=500,
detail="Missing 'room' property in request data. Cannot start agent without a target room!")
detail="Missing 'room' property in request data. Cannot start agent without a target room!",
)
# Check if there is already an existing process running in this room
num_bots_in_room = sum(
1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None)
1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None
)
if num_bots_in_room >= MAX_BOTS_PER_ROOM:
raise HTTPException(
status_code=500, detail=f"Max bot limited reach for room: {room.url}")
raise HTTPException(status_code=500, detail=f"Max bot limited reach for room: {room.url}")
# Get the token for the room
token = await daily_helpers["rest"].get_token(room.url)
if not token:
raise HTTPException(
status_code=500, detail=f"Failed to get token for room: {room.url}")
raise HTTPException(status_code=500, detail=f"Failed to get token for room: {room.url}")
# Spawn a new agent, and join the user session
# Note: this is mostly for demonstration purposes (refer to 'deployment' in README)
try:
proc = subprocess.Popen(
[
f"python3 -m bot -u {room.url} -t {token}"
],
[f"python3 -m bot -u {room.url} -t {token}"],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__))
cwd=os.path.dirname(os.path.abspath(__file__)),
)
bot_procs[proc.pid] = (proc, room.url)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to start subprocess: {e}")
raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}")
return RedirectResponse(room.url)
@@ -107,8 +105,7 @@ def get_status(pid: int):
# If the subprocess doesn't exist, return an error
if not proc:
raise HTTPException(
status_code=404, detail=f"Bot with process id: {pid} not found")
raise HTTPException(status_code=404, detail=f"Bot with process id: {pid} not found")
# Check the status of the subprocess
if proc[0].poll() is None:
@@ -125,14 +122,10 @@ if __name__ == "__main__":
default_host = os.getenv("HOST", "0.0.0.0")
default_port = int(os.getenv("FAST_API_PORT", "7860"))
parser = argparse.ArgumentParser(
description="Daily Storyteller FastAPI server")
parser.add_argument("--host", type=str,
default=default_host, help="Host address")
parser.add_argument("--port", type=int,
default=default_port, help="Port number")
parser.add_argument("--reload", action="store_true",
help="Reload code on change")
parser = argparse.ArgumentParser(description="Daily Storyteller FastAPI server")
parser.add_argument("--host", type=str, default=default_host, help="Host address")
parser.add_argument("--port", type=int, default=default_port, help="Port number")
parser.add_argument("--reload", action="store_true", help="Reload code on change")
config = parser.parse_args()

View File

@@ -2,4 +2,4 @@ async_timeout
fastapi
uvicorn
python-dotenv
pipecat-ai[daily,openai,fal]
pipecat-ai[daily,elevenlabs,openai,fal]

View File

@@ -9,11 +9,18 @@ from pipecat.frames.frames import LLMMessagesFrame, StopTaskFrame, EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.llm_response import LLMAssistantResponseAggregator, LLMUserResponseAggregator
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.fal import FalImageGenService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport, DailyTransportMessageFrame
from pipecat.transports.services.daily import (
DailyParams,
DailyTransport,
DailyTransportMessageFrame,
)
from processors import StoryProcessor, StoryImageProcessor
from prompts import LLM_BASE_PROMPT, LLM_INTRO_PROMPT, CUE_USER_TURN
@@ -22,6 +29,7 @@ from utils.helpers import load_sounds, load_images
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -33,7 +41,6 @@ images = load_images(["book1.png", "book2.png"])
async def main(room_url, token=None):
async with aiohttp.ClientSession() as session:
# -------------- Transport --------------- #
transport = DailyTransport(
@@ -47,17 +54,14 @@ async def main(room_url, token=None):
camera_out_height=768,
transcription_enabled=True,
vad_enabled=True,
)
),
)
logger.debug("Transport created for room:" + room_url)
# -------------- Services --------------- #
llm_service = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o"
)
llm_service = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
tts_service = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY"),
@@ -65,10 +69,7 @@ async def main(room_url, token=None):
)
fal_service_params = FalImageGenService.InputParams(
image_size={
"width": 768,
"height": 768
}
image_size={"width": 768, "height": 768}
)
fal_service = FalImageGenService(
@@ -110,12 +111,12 @@ async def main(room_url, token=None):
transport.capture_participant_transcription(participant["id"])
await intro_task.queue_frames(
[
images['book1'],
images["book1"],
LLMMessagesFrame([LLM_INTRO_PROMPT]),
DailyTransportMessageFrame(CUE_USER_TURN),
sounds["listening"],
images['book2'],
StopTaskFrame()
images["book2"],
StopTaskFrame(),
]
)
@@ -125,16 +126,18 @@ async def main(room_url, token=None):
# The main story pipeline is used to continue the story based on user
# input.
main_pipeline = Pipeline([
transport.input(),
user_responses,
llm_service,
story_processor,
image_processor,
tts_service,
transport.output(),
llm_responses
])
main_pipeline = Pipeline(
[
transport.input(),
user_responses,
llm_service,
story_processor,
image_processor,
tts_service,
transport.output(),
llm_responses,
]
)
main_task = PipelineTask(main_pipeline)
@@ -150,6 +153,7 @@ async def main(room_url, token=None):
await runner.run(main_task)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Daily Storyteller Bot")
parser.add_argument("-u", type=str, help="Room URL")

View File

@@ -20,10 +20,15 @@ from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse, JSONResponse
from pipecat.transports.services.helpers.daily_rest import (
DailyRESTHelper, DailyRoomObject, DailyRoomProperties, DailyRoomParams)
DailyRESTHelper,
DailyRoomObject,
DailyRoomProperties,
DailyRoomParams,
)
from dotenv import load_dotenv
load_dotenv(override=True)
# ------------ Fast API Config ------------ #
@@ -38,12 +43,13 @@ async def lifespan(app: FastAPI):
aiohttp_session = aiohttp.ClientSession()
daily_helpers["rest"] = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'),
aiohttp_session=aiohttp_session
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
yield
await aiohttp_session.close()
app = FastAPI(lifespan=lifespan)
app.add_middleware(
@@ -85,55 +91,50 @@ async def start_bot(request: Request) -> JSONResponse:
room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", "")
if not room_url:
params = DailyRoomParams(
properties=DailyRoomProperties()
)
params = DailyRoomParams(properties=DailyRoomProperties())
try:
room: DailyRoomObject = await daily_helpers["rest"].create_room(params=params)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Unable to provision room {e}")
raise HTTPException(status_code=500, detail=f"Unable to provision room {e}")
else:
# Check passed room URL exists, we should assume that it already has a sip set up
try:
room: DailyRoomObject = await daily_helpers["rest"].get_room_from_url(room_url)
except Exception:
raise HTTPException(
status_code=500, detail=f"Room not found: {room_url}")
raise HTTPException(status_code=500, detail=f"Room not found: {room_url}")
# Give the agent a token to join the session
token = await daily_helpers["rest"].get_token(room.url, MAX_SESSION_TIME)
if not room or not token:
raise HTTPException(
status_code=500, detail=f"Failed to get token for room: {room_url}")
raise HTTPException(status_code=500, detail=f"Failed to get token for room: {room_url}")
# Launch a new VM, or run as a shell process (not recommended)
if os.getenv("RUN_AS_VM", False):
try:
await virtualize_bot(room.url, token)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to spawn VM: {e}")
raise HTTPException(status_code=500, detail=f"Failed to spawn VM: {e}")
else:
try:
subprocess.Popen(
[f"python3 -m bot -u {room.url} -t {token}"],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__)))
cwd=os.path.dirname(os.path.abspath(__file__)),
)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to start subprocess: {e}")
raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}")
# Grab a token for the user to join with
user_token = await daily_helpers["rest"].get_token(room.url, MAX_SESSION_TIME)
return JSONResponse({
"room_url": room.url,
"token": user_token,
})
return JSONResponse(
{
"room_url": room.url,
"token": user_token,
}
)
@app.get("/{path_name:path}", response_class=FileResponse)
@@ -155,6 +156,7 @@ async def catch_all(path_name: Optional[str] = ""):
# ------------ Virtualization ------------ #
async def virtualize_bot(room_url: str, token: str):
"""
This is an example of how to virtualize the bot using Fly.io
@@ -163,20 +165,19 @@ async def virtualize_bot(room_url: str, token: str):
FLY_API_HOST = os.getenv("FLY_API_HOST", "https://api.machines.dev/v1")
FLY_APP_NAME = os.getenv("FLY_APP_NAME", "storytelling-chatbot")
FLY_API_KEY = os.getenv("FLY_API_KEY", "")
FLY_HEADERS = {
'Authorization': f"Bearer {FLY_API_KEY}",
'Content-Type': 'application/json'
}
FLY_HEADERS = {"Authorization": f"Bearer {FLY_API_KEY}", "Content-Type": "application/json"}
async with aiohttp.ClientSession() as session:
# Use the same image as the bot runner
async with session.get(f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS) as r:
async with session.get(
f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS
) as r:
if r.status != 200:
text = await r.text()
raise Exception(f"Unable to get machine info from Fly: {text}")
data = await r.json()
image = data[0]['config']['image']
image = data[0]["config"]["image"]
# Machine configuration
cmd = f"python3 src/bot.py -u {room_url} -t {token}"
@@ -185,31 +186,28 @@ async def virtualize_bot(room_url: str, token: str):
"config": {
"image": image,
"auto_destroy": True,
"init": {
"cmd": cmd
},
"restart": {
"policy": "no"
},
"guest": {
"cpu_kind": "shared",
"cpus": 1,
"memory_mb": 512
}
"init": {"cmd": cmd},
"restart": {"policy": "no"},
"guest": {"cpu_kind": "shared", "cpus": 1, "memory_mb": 512},
},
}
# Spawn a new machine instance
async with session.post(f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS, json=worker_props) as r:
async with session.post(
f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS, json=worker_props
) as r:
if r.status != 200:
text = await r.text()
raise Exception(f"Problem starting a bot worker: {text}")
data = await r.json()
# Wait for the machine to enter the started state
vm_id = data['id']
vm_id = data["id"]
async with session.get(f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines/{vm_id}/wait?state=started", headers=FLY_HEADERS) as r:
async with session.get(
f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines/{vm_id}/wait?state=started",
headers=FLY_HEADERS,
) as r:
if r.status != 200:
text = await r.text()
raise Exception(f"Bot was unable to enter started state: {text}")
@@ -221,8 +219,13 @@ async def virtualize_bot(room_url: str, token: str):
if __name__ == "__main__":
# Check environment variables
required_env_vars = ['OPENAI_API_KEY', 'DAILY_API_KEY',
'FAL_KEY', 'ELEVENLABS_VOICE_ID', 'ELEVENLABS_API_KEY']
required_env_vars = [
"OPENAI_API_KEY",
"DAILY_API_KEY",
"FAL_KEY",
"ELEVENLABS_VOICE_ID",
"ELEVENLABS_API_KEY",
]
for env_var in required_env_vars:
if env_var not in os.environ:
raise Exception(f"Missing environment variable: {env_var}.")
@@ -232,20 +235,11 @@ if __name__ == "__main__":
default_host = os.getenv("HOST", "0.0.0.0")
default_port = int(os.getenv("FAST_API_PORT", "7860"))
parser = argparse.ArgumentParser(
description="Daily Storyteller FastAPI server")
parser.add_argument("--host", type=str,
default=default_host, help="Host address")
parser.add_argument("--port", type=int,
default=default_port, help="Port number")
parser.add_argument("--reload", action="store_true",
help="Reload code on change")
parser = argparse.ArgumentParser(description="Daily Storyteller FastAPI server")
parser.add_argument("--host", type=str, default=default_host, help="Host address")
parser.add_argument("--port", type=int, default=default_port, help="Port number")
parser.add_argument("--reload", action="store_true", help="Reload code on change")
config = parser.parse_args()
uvicorn.run(
"bot_runner:app",
host=config.host,
port=config.port,
reload=config.reload
)
uvicorn.run("bot_runner:app", host=config.host, port=config.port, reload=config.reload)

View File

@@ -6,7 +6,8 @@ from pipecat.frames.frames import (
Frame,
LLMFullResponseEndFrame,
TextFrame,
UserStoppedSpeakingFrame)
UserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transports.services.daily import DailyTransportMessageFrame
@@ -35,6 +36,7 @@ class StoryPromptFrame(TextFrame):
# ------------ Frame Processors ----------- #
class StoryImageProcessor(FrameProcessor):
"""
Processor for image prompt frames that will be sent to the FAL service.
@@ -113,7 +115,7 @@ class StoryProcessor(FrameProcessor):
# Extract the image prompt from the text using regex
image_prompt = re.search(r"<(.*?)>", self._text).group(1)
# Remove the image prompt from the text
self._text = re.sub(r"<.*?>", '', self._text, count=1)
self._text = re.sub(r"<.*?>", "", self._text, count=1)
# Process the image prompt frame
await self.push_frame(StoryImageFrame(image_prompt))
@@ -124,8 +126,7 @@ class StoryProcessor(FrameProcessor):
if re.search(r".*\[[bB]reak\].*", self._text):
# Remove the [break] token from the text
# so it isn't spoken out loud by the TTS
self._text = re.sub(r'\[[bB]reak\]', '',
self._text, flags=re.IGNORECASE)
self._text = re.sub(r"\[[bB]reak\]", "", self._text, flags=re.IGNORECASE)
self._text = self._text.replace("\n", " ")
if len(self._text) > 2:
# Append the sentence to the story

View File

@@ -3,7 +3,7 @@ LLM_INTRO_PROMPT = {
"content": "You are a creative storyteller who loves to tell whimsical, fantastical stories. \
Your goal is to craft an engaging and fun story. \
Start by asking the user what kind of story they'd like to hear. Don't provide any examples. \
Keep your response to only a few sentences."
Keep your response to only a few sentences.",
}
@@ -25,7 +25,7 @@ LLM_BASE_PROMPT = {
Responses should use the format: <...> story sentence [break] <...> story sentence [break] ... \
After each response, ask me how I'd like the story to continue and wait for my input. \
Please ensure your responses are less than 3-4 sentences long. \
Please refrain from using any explicit language or content. Do not tell scary stories."
Please refrain from using any explicit language or content. Do not tell scary stories.",
}

View File

@@ -2,7 +2,7 @@ import os
import wave
from PIL import Image
from pipecat.frames.frames import AudioRawFrame, ImageRawFrame
from pipecat.frames.frames import OutputAudioRawFrame, OutputImageRawFrame
script_dir = os.path.dirname(__file__)
@@ -16,7 +16,9 @@ def load_images(image_files):
filename = os.path.splitext(os.path.basename(full_path))[0]
# Open the image and convert it to bytes
with Image.open(full_path) as img:
images[filename] = ImageRawFrame(image=img.tobytes(), size=img.size, format=img.format)
images[filename] = OutputImageRawFrame(
image=img.tobytes(), size=img.size, format=img.format
)
return images
@@ -30,8 +32,10 @@ def load_sounds(sound_files):
filename = os.path.splitext(os.path.basename(full_path))[0]
# Open the sound and convert it to bytes
with wave.open(full_path) as audio_file:
sounds[filename] = AudioRawFrame(audio=audio_file.readframes(-1),
sample_rate=audio_file.getframerate(),
num_channels=audio_file.getnchannels())
sounds[filename] = OutputAudioRawFrame(
audio=audio_file.readframes(-1),
sample_rate=audio_file.getframerate(),
num_channels=audio_file.getnchannels(),
)
return sounds

View File

@@ -17,16 +17,13 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
async def configure_with_args(
aiohttp_session: aiohttp.ClientSession,
parser: argparse.ArgumentParser | None = None):
aiohttp_session: aiohttp.ClientSession, parser: argparse.ArgumentParser | None = None
):
if not parser:
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument(
"-u",
"--url",
type=str,
required=False,
help="URL of the Daily room to join")
"-u", "--url", type=str, required=False, help="URL of the Daily room to join"
)
parser.add_argument(
"-k",
"--apikey",
@@ -42,15 +39,19 @@ async def configure_with_args(
if not url:
raise Exception(
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL.")
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL."
)
if not key:
raise Exception("No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers.")
raise Exception(
"No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
)
daily_rest_helper = DailyRESTHelper(
daily_api_key=key,
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session)
aiohttp_session=aiohttp_session,
)
# Create a meeting token for the given room with an expiration 1 hour in
# the future.

View File

@@ -13,7 +13,9 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -24,6 +26,7 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
# Run this script directly from your command line.
@@ -45,15 +48,17 @@ def truncate_content(content, model_name):
return encoding.decode(truncated_tokens)
return content
# Main function to extract content from url
async def get_article_content(url: str, aiohttp_session: aiohttp.ClientSession):
if 'arxiv.org' in url:
if "arxiv.org" in url:
return await get_arxiv_content(url, aiohttp_session)
else:
return await get_wikipedia_content(url, aiohttp_session)
# Helper function to extract content from Wikipedia url (this is
# technically agnostic to URL type but will work best with Wikipedia
# articles)
@@ -65,23 +70,24 @@ async def get_wikipedia_content(url: str, aiohttp_session: aiohttp.ClientSession
return "Failed to download Wikipedia article."
text = await response.text()
soup = BeautifulSoup(text, 'html.parser')
soup = BeautifulSoup(text, "html.parser")
content = soup.find('div', {'class': 'mw-parser-output'})
content = soup.find("div", {"class": "mw-parser-output"})
if content:
return content.get_text()
else:
return "Failed to extract Wikipedia article content."
# Helper function to extract content from arXiv url
async def get_arxiv_content(url: str, aiohttp_session: aiohttp.ClientSession):
if '/abs/' in url:
url = url.replace('/abs/', '/pdf/')
if not url.endswith('.pdf'):
url += '.pdf'
if "/abs/" in url:
url = url.replace("/abs/", "/pdf/")
if not url.endswith(".pdf"):
url += ".pdf"
async with aiohttp_session.get(url) as response:
if response.status != 200:
@@ -95,6 +101,7 @@ async def get_arxiv_content(url: str, aiohttp_session: aiohttp.ClientSession):
text += page.extract_text()
return text
# This is the main function that handles STT -> LLM -> TTS
@@ -116,40 +123,46 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id=os.getenv("CARTESIA_VOICE_ID", "4d2fd738-3b3d-4368-957a-bb4805275bd9"),
# British Narration Lady: 4d2fd738-3b3d-4368-957a-bb4805275bd9
sample_rate=44100,
params=CartesiaTTSService.InputParams(
sample_rate=44100,
),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o-mini")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o-mini")
messages = [{
"role": "system", "content": f"""You are an AI study partner. You have been given the following article content:
messages = [
{
"role": "system",
"content": f"""You are an AI study partner. You have been given the following article content:
{article_content}
Your task is to help the user understand and learn from this article in 2 sentences. THESE RESPONSES SHOULD BE ONLY MAX 2 SENTENCES. THIS INSTRUCTION IS VERY IMPORTANT. RESPONSES SHOULDN'T BE LONG.
""", }, ]
""",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(),
tma_in,
llm,
tts,
transport.output(),
tma_out,
])
pipeline = Pipeline(
[
transport.input(),
tma_in,
llm,
tts,
transport.output(),
tma_out,
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
@@ -159,12 +172,15 @@ Your task is to help the user understand and learn from this article in 2 senten
messages.append(
{
"role": "system",
"content": "Hello! I'm ready to discuss the article with you. What would you like to learn about?"})
"content": "Hello! I'm ready to discuss the article with you. What would you like to learn about?",
}
)
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -22,13 +22,15 @@ from pipecat.transports.services.daily import (
DailyParams,
DailyTranscriptionSettings,
DailyTransport,
DailyTransportMessageFrame)
DailyTransportMessageFrame,
)
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -44,7 +46,6 @@ It also isn't saving what the user or bot says into the context object for use i
# We need to use a custom service here to yield LLM frames without saving
# any context
class TranslationProcessor(FrameProcessor):
def __init__(self, language):
super().__init__()
self._language = language
@@ -80,10 +81,7 @@ class TranslationSubtitles(FrameProcessor):
await super().process_frame(frame, direction)
if isinstance(frame, TextFrame):
message = {
"language": self._language,
"text": frame.text
}
message = {"language": self._language, "text": frame.text}
await self.push_frame(DailyTransportMessageFrame(message))
await self.push_frame(frame)
@@ -100,10 +98,8 @@ async def main():
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
transcription_settings=DailyTranscriptionSettings(extra={
"interim_results": False
})
)
transcription_settings=DailyTranscriptionSettings(extra={"interim_results": False}),
),
)
tts = AzureTTSService(
@@ -112,26 +108,14 @@ async def main():
voice="es-ES-AlvaroNeural",
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o"
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
sa = SentenceAggregator()
tp = TranslationProcessor("Spanish")
lfra = LLMFullResponseAggregator()
ts = TranslationSubtitles("spanish")
pipeline = Pipeline([
transport.input(),
sa,
tp,
llm,
lfra,
ts,
tts,
transport.output()
])
pipeline = Pipeline([transport.input(), sa, tp, llm, lfra, ts, tts, transport.output()])
task = PipelineTask(pipeline)

View File

@@ -15,11 +15,8 @@ from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper
async def configure(aiohttp_session: aiohttp.ClientSession):
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument(
"-u",
"--url",
type=str,
required=False,
help="URL of the Daily room to join")
"-u", "--url", type=str, required=False, help="URL of the Daily room to join"
)
parser.add_argument(
"-k",
"--apikey",
@@ -35,15 +32,18 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
if not url:
raise Exception(
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL.")
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL."
)
if not key:
raise Exception("No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers.")
raise Exception(
"No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
)
daily_rest_helper = DailyRESTHelper(
daily_api_key=key,
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session
aiohttp_session=aiohttp_session,
)
# Create a meeting token for the given room with an expiration 1 hour in

View File

@@ -38,13 +38,14 @@ async def lifespan(app: FastAPI):
aiohttp_session = aiohttp.ClientSession()
daily_helpers["rest"] = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'),
aiohttp_session=aiohttp_session
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
yield
await aiohttp_session.close()
cleanup()
app = FastAPI(lifespan=lifespan)
app.add_middleware(
@@ -65,37 +66,34 @@ async def start_agent(request: Request):
if not room.url:
raise HTTPException(
status_code=500,
detail="Missing 'room' property in request data. Cannot start agent without a target room!")
detail="Missing 'room' property in request data. Cannot start agent without a target room!",
)
# Check if there is already an existing process running in this room
num_bots_in_room = sum(
1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None)
1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None
)
if num_bots_in_room >= MAX_BOTS_PER_ROOM:
raise HTTPException(
status_code=500, detail=f"Max bot limited reach for room: {room.url}")
raise HTTPException(status_code=500, detail=f"Max bot limited reach for room: {room.url}")
# Get the token for the room
token = await daily_helpers["rest"].get_token(room.url)
if not token:
raise HTTPException(
status_code=500, detail=f"Failed to get token for room: {room.url}")
raise HTTPException(status_code=500, detail=f"Failed to get token for room: {room.url}")
# Spawn a new agent, and join the user session
# Note: this is mostly for demonstration purposes (refer to 'deployment' in README)
try:
proc = subprocess.Popen(
[
f"python3 -m bot -u {room.url} -t {token}"
],
[f"python3 -m bot -u {room.url} -t {token}"],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__))
cwd=os.path.dirname(os.path.abspath(__file__)),
)
bot_procs[proc.pid] = (proc, room.url)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to start subprocess: {e}")
raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}")
return RedirectResponse(room.url)
@@ -107,8 +105,7 @@ def get_status(pid: int):
# If the subprocess doesn't exist, return an error
if not proc:
raise HTTPException(
status_code=404, detail=f"Bot with process id: {pid} not found")
raise HTTPException(status_code=404, detail=f"Bot with process id: {pid} not found")
# Check the status of the subprocess
if proc[0].poll() is None:
@@ -125,14 +122,10 @@ if __name__ == "__main__":
default_host = os.getenv("HOST", "0.0.0.0")
default_port = int(os.getenv("FAST_API_PORT", "7860"))
parser = argparse.ArgumentParser(
description="Daily Storyteller FastAPI server")
parser.add_argument("--host", type=str,
default=default_host, help="Host address")
parser.add_argument("--port", type=int,
default=default_port, help="Port number")
parser.add_argument("--reload", action="store_true",
help="Reload code on change")
parser = argparse.ArgumentParser(description="Daily Storyteller FastAPI server")
parser.add_argument("--host", type=str, default=default_host, help="Host address")
parser.add_argument("--port", type=int, default=default_port, help="Port number")
parser.add_argument("--reload", action="store_true", help="Reload code on change")
config = parser.parse_args()

View File

@@ -55,7 +55,7 @@ This project is a FastAPI-based chatbot that integrates with Twilio to handle We
2. **Update the Twilio Webhook**:
Copy the ngrok URL and update your Twilio phone number webhook URL to `http://<ngrok_url>/start_call`.
3. **Update the streams.xml**:
3. **Update streams.xml**:
Copy the ngrok URL and update templates/streams.xml with `wss://<ngrok_url>/ws`.
## Running the Application

View File

@@ -1,4 +1,3 @@
import aiohttp
import os
import sys
@@ -8,18 +7,22 @@ from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator,
LLMUserResponseAggregator
LLMUserResponseAggregator,
)
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketTransport, FastAPIWebsocketParams
from pipecat.transports.network.fastapi_websocket import (
FastAPIWebsocketTransport,
FastAPIWebsocketParams,
)
from pipecat.vad.silero import SileroVADAnalyzer
from pipecat.serializers.twilio import TwilioFrameSerializer
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -27,63 +30,61 @@ logger.add(sys.stderr, level="DEBUG")
async def run_bot(websocket_client, stream_sid):
async with aiohttp.ClientSession() as session:
transport = FastAPIWebsocketTransport(
websocket=websocket_client,
params=FastAPIWebsocketParams(
audio_out_enabled=True,
add_wav_header=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
serializer=TwilioFrameSerializer(stream_sid)
)
)
transport = FastAPIWebsocketTransport(
websocket=websocket_client,
params=FastAPIWebsocketParams(
audio_out_enabled=True,
add_wav_header=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
serializer=TwilioFrameSerializer(stream_sid),
),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
stt = DeepgramSTTService(api_key=os.getenv('DEEPGRAM_API_KEY'))
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
messages = [
{
"role": "system",
"content": "You are a helpful LLM in an audio call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
messages = [
{
"role": "system",
"content": "You are a helpful LLM in an audio call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Websocket input from client
stt, # Speech-To-Text
tma_in, # User responses
llm, # LLM
tts, # Text-To-Speech
pipeline = Pipeline(
[
transport.input(), # Websocket input from client
stt, # Speech-To-Text
tma_in, # User responses
llm, # LLM
tts, # Text-To-Speech
transport.output(), # Websocket output to client
tma_out # LLM responses
])
tma_out, # LLM responses
]
)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
await task.queue_frames([EndFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
await task.queue_frames([EndFrame()])
runner = PipelineRunner(handle_sigint=False)
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)
await runner.run(task)

View File

@@ -1,4 +1,4 @@
pipecat-ai[daily,openai,silero,deepgram]
pipecat-ai[daily,cartesia,openai,silero,deepgram]
fastapi
uvicorn
python-dotenv

View File

@@ -19,7 +19,7 @@ app.add_middleware(
)
@app.post('/start_call')
@app.post("/start_call")
async def start_call():
print("POST TwiML")
return HTMLResponse(content=open("templates/streams.xml").read(), media_type="application/xml")
@@ -32,7 +32,7 @@ async def websocket_endpoint(websocket: WebSocket):
await start_data.__anext__()
call_data = json.loads(await start_data.__anext__())
print(call_data, flush=True)
stream_sid = call_data['start']['streamSid']
stream_sid = call_data["start"]["streamSid"]
print("WebSocket connection accepted")
await run_bot(websocket, stream_sid)

View File

@@ -4,7 +4,6 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import asyncio
import os
import sys
@@ -15,17 +14,21 @@ from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator,
LLMUserResponseAggregator
LLMUserResponseAggregator,
)
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.network.websocket_server import WebsocketServerParams, WebsocketServerTransport
from pipecat.transports.network.websocket_server import (
WebsocketServerParams,
WebsocketServerTransport,
)
from pipecat.vad.silero import SileroVADAnalyzer
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -33,60 +36,59 @@ logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
transport = WebsocketServerTransport(
params=WebsocketServerParams(
audio_out_enabled=True,
add_wav_header=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True
)
transport = WebsocketServerTransport(
params=WebsocketServerParams(
audio_out_enabled=True,
add_wav_header=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
)
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Websocket input from client
stt, # Speech-To-Text
tma_in, # User responses
llm, # LLM
tts, # Text-To-Speech
pipeline = Pipeline(
[
transport.input(), # Websocket input from client
stt, # Speech-To-Text
tma_in, # User responses
llm, # LLM
tts, # Text-To-Speech
transport.output(), # Websocket output to client
tma_out # LLM responses
])
tma_out, # LLM responses
]
)
task = PipelineTask(pipeline)
task = PipelineTask(pipeline)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
runner = PipelineRunner()
await runner.run(task)
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -24,6 +24,7 @@ message AudioRawFrame {
bytes audio = 3;
uint32 sample_rate = 4;
uint32 num_channels = 5;
optional uint64 pts = 6;
}
message TranscriptionFrame {

View File

@@ -1,2 +1,2 @@
python-dotenv
pipecat-ai[openai,silero,websocket,whisper]
pipecat-ai[cartesia,openai,silero,websocket,whisper]

View File

@@ -35,28 +35,29 @@ Website = "https://pipecat.ai"
[project.optional-dependencies]
anthropic = [ "anthropic~=0.34.0" ]
aws = [ "boto3~=1.35.27" ]
azure = [ "azure-cognitiveservices-speech~=1.40.0" ]
cartesia = [ "websockets~=12.0" ]
cartesia = [ "cartesia~=1.0.13", "websockets~=12.0" ]
daily = [ "daily-python~=0.10.1" ]
deepgram = [ "deepgram-sdk~=3.5.0" ]
elevenlabs = [ "websockets~=12.0" ]
examples = [ "python-dotenv~=1.0.1", "flask~=3.0.3", "flask_cors~=4.0.1" ]
fal = [ "fal-client~=0.4.1" ]
gladia = [ "websockets~=12.0" ]
google = [ "google-generativeai~=0.7.2" ]
google = [ "google-generativeai~=0.7.2", "google-cloud-texttospeech~=2.17.2" ]
gstreamer = [ "pygobject~=3.48.2" ]
fireworks = [ "openai~=1.37.2" ]
langchain = [ "langchain~=0.2.14", "langchain-community~=0.2.12", "langchain-openai~=0.1.20" ]
livekit = [ "livekit~=0.13.1" ]
livekit = [ "livekit~=0.13.1", "tenacity~=9.0.0" ]
lmnt = [ "lmnt~=1.1.4" ]
local = [ "pyaudio~=0.2.14" ]
moondream = [ "einops~=0.8.0", "timm~=1.0.8", "transformers~=4.44.0" ]
openai = [ "openai~=1.37.2" ]
openpipe = [ "openpipe~=4.24.0" ]
playht = [ "pyht~=0.0.28" ]
silero = [ "silero-vad~=5.1" ]
silero = [ "onnxruntime>=1.16.1" ]
together = [ "together~=1.2.7" ]
websocket = [ "websockets~=12.0", "fastapi~=0.112.1" ]
websocket = [ "websockets~=12.0", "fastapi~=0.115.0" ]
whisper = [ "faster-whisper~=1.0.3" ]
xtts = [ "resampy~=0.4.3" ]

View File

@@ -8,7 +8,6 @@ from abc import ABC, abstractmethod
class BaseClock(ABC):
@abstractmethod
def get_time(self) -> int:
pass

View File

@@ -10,7 +10,6 @@ from pipecat.clocks.base_clock import BaseClock
class SystemClock(BaseClock):
def __init__(self):
self._time = 0

View File

@@ -24,6 +24,7 @@ message AudioRawFrame {
bytes audio = 3;
uint32 sample_rate = 4;
uint32 num_channels = 5;
optional uint64 pts = 6;
}
message TranscriptionFrame {

View File

@@ -4,11 +4,11 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
from typing import Any, List, Mapping, Optional, Tuple
from dataclasses import dataclass, field
from typing import Any, List, Optional, Tuple, Union
from pipecat.clocks.base_clock import BaseClock
from pipecat.metrics.metrics import MetricsData
from pipecat.transcriptions.language import Language
from pipecat.utils.time import nanoseconds_to_str
from pipecat.utils.utils import obj_count, obj_id
@@ -41,10 +41,8 @@ class DataFrame(Frame):
@dataclass
class AudioRawFrame(DataFrame):
"""A chunk of audio. Will be played by the transport if the transport's
microphone has been enabled.
"""A chunk of audio."""
"""
audio: bytes
sample_rate: int
num_channels: int
@@ -58,12 +56,37 @@ class AudioRawFrame(DataFrame):
return f"{self.name}(pts: {pts}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
@dataclass
class InputAudioRawFrame(AudioRawFrame):
"""A chunk of audio usually coming from an input transport."""
pass
@dataclass
class OutputAudioRawFrame(AudioRawFrame):
"""A chunk of audio. Will be played by the output transport if the
transport's microphone has been enabled.
"""
pass
@dataclass
class TTSAudioRawFrame(OutputAudioRawFrame):
"""A chunk of output audio generated by a TTS service."""
pass
@dataclass
class ImageRawFrame(DataFrame):
"""An image. Will be shown by the transport if the transport's camera is
enabled.
"""
image: bytes
size: Tuple[int, int]
format: str | None
@@ -74,37 +97,22 @@ class ImageRawFrame(DataFrame):
@dataclass
class URLImageRawFrame(ImageRawFrame):
"""An image with an associated URL. Will be shown by the transport if the
transport's camera is enabled.
"""
url: str | None
def __str__(self):
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, url: {self.url}, size: {self.size}, format: {self.format})"
class InputImageRawFrame(ImageRawFrame):
pass
@dataclass
class VisionImageRawFrame(ImageRawFrame):
"""An image with an associated text to ask for a description of it. Will be
shown by the transport if the transport's camera is enabled.
"""
text: str | None
def __str__(self):
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, text: {self.text}, size: {self.size}, format: {self.format})"
class OutputImageRawFrame(ImageRawFrame):
pass
@dataclass
class UserImageRawFrame(ImageRawFrame):
class UserImageRawFrame(InputImageRawFrame):
"""An image associated to a user. Will be shown by the transport if the
transport's camera is enabled.
"""
user_id: str
def __str__(self):
@@ -112,6 +120,34 @@ class UserImageRawFrame(ImageRawFrame):
return f"{self.name}(pts: {pts}, user: {self.user_id}, size: {self.size}, format: {self.format})"
@dataclass
class VisionImageRawFrame(InputImageRawFrame):
"""An image with an associated text to ask for a description of it. Will be
shown by the transport if the transport's camera is enabled.
"""
text: str | None
def __str__(self):
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, text: [{self.text}], size: {self.size}, format: {self.format})"
@dataclass
class URLImageRawFrame(OutputImageRawFrame):
"""An image with an associated URL. Will be shown by the transport if the
transport's camera is enabled.
"""
url: str | None
def __str__(self):
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, url: {self.url}, size: {self.size}, format: {self.format})"
@dataclass
class SpriteFrame(Frame):
"""An animated sprite. Will be shown by the transport if the transport's
@@ -119,6 +155,7 @@ class SpriteFrame(Frame):
`camera_out_framerate` constructor parameter.
"""
images: List[ImageRawFrame]
def __str__(self):
@@ -132,11 +169,12 @@ class TextFrame(DataFrame):
be used to send text through pipelines.
"""
text: str
def __str__(self):
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, text: {self.text})"
return f"{self.name}(pts: {pts}, text: [{self.text}])"
@dataclass
@@ -145,24 +183,26 @@ class TranscriptionFrame(TextFrame):
transport's receive queue when a participant speaks.
"""
user_id: str
timestamp: str
language: Language | None = None
def __str__(self):
return f"{self.name}(user: {self.user_id}, text: {self.text}, language: {self.language}, timestamp: {self.timestamp})"
return f"{self.name}(user: {self.user_id}, text: [{self.text}], language: {self.language}, timestamp: {self.timestamp})"
@dataclass
class InterimTranscriptionFrame(TextFrame):
"""A text frame with interim transcription-specific data. Will be placed in
the transport's receive queue when a participant speaks."""
user_id: str
timestamp: str
language: Language | None = None
def __str__(self):
return f"{self.name}(user: {self.user_id}, text: {self.text}, language: {self.language}, timestamp: {self.timestamp})"
return f"{self.name}(user: {self.user_id}, text: [{self.text}], language: {self.language}, timestamp: {self.timestamp})"
@dataclass
@@ -174,6 +214,7 @@ class LLMMessagesFrame(DataFrame):
processors.
"""
messages: List[dict]
@@ -183,6 +224,7 @@ class LLMMessagesAppendFrame(DataFrame):
current context.
"""
messages: List[dict]
@@ -193,6 +235,7 @@ class LLMMessagesUpdateFrame(DataFrame):
LLMMessagesFrame.
"""
messages: List[dict]
@@ -202,13 +245,14 @@ class LLMSetToolsFrame(DataFrame):
The specific format depends on the LLM being used, but it should typically
contain JSON Schema objects.
"""
tools: List[dict]
@dataclass
class LLMEnablePromptCachingFrame(DataFrame):
"""A frame to enable/disable prompt caching in certain LLMs.
"""
"""A frame to enable/disable prompt caching in certain LLMs."""
enable: bool
@@ -218,6 +262,7 @@ class TTSSpeakFrame(DataFrame):
pipeline (if any).
"""
text: str
@@ -229,6 +274,7 @@ class TransportMessageFrame(DataFrame):
def __str__(self):
return f"{self.name}(message: {self.message})"
#
# App frames. Application user-defined frames.
#
@@ -238,6 +284,7 @@ class TransportMessageFrame(DataFrame):
class AppFrame(Frame):
pass
#
# System frames
#
@@ -248,9 +295,21 @@ class SystemFrame(Frame):
pass
@dataclass
class StartFrame(SystemFrame):
"""This is the first frame that should be pushed down a pipeline."""
clock: BaseClock
allow_interruptions: bool = False
enable_metrics: bool = False
enable_usage_metrics: bool = False
report_only_initial_ttfb: bool = False
@dataclass
class CancelFrame(SystemFrame):
"""Indicates that a pipeline needs to stop right away."""
pass
@@ -261,6 +320,7 @@ class ErrorFrame(SystemFrame):
bot should exit.
"""
error: str
fatal: bool = False
@@ -274,9 +334,31 @@ class FatalErrorFrame(ErrorFrame):
that the bot should exit.
"""
fatal: bool = field(default=True, init=False)
@dataclass
class EndTaskFrame(SystemFrame):
"""This is used to notify the pipeline task that the pipeline should be
closed nicely (flushing all the queued frames) by pushing an EndFrame
downstream.
"""
pass
@dataclass
class CancelTaskFrame(SystemFrame):
"""This is used to notify the pipeline task that the pipeline should be
stopped immediately by pushing a CancelFrame downstream.
"""
pass
@dataclass
class StopTaskFrame(SystemFrame):
"""Indicates that a pipeline task should be stopped but that the pipeline
@@ -284,6 +366,7 @@ class StopTaskFrame(SystemFrame):
the pipeline task.
"""
pass
@@ -295,6 +378,7 @@ class StartInterruptionFrame(SystemFrame):
guaranteed).
"""
pass
@@ -306,6 +390,7 @@ class StopInterruptionFrame(SystemFrame):
guaranteed).
"""
pass
@@ -316,17 +401,16 @@ class BotInterruptionFrame(SystemFrame):
UserStartedSpeakingFrame and UserStoppedSpeakingFrame won't be generated.
"""
pass
@dataclass
class MetricsFrame(SystemFrame):
"""Emitted by processor that can compute metrics like latencies.
"""
ttfb: List[Mapping[str, Any]] | None = None
processing: List[Mapping[str, Any]] | None = None
tokens: List[Mapping[str, Any]] | None = None
characters: List[Mapping[str, Any]] | None = None
"""Emitted by processor that can compute metrics like latencies."""
data: List[MetricsData]
#
# Control frames
@@ -338,16 +422,6 @@ class ControlFrame(Frame):
pass
@dataclass
class StartFrame(ControlFrame):
"""This is the first frame that should be pushed down a pipeline."""
clock: BaseClock
allow_interruptions: bool = False
enable_metrics: bool = False
enable_usage_metrics: bool = False
report_only_initial_ttfb: bool = False
@dataclass
class EndFrame(ControlFrame):
"""Indicates that a pipeline has ended and frame processors and pipelines
@@ -357,6 +431,7 @@ class EndFrame(ControlFrame):
was sent (unline system frames).
"""
pass
@@ -364,12 +439,14 @@ class EndFrame(ControlFrame):
class LLMFullResponseStartFrame(ControlFrame):
"""Used to indicate the beginning of an LLM response. Following by one or
more TextFrame and a final LLMFullResponseEndFrame."""
pass
@dataclass
class LLMFullResponseEndFrame(ControlFrame):
"""Indicates the end of an LLM response."""
pass
@@ -381,28 +458,28 @@ class UserStartedSpeakingFrame(ControlFrame):
with a TranscriptionFrame)
"""
pass
@dataclass
class UserStoppedSpeakingFrame(ControlFrame):
"""Emitted by the VAD to indicate that a user stopped speaking."""
pass
@dataclass
class BotStartedSpeakingFrame(ControlFrame):
"""Emitted upstream by transport outputs to indicate the bot started speaking.
"""Emitted upstream by transport outputs to indicate the bot started speaking."""
"""
pass
@dataclass
class BotStoppedSpeakingFrame(ControlFrame):
"""Emitted upstream by transport outputs to indicate the bot stopped speaking.
"""Emitted upstream by transport outputs to indicate the bot stopped speaking."""
"""
pass
@@ -414,30 +491,34 @@ class BotSpeakingFrame(ControlFrame):
since the user might be listening.
"""
pass
@dataclass
class TTSStartedFrame(ControlFrame):
"""Used to indicate the beginning of a TTS response. Following
AudioRawFrames are part of the TTS response until an TTSEndFrame. These
frames can be used for aggregating audio frames in a transport to optimize
the size of frames sent to the session, without needing to control this in
the TTS service.
TTSAudioRawFrames are part of the TTS response until an
TTSStoppedFrame. These frames can be used for aggregating audio frames in a
transport to optimize the size of frames sent to the session, without
needing to control this in the TTS service.
"""
pass
@dataclass
class TTSStoppedFrame(ControlFrame):
"""Indicates the end of a TTS response."""
pass
@dataclass
class UserImageRequestFrame(ControlFrame):
"""A frame user to request an image from the given user."""
user_id: str
context: Optional[Any] = None
@@ -446,55 +527,51 @@ class UserImageRequestFrame(ControlFrame):
@dataclass
class LLMModelUpdateFrame(ControlFrame):
"""A control frame containing a request to update to a new LLM model.
"""
model: str
class LLMUpdateSettingsFrame(ControlFrame):
"""A control frame containing a request to update LLM settings."""
model: Optional[str] = None
temperature: Optional[float] = None
top_k: Optional[int] = None
top_p: Optional[float] = None
frequency_penalty: Optional[float] = None
presence_penalty: Optional[float] = None
max_tokens: Optional[int] = None
seed: Optional[int] = None
extra: dict = field(default_factory=dict)
@dataclass
class TTSModelUpdateFrame(ControlFrame):
"""A control frame containing a request to update the TTS model.
"""
model: str
class TTSUpdateSettingsFrame(ControlFrame):
"""A control frame containing a request to update TTS settings."""
model: Optional[str] = None
voice: Optional[str] = None
language: Optional[Language] = None
speed: Optional[Union[str, float]] = None
emotion: Optional[List[str]] = None
engine: Optional[str] = None
pitch: Optional[str] = None
rate: Optional[str] = None
volume: Optional[str] = None
emphasis: Optional[str] = None
style: Optional[str] = None
style_degree: Optional[str] = None
role: Optional[str] = None
@dataclass
class TTSVoiceUpdateFrame(ControlFrame):
"""A control frame containing a request to update to a new TTS voice.
"""
voice: str
class STTUpdateSettingsFrame(ControlFrame):
"""A control frame containing a request to update STT settings."""
@dataclass
class TTSLanguageUpdateFrame(ControlFrame):
"""A control frame containing a request to update to a new TTS language and
optional voice.
"""
language: Language
@dataclass
class STTModelUpdateFrame(ControlFrame):
"""A control frame containing a request to update the STT model and optional
language.
"""
model: str
@dataclass
class STTLanguageUpdateFrame(ControlFrame):
"""A control frame containing a request to update to STT language.
"""
language: Language
model: Optional[str] = None
language: Optional[Language] = None
@dataclass
class FunctionCallInProgressFrame(SystemFrame):
"""A frame signaling that a function call is in progress.
"""
"""A frame signaling that a function call is in progress."""
function_name: str
tool_call_id: str
arguments: str
@@ -502,12 +579,13 @@ class FunctionCallInProgressFrame(SystemFrame):
@dataclass
class FunctionCallResultFrame(DataFrame):
"""A frame containing the result of an LLM function (tool) call.
"""
"""A frame containing the result of an LLM function (tool) call."""
function_name: str
tool_call_id: str
arguments: str
result: Any
run_llm: bool = True
@dataclass
@@ -515,4 +593,5 @@ class VADParamsUpdateFrame(ControlFrame):
"""A control frame containing a request to update VAD params. Intended
to be pushed upstream from RTVI processor.
"""
params: VADParams

View File

@@ -14,7 +14,7 @@ _sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0c\x66rames.proto\x12\x07pipecat\"3\n\tTextFrame\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x0c\n\x04text\x18\x03 \x01(\t\"c\n\rAudioRawFrame\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\r\n\x05\x61udio\x18\x03 \x01(\x0c\x12\x13\n\x0bsample_rate\x18\x04 \x01(\r\x12\x14\n\x0cnum_channels\x18\x05 \x01(\r\"`\n\x12TranscriptionFrame\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x0c\n\x04text\x18\x03 \x01(\t\x12\x0f\n\x07user_id\x18\x04 \x01(\t\x12\x11\n\ttimestamp\x18\x05 \x01(\t\"\x93\x01\n\x05\x46rame\x12\"\n\x04text\x18\x01 \x01(\x0b\x32\x12.pipecat.TextFrameH\x00\x12\'\n\x05\x61udio\x18\x02 \x01(\x0b\x32\x16.pipecat.AudioRawFrameH\x00\x12\x34\n\rtranscription\x18\x03 \x01(\x0b\x32\x1b.pipecat.TranscriptionFrameH\x00\x42\x07\n\x05\x66rameb\x06proto3')
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0c\x66rames.proto\x12\x07pipecat\"3\n\tTextFrame\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x0c\n\x04text\x18\x03 \x01(\t\"}\n\rAudioRawFrame\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\r\n\x05\x61udio\x18\x03 \x01(\x0c\x12\x13\n\x0bsample_rate\x18\x04 \x01(\r\x12\x14\n\x0cnum_channels\x18\x05 \x01(\r\x12\x10\n\x03pts\x18\x06 \x01(\x04H\x00\x88\x01\x01\x42\x06\n\x04_pts\"`\n\x12TranscriptionFrame\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x0c\n\x04text\x18\x03 \x01(\t\x12\x0f\n\x07user_id\x18\x04 \x01(\t\x12\x11\n\ttimestamp\x18\x05 \x01(\t\"\x93\x01\n\x05\x46rame\x12\"\n\x04text\x18\x01 \x01(\x0b\x32\x12.pipecat.TextFrameH\x00\x12\'\n\x05\x61udio\x18\x02 \x01(\x0b\x32\x16.pipecat.AudioRawFrameH\x00\x12\x34\n\rtranscription\x18\x03 \x01(\x0b\x32\x1b.pipecat.TranscriptionFrameH\x00\x42\x07\n\x05\x66rameb\x06proto3')
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
@@ -24,9 +24,9 @@ if _descriptor._USE_C_DESCRIPTORS == False:
_globals['_TEXTFRAME']._serialized_start=25
_globals['_TEXTFRAME']._serialized_end=76
_globals['_AUDIORAWFRAME']._serialized_start=78
_globals['_AUDIORAWFRAME']._serialized_end=177
_globals['_TRANSCRIPTIONFRAME']._serialized_start=179
_globals['_TRANSCRIPTIONFRAME']._serialized_end=275
_globals['_FRAME']._serialized_start=278
_globals['_FRAME']._serialized_end=425
_globals['_AUDIORAWFRAME']._serialized_end=203
_globals['_TRANSCRIPTIONFRAME']._serialized_start=205
_globals['_TRANSCRIPTIONFRAME']._serialized_end=301
_globals['_FRAME']._serialized_start=304
_globals['_FRAME']._serialized_end=451
# @@protoc_insertion_point(module_scope)

View File

View File

@@ -0,0 +1,31 @@
from typing import Optional
from pydantic import BaseModel
class MetricsData(BaseModel):
processor: str
model: Optional[str] = None
class TTFBMetricsData(MetricsData):
value: float
class ProcessingMetricsData(MetricsData):
value: float
class LLMTokenUsage(BaseModel):
prompt_tokens: int
completion_tokens: int
total_tokens: int
cache_read_input_tokens: Optional[int] = None
cache_creation_input_tokens: Optional[int] = None
class LLMUsageMetricsData(MetricsData):
value: LLMTokenUsage
class TTSUsageMetricsData(MetricsData):
value: int

View File

@@ -12,7 +12,6 @@ from pipecat.processors.frame_processor import FrameProcessor
class BasePipeline(FrameProcessor):
def __init__(self):
super().__init__()

Some files were not shown because too many files have changed in this diff Show More