Compare commits

..

6 Commits

Author SHA1 Message Date
Jon Taylor
5bd5d22270 removed space from event handler 2024-06-26 18:30:56 +01:00
Jon Taylor
6ee7932337 added pause to start and new intro prompt 2024-06-26 18:24:14 +01:00
Jon Taylor
c407445dd1 removed header comment from bot runner 2024-06-24 17:35:26 +01:00
Jon Taylor
447f37167e added VAD stop seconds env 2024-06-24 17:34:25 +01:00
Jon Taylor
354c21500e prompt tweaks 2024-06-24 17:28:10 +01:00
Jon Taylor
5728e25b5a added fastbot example 2024-06-24 16:25:36 +01:00
282 changed files with 9782 additions and 24834 deletions

View File

@@ -1,4 +1,4 @@
name: format
name: lint
on:
workflow_dispatch:
@@ -12,12 +12,12 @@ on:
- "docs/**"
concurrency:
group: build-format-${{ github.event.pull_request.number || github.ref }}
group: build-lint-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true
jobs:
ruff-format:
name: "Formatting checker"
autopep8:
name: "Formatting lints"
runs-on: ubuntu-latest
steps:
- name: Checkout repo
@@ -25,7 +25,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: "3.10"
python-version: '3.10'
- name: Setup virtual environment
run: |
python -m venv .venv
@@ -34,8 +34,11 @@ jobs:
source .venv/bin/activate
python -m pip install --upgrade pip
pip install -r dev-requirements.txt
- name: Ruff formatter
id: ruff
- name: autopep8
id: autopep8
run: |
source .venv/bin/activate
ruff format --config line-length=100 --diff --exclude "*_pb2.py"
autopep8 --max-line-length 100 --exit-code -r -d --exclude "*_pb2.py" -a -a src/
- name: Fail if autopep8 requires changes
if: steps.autopep8.outputs.exit-code == 2
run: exit 1

View File

@@ -1,6 +1,10 @@
name: publish-test
on: workflow_dispatch
on:
workflow_dispatch:
push:
branches:
- main
jobs:
build:
@@ -10,6 +14,7 @@ jobs:
- name: Checkout repo
uses: actions/checkout@v4
with:
ref: ${{ github.event.inputs.gitref }}
fetch-tags: true
fetch-depth: 100
- name: Set up Python

View File

@@ -20,24 +20,21 @@ jobs:
name: "Unit and Integration Tests"
runs-on: ubuntu-latest
steps:
- name: Checkout repo
uses: actions/checkout@v4
- uses: actions/checkout@v4
- name: Set up Python
id: setup_python
uses: actions/setup-python@v4
with:
python-version: "3.10"
python-version: '3.10'
- name: Cache virtual environment
uses: actions/cache@v3
with:
# We are hashing dev-requirements.txt and test-requirements.txt which
# contain all dependencies needed to run the tests.
key: venv-${{ runner.os }}-${{ steps.setup_python.outputs.python-version}}-${{ hashFiles('dev-requirements.txt') }}-${{ hashFiles('test-requirements.txt') }}
# We are hashing requirements-dev.txt and requirements-extra.txt which
# contain all dependencies needed to run the tests and examples.
key: venv-${{ runner.os }}-${{ steps.setup_python.outputs.python-version}}-${{ hashFiles('linux-py3.10-requirements.txt') }}-${{ hashFiles('dev-requirements.txt') }}
path: .venv
- name: Install system packages
id: install_system_packages
run: |
sudo apt-get install -y portaudio19-dev
run: sudo apt-get install -y portaudio19-dev
- name: Setup virtual environment
run: |
python -m venv .venv
@@ -45,8 +42,8 @@ jobs:
run: |
source .venv/bin/activate
python -m pip install --upgrade pip
pip install -r dev-requirements.txt -r test-requirements.txt
pip install -r linux-py3.10-requirements.txt -r dev-requirements.txt
- name: Test with pytest
run: |
source .venv/bin/activate
pytest --ignore-glob="*to_be_updated*" --ignore-glob=*pipeline_source* src tests
pytest --doctest-modules --ignore-glob="*to_be_updated*" src tests

View File

@@ -1,628 +1,10 @@
# Changelog
All notable changes to **Pipecat** will be documented in this file.
All notable changes to **pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
### Added
- Added `AssemblyAISTTService` and corresponding foundational examples
`07o-interruptible-assemblyai.py` and `13d-assemblyai-transcription.py`.
- Added a foundational example for Gladia transcription:
`13c-gladia-transcription.py`
### Fixed
- Fixed `enable_usage_metrics` to control LLM/TTS usage metrics separately
from `enable_metrics`.
## [0.0.46] - 2024-10-19
### Added
- Added `audio_passthrough` parameter to `STTService`. If enabled it allows
audio frames to be pushed downstream in case other processors need them.
- Added input parameter options for `PlayHTTTSService` and
`PlayHTHttpTTSService`.
### Changed
- Moved `SileroVAD` audio processor to `processors.audio.vad`.
- Module `utils.audio` is now `audio.utils`. A new `resample_audio` function has
been added.
- `PlayHTTTSService` now uses PlayHT websockets instead of HTTP requests.
- The previous `PlayHTTTSService` HTTP implementation is now
`PlayHTHttpTTSService`.
- `PlayHTTTSService` and `PlayHTHttpTTSService` now use a `voice_engine` of
`PlayHT3.0-mini`, which allows for multi-lingual support.
- Renamed `OpenAILLMServiceRealtimeBeta` to `OpenAIRealtimeBetaLLMService` to
match other services.
### Deprecated
- `LLMUserResponseAggregator` and `LLMAssistantResponseAggregator` are
mostly deprecated, use `OpenAILLMContext` instead.
- The `vad` package is now deprecated and `audio.vad` should be used
instead. The `avd` package will get removed in a future release.
### Fixed
- Fixed an issue that would cause an error if no VAD analyzer was passed to
`LiveKitTransport` params.
- Fixed `SileroVAD` processor to support interruptions properly.
### Other
- Added `examples/foundational/07-interruptible-vad.py`. This is the same as
`07-interruptible.py` but using the `SileroVAD` processor instead of passing
the `VADAnalyzer` in the transport.
## [0.0.45] - 2024-10-16
### Changed
- Metrics messages have moved out from the transport's base output into RTVI.
## [0.0.44] - 2024-10-15
### Added
- Added support for OpenAI Realtime API with the new
`OpenAILLMServiceRealtimeBeta` processor.
(see https://platform.openai.com/docs/guides/realtime/overview)
- Added `RTVIBotTranscriptionProcessor` which will send the RTVI
`bot-transcription` protocol message. These are TTS text aggregated (into
sentences) messages.
- Added new input params to the `MarkdownTextFilter` utility. You can set
`filter_code` to filter code from text and `filter_tables` to filter tables
from text.
- Added `CanonicalMetricsService`. This processor uses the new
`AudioBufferProcessor` to capture conversation audio and later send it to
Canonical AI.
(see https://canonical.chat/)
- Added `AudioBufferProcessor`. This processor can be used to buffer mixed user and
bot audio. This can later be saved into an audio file or processed by some
audio analyzer.
- Added `on_first_participant_joined` event to `LiveKitTransport`.
### Changed
- LLM text responses are now logged properly as unicode characters.
- `UserStartedSpeakingFrame`, `UserStoppedSpeakingFrame`,
`BotStartedSpeakingFrame`, `BotStoppedSpeakingFrame`, `BotSpeakingFrame` and
`UserImageRequestFrame` are now based from `SystemFrame`
### Fixed
- Merge `RTVIBotLLMProcessor`/`RTVIBotLLMTextProcessor` and
`RTVIBotTTSProcessor`/`RTVIBotTTSTextProcessor` to avoid out of order issues.
- Fixed an issue in RTVI protocol that could cause a `bot-llm-stopped` or
`bot-tts-stopped` message to be sent before a `bot-llm-text` or `bot-tts-text`
message.
- Fixed `DeepgramSTTService` constructor settings not being merged with default
ones.
- Fixed an issue in Daily transport that would cause tasks to be hanging if
urgent transport messages were being sent from a transport event handler.
- Fixed an issue in `BaseOutputTransport` that would cause `EndFrame` to be
pushed downed too early and call `FrameProcessor.cleanup()` before letting the
transport stop properly.
## [0.0.43] - 2024-10-10
### Added
- Added a new util called `MarkdownTextFilter` which is a subclass of a new
base class called `BaseTextFilter`. This is a configurable utility which
is intended to filter text received by TTS services.
- Added new `RTVIUserLLMTextProcessor`. This processor will send an RTVI
`user-llm-text` message with the user content's that was sent to the LLM.
### Changed
- `TransportMessageFrame` doesn't have an `urgent` field anymore, instead
there's now a `TransportMessageUrgentFrame` which is a `SystemFrame` and
therefore skip all internal queuing.
- For TTS services, convert inputted languages to match each service's language
format
### Fixed
- Fixed an issue where changing a language with the Deepgram STT service
wouldn't apply the change. This was fixed by disconnecting and reconnecting
when the language changes.
## [0.0.42] - 2024-10-02
### Added
- `SentryMetrics` has been added to report frame processor metrics to
Sentry. This is now possible because `FrameProcessorMetrics` can now be passed
to `FrameProcessor`.
- Added Google TTS service and corresponding foundational example
`07n-interruptible-google.py`
- Added AWS Polly TTS support and `07m-interruptible-aws.py` as an example.
- Added InputParams to Azure TTS service.
- Added `LivekitTransport` (audio-only for now).
- RTVI 0.2.0 is now supported.
- All `FrameProcessors` can now register event handlers.
```
tts = SomeTTSService(...)
@tts.event_handler("on_connected"):
async def on_connected(processor):
...
```
- Added `AsyncGeneratorProcessor`. This processor can be used together with a
`FrameSerializer` as an async generator. It provides a `generator()` function
that returns an `AsyncGenerator` and that yields serialized frames.
- Added `EndTaskFrame` and `CancelTaskFrame`. These are new frames that are
meant to be pushed upstream to tell the pipeline task to stop nicely or
immediately respectively.
- Added configurable LLM parameters (e.g., temperature, top_p, max_tokens, seed)
for OpenAI, Anthropic, and Together AI services along with corresponding
setter functions.
- Added `sample_rate` as a constructor parameter for TTS services.
- Pipecat has a pipeline-based architecture. The pipeline consists of frame
processors linked to each other. The elements traveling across the pipeline
are called frames.
To have a deterministic behavior the frames traveling through the pipeline
should always be ordered, except system frames which are out-of-band
frames. To achieve that, each frame processor should only output frames from a
single task.
In this version all the frame processors have their own task to push
frames. That is, when `push_frame()` is called the given frame will be put
into an internal queue (with the exception of system frames) and a frame
processor task will push it out.
- Added pipeline clocks. A pipeline clock is used by the output transport to
know when a frame needs to be presented. For that, all frames now have an
optional `pts` field (prensentation timestamp). There's currently just one
clock implementation `SystemClock` and the `pts` field is currently only used
for `TextFrame`s (audio and image frames will be next).
- A clock can now be specified to `PipelineTask` (defaults to
`SystemClock`). This clock will be passed to each frame processor via the
`StartFrame`.
- Added `CartesiaHttpTTSService`.
- `DailyTransport` now supports setting the audio bitrate to improve audio
quality through the `DailyParams.audio_out_bitrate` parameter. The new
default is 96kbps.
- `DailyTransport` now uses the number of audio output channels (1 or 2) to set
mono or stereo audio when needed.
- Interruptions support has been added to `TwilioFrameSerializer` when using
`FastAPIWebsocketTransport`.
- Added new `LmntTTSService` text-to-speech service.
(see https://www.lmnt.com/)
- Added `TTSModelUpdateFrame`, `TTSLanguageUpdateFrame`, `STTModelUpdateFrame`,
and `STTLanguageUpdateFrame` frames to allow you to switch models, language
and voices in TTS and STT services.
- Added new `transcriptions.Language` enum.
### Changed
- Context frames are now pushed downstream from assistant context aggregators.
- Removed Silero VAD torch dependency.
- Updated individual update settings frame classes into a single
`ServiceUpdateSettingsFrame` class.
- We now distinguish between input and output audio and image frames. We
introduce `InputAudioRawFrame`, `OutputAudioRawFrame`, `InputImageRawFrame`
and `OutputImageRawFrame` (and other subclasses of those). The input frames
usually come from an input transport and are meant to be processed inside the
pipeline to generate new frames. However, the input frames will not be sent
through an output transport. The output frames can also be processed by any
frame processor in the pipeline and they are allowed to be sent by the output
transport.
- `ParallelTask` has been renamed to `SyncParallelPipeline`. A
`SyncParallelPipeline` is a frame processor that contains a list of different
pipelines to be executed concurrently. The difference between a
`SyncParallelPipeline` and a `ParallelPipeline` is that, given an input frame,
the `SyncParallelPipeline` will wait for all the internal pipelines to
complete. This is achieved by making sure the last processor in each of the
pipelines is synchronous (e.g. an HTTP-based service that waits for the
response).
- `StartFrame` is back a system frame to make sure it's processed immediately by
all processors. `EndFrame` stays a control frame since it needs to be ordered
allowing the frames in the pipeline to be processed.
- Updated `MoondreamService` revision to `2024-08-26`.
- `CartesiaTTSService` and `ElevenLabsTTSService` now add presentation
timestamps to their text output. This allows the output transport to push the
text frames downstream at almost the same time the words are spoken. We say
"almost" because currently the audio frames don't have presentation timestamp
but they should be played at roughly the same time.
- `DailyTransport.on_joined` event now returns the full session data instead of
just the participant.
- `CartesiaTTSService` is now a subclass of `TTSService`.
- `DeepgramSTTService` is now a subclass of `STTService`.
- `WhisperSTTService` is now a subclass of `SegmentedSTTService`. A
`SegmentedSTTService` is a `STTService` where the provided audio is given in a
big chunk (i.e. from when the user starts speaking until the user stops
speaking) instead of a continous stream.
### Fixed
- Fixed OpenAI multiple function calls.
- Fixed a Cartesia TTS issue that would cause audio to be truncated in some
cases.
- Fixed a `BaseOutputTransport` issue that would stop audio and video rendering
tasks (after receiving and `EndFrame`) before the internal queue was emptied,
causing the pipeline to finish prematurely.
- `StartFrame` should be the first frame every processor receives to avoid
situations where things are not initialized (because initialization happens on
`StartFrame`) and other frames come in resulting in undesired behavior.
### Performance
- `obj_id()` and `obj_count()` now use `itertools.count` avoiding the need of
`threading.Lock`.
### Other
- Pipecat now uses Ruff as its formatter (https://github.com/astral-sh/ruff).
## [0.0.41] - 2024-08-22
### Added
- Added `LivekitFrameSerializer` audio frame serializer.
### Fixed
- Fix `FastAPIWebsocketOutputTransport` variable name clash with subclass.
- Fix an `AnthropicLLMService` issue with empty arguments in function calling.
### Other
- Fixed `studypal` example errors.
## [0.0.40] - 2024-08-20
### Added
- VAD parameters can now be dynamicallt updated using the
`VADParamsUpdateFrame`.
- `ErrorFrame` has now a `fatal` field to indicate the bot should exit if a
fatal error is pushed upstream (false by default). A new `FatalErrorFrame`
that sets this flag to true has been added.
- `AnthropicLLMService` now supports function calling and initial support for
prompt caching.
(see https://www.anthropic.com/news/prompt-caching)
- `ElevenLabsTTSService` can now specify ElevenLabs input parameters such as
`output_format`.
- `TwilioFrameSerializer` can now specify Twilio's and Pipecat's desired sample
rates to use.
- Added new `on_participant_updated` event to `DailyTransport`.
- Added `DailyRESTHelper.delete_room_by_name()` and
`DailyRESTHelper.delete_room_by_url()`.
- Added LLM and TTS usage metrics. Those are enabled when
`PipelineParams.enable_usage_metrics` is True.
- `AudioRawFrame`s are now pushed downstream from the base output
transport. This allows capturing the exact words the bot says by adding an STT
service at the end of the pipeline.
- Added new `GStreamerPipelineSource`. This processor can generate image or
audio frames from a GStreamer pipeline (e.g. reading an MP4 file, and RTP
stream or anything supported by GStreamer).
- Added `TransportParams.audio_out_is_live`. This flag is False by default and
it is useful to indicate we should not synchronize audio with sporadic images.
- Added new `BotStartedSpeakingFrame` and `BotStoppedSpeakingFrame` control
frames. These frames are pushed upstream and they should wrap
`BotSpeakingFrame`.
- Transports now allow you to register event handlers without decorators.
### Changed
- Support RTVI message protocol 0.1. This includes new messages, support for
messages responses, support for actions, configuration, webhooks and a bunch
of new cool stuff.
(see https://docs.rtvi.ai/)
- `SileroVAD` dependency is now imported via pip's `silero-vad` package.
- `ElevenLabsTTSService` now uses `eleven_turbo_v2_5` model by default.
- `BotSpeakingFrame` is now a control frame.
- `StartFrame` is now a control frame similar to `EndFrame`.
- `DeepgramTTSService` now is more customizable. You can adjust the encoding and
sample rate.
### Fixed
- `TTSStartFrame` and `TTSStopFrame` are now sent when TTS really starts and
stops. This allows for knowing when the bot starts and stops speaking even
with asynchronous services (like Cartesia).
- Fixed `AzureSTTService` transcription frame timestamps.
- Fixed an issue with `DailyRESTHelper.create_room()` expirations which would
cause this function to stop working after the initial expiration elapsed.
- Improved `EndFrame` and `CancelFrame` handling. `EndFrame` should end things
gracefully while a `CancelFrame` should cancel all running tasks as soon as
possible.
- Fixed an issue in `AIService` that would cause a yielded `None` value to be
processed.
- RTVI's `bot-ready` message is now sent when the RTVI pipeline is ready and
a first participant joins.
- Fixed a `BaseInputTransport` issue that was causing incoming system frames to
be queued instead of being pushed immediately.
- Fixed a `BaseInputTransport` issue that was causing start/stop interruptions
incoming frames to not cancel tasks and be processed properly.
### Other
- Added `studypal` example (from to the Cartesia folks!).
- Most examples now use Cartesia.
- Added examples `foundational/19a-tools-anthropic.py`,
`foundational/19b-tools-video-anthropic.py` and
`foundational/19a-tools-togetherai.py`.
- Added examples `foundational/18-gstreamer-filesrc.py` and
`foundational/18a-gstreamer-videotestsrc.py` that show how to use
`GStreamerPipelineSource`
- Remove `requests` library usage.
- Cleanup examples and use `DailyRESTHelper`.
## [0.0.39] - 2024-07-23
### Fixed
- Fixed a regression introduced in 0.0.38 that would cause Daily transcription
to stop the Pipeline.
## [0.0.38] - 2024-07-23
### Added
- Added `force_reload`, `skip_validation` and `trust_repo` to `SileroVAD` and
`SileroVADAnalyzer`. This allows caching and various GitHub repo validations.
- Added `send_initial_empty_metrics` flag to `PipelineParams` to request for
initial empty metrics (zero values). True by default.
### Fixed
- Fixed initial metrics format. It was using the wrong keys name/time instead of
processor/value.
- STT services should be using ISO 8601 time format for transcription frames.
- Fixed an issue that would cause Daily transport to show a stop transcription
error when actually none occurred.
## [0.0.37] - 2024-07-22
### Added
- Added `RTVIProcessor` which implements the RTVI-AI standard.
See https://github.com/rtvi-ai
- Added `BotInterruptionFrame` which allows interrupting the bot while talking.
- Added `LLMMessagesAppendFrame` which allows appending messages to the current
LLM context.
- Added `LLMMessagesUpdateFrame` which allows changing the LLM context for the
one provided in this new frame.
- Added `LLMModelUpdateFrame` which allows updating the LLM model.
- Added `TTSSpeakFrame` which causes the bot say some text. This text will not
be part of the LLM context.
- Added `TTSVoiceUpdateFrame` which allows updating the TTS voice.
### Removed
- We remove the `LLMResponseStartFrame` and `LLMResponseEndFrame` frames. These
were added in the past to properly handle interruptions for the
`LLMAssistantContextAggregator`. But the `LLMContextAggregator` is now based
on `LLMResponseAggregator` which handles interruptions properly by just
processing the `StartInterruptionFrame`, so there's no need for these extra
frames any more.
### Fixed
- Fixed an issue with `StatelessTextTransformer` where it was pushing a string
instead of a `TextFrame`.
- `TTSService` end of sentence detection has been improved. It now works with
acronyms, numbers, hours and others.
- Fixed an issue in `TTSService` that would not properly flush the current
aggregated sentence if an `LLMFullResponseEndFrame` was found.
### Performance
- `CartesiaTTSService` now uses websockets which improves speed. It also
leverages the new Cartesia contexts which maintains generated audio prosody
when multiple inputs are sent, therefore improving audio quality a lot.
## [0.0.36] - 2024-07-02
### Added
- Added `GladiaSTTService`.
See https://docs.gladia.io/chapters/speech-to-text-api/pages/live-speech-recognition
- Added `XTTSService`. This is a local Text-To-Speech service.
See https://github.com/coqui-ai/TTS
- Added `UserIdleProcessor`. This processor can be used to wait for any
interaction with the user. If the user doesn't say anything within a given
timeout a provided callback is called.
- Added `IdleFrameProcessor`. This processor can be used to wait for frames
within a given timeout. If no frame is received within the timeout a provided
callback is called.
- Added new frame `BotSpeakingFrame`. This frame will be continuously pushed
upstream while the bot is talking.
- It is now possible to specify a Silero VAD version when using `SileroVADAnalyzer`
or `SileroVAD`.
- Added `AysncFrameProcessor` and `AsyncAIService`. Some services like
`DeepgramSTTService` need to process things asynchronously. For example, audio
is sent to Deepgram but transcriptions are not returned immediately. In these
cases we still require all frames (except system frames) to be pushed
downstream from a single task. That's what `AsyncFrameProcessor` is for. It
creates a task and all frames should be pushed from that task. So, whenever a
new Deepgram transcription is ready that transcription will also be pushed
from this internal task.
- The `MetricsFrame` now includes processing metrics if metrics are enabled. The
processing metrics indicate the time a processor needs to generate all its
output. Note that not all processors generate these kind of metrics.
### Changed
- `WhisperSTTService` model can now also be a string.
- Added missing \* keyword separators in services.
### Fixed
- `WebsocketServerTransport` doesn't try to send frames anymore if serializers
returns `None`.
- Fixed an issue where exceptions that occurred inside frame processors were
being swallowed and not displayed.
- Fixed an issue in `FastAPIWebsocketTransport` where it would still try to send
data to the websocket after being closed.
### Other
- Added Fly.io deployment example in `examples/deployment/flyio-example`.
- Added new `17-detect-user-idle.py` example that shows how to use the new
`UserIdleProcessor`.
## [0.0.35] - 2024-06-28
### Changed
- `FastAPIWebsocketParams` now require a serializer.
- `TwilioFrameSerializer` now requires a `streamSid`.
### Fixed
- Silero VAD number of frames needs to be 512 for 16000 sample rate or 256 for
8000 sample rate.
## [0.0.34] - 2024-06-25
### Fixed
- Fixed an issue with asynchronous STT services (Deepgram and Azure) that could
interruptions to ignore transcriptions.
- Fixed an issue introduced in 0.0.33 that would cause the LLM to generate
shorter output.
## [0.0.33] - 2024-06-25
### Changed
- Upgraded to Cartesia's new Python library 1.0.0. `CartesiaTTSService` now
expects a voice ID instead of a voice name (you can get the voice ID from
Cartesia's playground). You can also specify the audio `sample_rate` and
`encoding` instead of the previous `output_format`.
### Fixed
- Fixed an issue with asynchronous STT services (Deepgram and Azure) that could
cause static audio issues and interruptions to not work properly when dealing
with multiple LLMs sentences.
- Fixed an issue that could mix new LLM responses with previous ones when
handling interruptions.
- Fixed a Daily transport blocking situation that occurred while reading audio
frames after a participant left the room. Needs daily-python >= 0.10.1.
## [0.0.32] - 2024-06-22
### Added
@@ -636,7 +18,7 @@ async def on_connected(processor):
- Added new `TwilioFrameSerializer`. This is a new serializer that knows how to
serialize and deserialize audio frames from Twilio.
- Added Daily transport event: `on_dialout_answered`. See
- Added Daily transport event: `on_dialout_answered`. See
https://reference-python.daily.co/api_reference.html#daily.EventHandler
- Added new `AzureSTTService`. This allows you to use Azure Speech-To-Text.
@@ -876,7 +258,7 @@ async def on_connected(processor):
- Added Daily transport support for dial-in use cases.
- Added Daily transport events: `on_dialout_connected`, `on_dialout_stopped`,
`on_dialout_error` and `on_dialout_warning`. See
`on_dialout_error` and `on_dialout_warning`. See
https://reference-python.daily.co/api_reference.html#daily.EventHandler
## [0.0.21] - 2024-05-22

113
README.md
View File

@@ -4,7 +4,8 @@
# Pipecat
[![PyPI](https://img.shields.io/pypi/v/pipecat-ai)](https://pypi.org/project/pipecat-ai) [![Discord](https://img.shields.io/discord/1239284677165056021)](https://discord.gg/pipecat) <a href="https://app.commanddash.io/agent/github_pipecat-ai_pipecat"><img src="https://img.shields.io/badge/AI-Code%20Agent-EB9FDA"></a>
[![PyPI](https://img.shields.io/pypi/v/pipecat-ai)](https://pypi.org/project/pipecat-ai) [![Discord](https://img.shields.io/discord/1239284677165056021
)](https://discord.gg/pipecat)
`pipecat` is a framework for building voice (and multimodal) conversational agents. Things like personal coaches, meeting assistants, [story-telling toys for kids](https://storytelling-chatbot.fly.dev/), customer support bots, [intake flows](https://www.youtube.com/watch?v=lDevgsp9vn0), and snarky social companions.
@@ -38,7 +39,7 @@ pip install "pipecat-ai[option,...]"
Your project may or may not need these, so they're made available as optional requirements. Here is a list:
- **AI services**: `anthropic`, `assemblyai`, `aws`, `azure`, `deepgram`, `gladia`, `google`, `fal`, `lmnt`, `moondream`, `openai`, `openpipe`, `playht`, `silero`, `whisper`, `xtts`
- **AI services**: `anthropic`, `azure`, `deepgram`, `google`, `fal`, `moondream`, `openai`, `openpipe`, `playht`, `silero`, `whisper`
- **Transports**: `local`, `websocket`, `daily`
## Code examples
@@ -48,56 +49,56 @@ Your project may or may not need these, so they're made available as optional re
## A simple voice agent running locally
Here is a very basic Pipecat bot that greets a user when they join a real-time session. We'll use [Daily](https://daily.co) for real-time media transport, and [Cartesia](https://cartesia.ai/) for text-to-speech.
Here is a very basic Pipecat bot that greets a user when they join a real-time session. We'll use [Daily](https://daily.co) for real-time media transport, and [ElevenLabs](https://elevenlabs.io/) for text-to-speech.
```python
#app.py
import asyncio
import aiohttp
from pipecat.frames.frames import EndFrame, TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.runner import PipelineRunner
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
async def main():
# Use Daily as a real-time media transport (WebRTC)
transport = DailyTransport(
room_url=...,
token=...,
bot_name="Bot Name",
params=DailyParams(audio_out_enabled=True))
async with aiohttp.ClientSession() as session:
# Use Daily as a real-time media transport (WebRTC)
transport = DailyTransport(
room_url=...,
token=...,
"Bot Name",
DailyParams(audio_out_enabled=True))
# Use Cartesia for Text-to-Speech
tts = CartesiaTTSService(
api_key=...,
voice_id=...
)
# Use Eleven Labs for Text-to-Speech
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=...,
voice_id=...,
)
# Simple pipeline that will process text to speech and output the result
pipeline = Pipeline([tts, transport.output()])
# Simple pipeline that will process text to speech and output the result
pipeline = Pipeline([tts, transport.output()])
# Create Pipecat processor that can run one or more pipelines tasks
runner = PipelineRunner()
# Create Pipecat processor that can run one or more pipelines tasks
runner = PipelineRunner()
# Assign the task callable to run the pipeline
task = PipelineTask(pipeline)
# Assign the task callable to run the pipeline
task = PipelineTask(pipeline)
# Register an event handler to play audio when a
# participant joins the transport WebRTC session
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
participant_name = participant.get("info", {}).get("userName", "")
# Queue a TextFrame that will get spoken by the TTS service (Cartesia)
await task.queue_frame(TextFrame(f"Hello there, {participant_name}!"))
# Register an event handler to play audio when a
# participant joins the transport WebRTC session
@transport.event_handler("on_participant_joined")
async def on_new_participant_joined(transport, participant):
participant_name = participant["info"]["userName"] or ''
# Queue a TextFrame that will get spoken by the TTS service (Eleven Labs)
await task.queue_frames([TextFrame(f"Hello there, {participant_name}!"), EndFrame()])
# Register an event handler to exit the application when the user leaves.
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
# Run the pipeline task
await runner.run(task)
# Run the pipeline task
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())
@@ -111,6 +112,7 @@ python app.py
Daily provides a prebuilt WebRTC user interface. Whilst the app is running, you can visit at `https://<yourdomain>.daily.co/<room_url>` and listen to the bot say hello!
## WebRTC for production use
WebSockets are fine for server-to-server communication or for initial development. But for production use, youll need client-server audio to use a protocol designed for real-time media transport. (For an explanation of the difference between WebSockets and WebRTC, see [this post.](https://www.daily.co/blog/how-to-talk-to-an-llm-with-your-voice/#webrtc))
@@ -123,12 +125,15 @@ Sign up [here](https://dashboard.daily.co/u/signup) and [create a room](https://
Voice Activity Detection &mdash; very important for knowing when a user has finished speaking to your bot. If you are not using press-to-talk, and want Pipecat to detect when the user has finished talking, VAD is an essential component for a natural feeling conversation.
Pipecat makes use of WebRTC VAD by default when using a WebRTC transport layer. Optionally, you can use Silero VAD for improved accuracy at the cost of higher CPU usage.
Pipecast makes use of WebRTC VAD by default when using a WebRTC transport layer. Optionally, you can use Silero VAD for improved accuracy at the cost of higher CPU usage.
```shell
pip install pipecat-ai[silero]
```
The first time your run your bot with Silero, startup may take a while whilst it downloads and caches the model in the background. You can check the progress of this in the console.
## Hacking on the framework itself
_Note that you may need to set up a virtual environment before following the instructions below. For instance, you might need to run the following from the root of the repo:_
@@ -141,20 +146,20 @@ source venv/bin/activate
From the root of this repo, run the following:
```shell
pip install -r dev-requirements.txt
pip install -r dev-requirements.txt -r {env}-requirements.txt
python -m build
```
This builds the package. To use the package locally (e.g. to run sample files), run
This builds the package. To use the package locally (eg to run sample files), run
```shell
pip install --editable ".[option,...]"
pip install --editable .
```
If you want to use this package from another directory, you can run:
```shell
pip install "path_to_this_repo[option,...]"
pip install path_to_this_repo
```
### Running tests
@@ -162,29 +167,27 @@ pip install "path_to_this_repo[option,...]"
From the root directory, run:
```shell
pytest --doctest-modules --ignore-glob="*to_be_updated*" --ignore-glob=*pipeline_source* src tests
pytest --doctest-modules --ignore-glob="*to_be_updated*" src tests
```
## Setting up your editor
This project uses strict [PEP 8](https://peps.python.org/pep-0008/) formatting via [Ruff](https://github.com/astral-sh/ruff).
This project uses strict [PEP 8](https://peps.python.org/pep-0008/) formatting.
### Emacs
You can use [use-package](https://github.com/jwiegley/use-package) to install [emacs-lazy-ruff](https://github.com/christophermadsen/emacs-lazy-ruff) package and configure `ruff` arguments:
You can use [use-package](https://github.com/jwiegley/use-package) to install [py-autopep8](https://codeberg.org/ideasman42/emacs-py-autopep8) package and configure `autopep8` arguments:
```elisp
(use-package lazy-ruff
(use-package py-autopep8
:ensure t
:hook ((python-mode . lazy-ruff-mode))
:defer t
:hook ((python-mode . py-autopep8-mode))
:config
(setq lazy-ruff-format-command "ruff format --config line-length=100")
(setq lazy-ruff-only-format-block t)
(setq lazy-ruff-only-format-region t)
(setq lazy-ruff-only-format-buffer t))
(setq py-autopep8-options '("-a" "-a", "--max-line-length=100")))
```
`ruff` was installed in the `venv` environment described before, so you should be able to use [pyvenv-auto](https://github.com/ryotaro612/pyvenv-auto) to automatically load that environment inside Emacs.
`autopep8` was installed in the `venv` environment described before, so you should be able to use [pyvenv-auto](https://github.com/ryotaro612/pyvenv-auto) to automatically load that environment inside Emacs.
```elisp
(use-package pyvenv-auto
@@ -197,14 +200,18 @@ You can use [use-package](https://github.com/jwiegley/use-package) to install [e
### Visual Studio Code
Install the
[Ruff](https://marketplace.visualstudio.com/items?itemName=charliermarsh.ruff) extension. Then edit the user settings (_Ctrl-Shift-P_ `Open User Settings (JSON)`) and set it as the default Python formatter, enable formatting on save and configure `ruff` arguments:
[autopep8](https://marketplace.visualstudio.com/items?itemName=ms-python.autopep8) extension. Then edit the user settings (_Ctrl-Shift-P_ `Open User Settings (JSON)`) and set it as the default Python formatter, enable formatting on save and configure `autopep8` arguments:
```json
"[python]": {
"editor.defaultFormatter": "charliermarsh.ruff",
"editor.defaultFormatter": "ms-python.autopep8",
"editor.formatOnSave": true
},
"ruff.format.args": ["--config", "line-length=100"]
"autopep8.args": [
"-a",
"-a",
"--max-line-length=100"
],
```
## Getting help

View File

@@ -1,8 +1,8 @@
autopep8~=2.1.0
build~=1.2.1
grpcio-tools~=1.62.2
pip-tools~=7.4.1
pyright~=1.1.376
pytest~=8.3.2
ruff~=0.6.7
setuptools~=72.2.0
pyright~=1.1.367
pytest~=8.2.0
setuptools~=69.5.1
setuptools_scm~=8.1.0

View File

@@ -1,11 +1,6 @@
# Anthropic
ANTHROPIC_API_KEY=...
# AWS
AWS_SECRET_ACCESS_KEY=...
AWS_ACCESS_KEY_ID=...
AWS_REGION=...
# Azure
AZURE_SPEECH_REGION=...
AZURE_SPEECH_API_KEY=...
@@ -32,13 +27,6 @@ FAL_KEY=...
# Fireworks
FIREWORKS_API_KEY=...
# Gladia
GLADIA_API_KEY=...
# LMNT
LMNT_API_KEY=...
LMNT_VOICE_ID=...
# PlayHT
PLAY_HT_USER_ID=...
PLAY_HT_API_KEY=...

View File

@@ -41,7 +41,6 @@ Next, follow the steps in the README for each demo.
| [Patient intake](patient-intake) | A chatbot that can call functions in response to user input. | Deepgram, ElevenLabs, OpenAI, Daily, Daily Prebuilt UI |
| [Dialin Chatbot](dialin-chatbot) | A chatbot that connects to an incoming phone call from Daily or Twilio. | Deepgram, ElevenLabs, OpenAI, Daily, Twilio |
| [Twilio Chatbot](twilio-chatbot) | A chatbot that connects to an incoming phone call from Twilio. | Deepgram, ElevenLabs, OpenAI, Daily, Twilio |
| [studypal](studypal) | A chatbot to have a conversation about any article on the web | |
> [!IMPORTANT]
> These example projects use Daily as a WebRTC transport and can be joined using their hosted Prebuilt UI.

View File

@@ -1,161 +0,0 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
recordings/
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
runpod.toml

View File

@@ -1,10 +0,0 @@
FROM python:3.10-bullseye
RUN mkdir /app
COPY *.py /app/
COPY requirements.txt /app/
WORKDIR /app
RUN pip3 install -r requirements.txt
EXPOSE 7860
CMD ["python3", "server.py"]

View File

@@ -1,37 +0,0 @@
# Simple Chatbot
<img src="image.png" width="420px">
This app connects you to a chatbot powered by GPT-4, complete with animations generated by Stable Video Diffusion.
See a video of it in action: https://x.com/kwindla/status/1778628911817183509
And a quick video walkthrough of the code: https://www.loom.com/share/13df1967161f4d24ade054e7f8753416
The first time, things might take extra time to get started since VAD (Voice Activity Detection) model needs to be downloaded.
## Get started
```python
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
cp env.example .env # and add your credentials
```
## Run the server
```bash
python server.py
```
Then, visit `http://localhost:7860/` in your browser to start a chatbot session.
## Build and test the Docker image
```
docker build -t chatbot .
docker run --env-file .env -p 7860:7860 chatbot
```

View File

@@ -1,146 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import uuid
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame, LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
from pipecat.services.canonical import CanonicalMetricsService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Chatbot",
DailyParams(
audio_out_enabled=True,
audio_in_enabled=True,
camera_out_enabled=False,
vad_enabled=True,
vad_audio_passthrough=True,
vad_analyzer=SileroVADAnalyzer(),
transcription_enabled=True,
#
# Spanish
#
# transcription_settings=DailyTranscriptionSettings(
# language="es",
# tier="nova",
# model="2-general"
# )
),
)
tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY"),
#
# English
#
voice_id="cgSgspJ2msm6clMCkdW9",
aiohttp_session=session,
#
# Spanish
#
# model="eleven_multilingual_v2",
# voice_id="gD1IexrzCvsXPHUuT0s3",
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
#
# English
#
"content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by introducing yourself. Keep all your responses to 12 words or fewer.",
#
# Spanish
#
# "content": "Eres Chatbot, un amigable y útil robot. Tu objetivo es demostrar tus capacidades de una manera breve. Tus respuestas se convertiran a audio así que nunca no debes incluir caracteres especiales. Contesta a lo que el usuario pregunte de una manera creativa, útil y breve. Empieza por presentarte a ti mismo.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
"""
CanonicalMetrics uses AudioBufferProcessor under the hood to buffer the audio. On
call completion, CanonicalMetrics will send the audio buffer to Canonical for
analysis. Visit https://voice.canonical.chat to learn more.
"""
audio_buffer_processor = AudioBufferProcessor()
canonical = CanonicalMetricsService(
audio_buffer_processor=audio_buffer_processor,
aiohttp_session=session,
api_key=os.getenv("CANONICAL_API_KEY"),
api_url=os.getenv("CANONICAL_API_URL"),
call_id=str(uuid.uuid4()),
assistant="pipecat-chatbot",
assistant_speaks_first=True,
)
pipeline = Pipeline(
[
transport.input(), # microphone
context_aggregator.user(),
llm,
tts,
transport.output(),
audio_buffer_processor, # captures audio into a buffer
canonical, # uploads audio buffer to Canonical AI for metrics
context_aggregator.assistant(),
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
print(f"Participant left: {participant}")
await task.queue_frame(EndFrame())
@transport.event_handler("on_call_state_updated")
async def on_call_state_updated(transport, state):
if state == "left":
await task.queue_frame(EndFrame())
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,6 +0,0 @@
DAILY_SAMPLE_ROOM_URL=https://yourdomain.daily.co/yourroom # (for joining the bot to the same room repeatedly for local dev)
DAILY_API_KEY=7df...
OPENAI_API_KEY=sk-PL...
ELEVENLABS_API_KEY=aeb...
CANONICAL_API_KEY=can...
CANONICAL_API_URL=

View File

@@ -1,5 +0,0 @@
python-dotenv
fastapi[all]
uvicorn
pipecat-ai[daily,openai,silero,elevenlabs,canonical]

View File

@@ -1,56 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import os
import aiohttp
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper
async def configure(aiohttp_session: aiohttp.ClientSession):
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument(
"-u", "--url", type=str, required=False, help="URL of the Daily room to join"
)
parser.add_argument(
"-k",
"--apikey",
type=str,
required=False,
help="Daily API Key (needed to create an owner token for the room)",
)
args, unknown = parser.parse_known_args()
url = args.url or os.getenv("DAILY_SAMPLE_ROOM_URL")
key = args.apikey or os.getenv("DAILY_API_KEY")
if not url:
raise Exception(
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL."
)
if not key:
raise Exception(
"No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
)
daily_rest_helper = DailyRESTHelper(
daily_api_key=key,
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
# Create a meeting token for the given room with an expiration 1 hour in
# the future.
expiry_time: float = 60 * 60
token = await daily_rest_helper.get_token(url, expiry_time)
return (url, token)
return (url, token)

View File

@@ -1,139 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import os
import subprocess
from contextlib import asynccontextmanager
import aiohttp
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, RedirectResponse
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomParams
MAX_BOTS_PER_ROOM = 1
# Bot sub-process dict for status reporting and concurrency control
bot_procs = {}
daily_helpers = {}
load_dotenv(override=True)
def cleanup():
# Clean up function, just to be extra safe
for entry in bot_procs.values():
proc = entry[0]
proc.terminate()
proc.wait()
@asynccontextmanager
async def lifespan(app: FastAPI):
aiohttp_session = aiohttp.ClientSession()
daily_helpers["rest"] = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
yield
await aiohttp_session.close()
cleanup()
app = FastAPI(lifespan=lifespan)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/")
async def start_agent(request: Request):
print(f"!!! Creating room")
room = await daily_helpers["rest"].create_room(DailyRoomParams())
print(f"!!! Room URL: {room.url}")
# Ensure the room property is present
if not room.url:
raise HTTPException(
status_code=500,
detail="Missing 'room' property in request data. Cannot start agent without a target room!",
)
# Check if there is already an existing process running in this room
num_bots_in_room = sum(
1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None
)
if num_bots_in_room >= MAX_BOTS_PER_ROOM:
raise HTTPException(status_code=500, detail=f"Max bot limited reach for room: {room.url}")
# Get the token for the room
token = await daily_helpers["rest"].get_token(room.url)
if not token:
raise HTTPException(status_code=500, detail=f"Failed to get token for room: {room.url}")
# Spawn a new agent, and join the user session
# Note: this is mostly for demonstration purposes (refer to 'deployment' in README)
try:
proc = subprocess.Popen(
[f"python3 -m bot -u {room.url} -t {token}"],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__)),
)
bot_procs[proc.pid] = (proc, room.url)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}")
return RedirectResponse(room.url)
@app.get("/status/{pid}")
def get_status(pid: int):
# Look up the subprocess
proc = bot_procs.get(pid)
# If the subprocess doesn't exist, return an error
if not proc:
raise HTTPException(status_code=404, detail=f"Bot with process id: {pid} not found")
# Check the status of the subprocess
if proc[0].poll() is None:
status = "running"
else:
status = "finished"
return JSONResponse({"bot_id": pid, "status": status})
if __name__ == "__main__":
import uvicorn
default_host = os.getenv("HOST", "0.0.0.0")
default_port = int(os.getenv("FAST_API_PORT", "7860"))
parser = argparse.ArgumentParser(description="Daily Storyteller FastAPI server")
parser.add_argument("--host", type=str, default=default_host, help="Host address")
parser.add_argument("--port", type=int, default=default_port, help="Port number")
parser.add_argument("--reload", action="store_true", help="Reload code on change")
config = parser.parse_args()
uvicorn.run(
"server:app",
host=config.host,
port=config.port,
reload=config.reload,
)

View File

@@ -1,15 +0,0 @@
FROM python:3.10-bullseye
RUN mkdir /app
RUN mkdir /app/assets
RUN mkdir /app/utils
COPY *.py /app/
COPY requirements.txt /app/
WORKDIR /app
RUN pip3 install -r requirements.txt
EXPOSE 7860
CMD ["python3", "server.py"]

View File

@@ -1,37 +0,0 @@
# Simple Chatbot
<img src="image.png" width="420px">
This app connects you to a chatbot powered by GPT-4, complete with animations generated by Stable Video Diffusion.
See a video of it in action: https://x.com/kwindla/status/1778628911817183509
And a quick video walkthrough of the code: https://www.loom.com/share/13df1967161f4d24ade054e7f8753416
The first time, things might take extra time to get started since VAD (Voice Activity Detection) model needs to be downloaded.
## Get started
```python
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
cp env.example .env # and add your credentials
```
## Run the server
```bash
python server.py
```
Then, visit `http://localhost:7860/` in your browser to start a chatbot session.
## Build and test the Docker image
```
docker build -t chatbot .
docker run --env-file .env -p 7860:7860 chatbot
```

View File

@@ -1,141 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
import datetime
import wave
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame, LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def save_audio(audiobuffer):
if audiobuffer.has_audio():
merged_audio = audiobuffer.merge_audio_buffers()
filename = f"conversation_recording{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.wav"
with wave.open(filename, "wb") as wf:
wf.setnchannels(2)
wf.setsampwidth(2)
wf.setframerate(audiobuffer._sample_rate)
wf.writeframes(merged_audio)
print(f"Merged audio saved to {filename}")
else:
print("No audio data to save")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Chatbot",
DailyParams(
audio_out_enabled=True,
audio_in_enabled=True,
camera_out_enabled=False,
vad_enabled=True,
vad_audio_passthrough=True,
vad_analyzer=SileroVADAnalyzer(),
transcription_enabled=True,
#
# Spanish
#
# transcription_settings=DailyTranscriptionSettings(
# language="es",
# tier="nova",
# model="2-general"
# )
),
)
tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY"),
#
# English
#
voice_id="cgSgspJ2msm6clMCkdW9",
aiohttp_session=session,
#
# Spanish
#
# model="eleven_multilingual_v2",
# voice_id="gD1IexrzCvsXPHUuT0s3",
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
#
# English
#
"content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by introducing yourself. Keep all your response to 12 words or fewer.",
#
# Spanish
#
# "content": "Eres Chatbot, un amigable y útil robot. Tu objetivo es demostrar tus capacidades de una manera breve. Tus respuestas se convertiran a audio así que nunca no debes incluir caracteres especiales. Contesta a lo que el usuario pregunte de una manera creativa, útil y breve. Empieza por presentarte a ti mismo.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
audiobuffer = AudioBufferProcessor()
pipeline = Pipeline(
[
transport.input(), # microphone
context_aggregator.user(),
llm,
tts,
transport.output(),
audiobuffer, # used to buffer the audio in the pipeline
context_aggregator.assistant(),
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
print(f"Participant left: {participant}")
await task.queue_frame(EndFrame())
await save_audio(audiobuffer)
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,4 +0,0 @@
DAILY_SAMPLE_ROOM_URL=https://yourdomain.daily.co/yourroom # (for joining the bot to the same room repeatedly for local dev)
DAILY_API_KEY=7df...
OPENAI_API_KEY=sk-PL...
ELEVENLABS_API_KEY=aeb...

View File

@@ -1,4 +0,0 @@
python-dotenv
fastapi[all]
uvicorn
pipecat-ai[daily,openai,silero,elevenlabs]

View File

@@ -1,56 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import os
import aiohttp
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper
async def configure(aiohttp_session: aiohttp.ClientSession):
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument(
"-u", "--url", type=str, required=False, help="URL of the Daily room to join"
)
parser.add_argument(
"-k",
"--apikey",
type=str,
required=False,
help="Daily API Key (needed to create an owner token for the room)",
)
args, unknown = parser.parse_known_args()
url = args.url or os.getenv("DAILY_SAMPLE_ROOM_URL")
key = args.apikey or os.getenv("DAILY_API_KEY")
if not url:
raise Exception(
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL."
)
if not key:
raise Exception(
"No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
)
daily_rest_helper = DailyRESTHelper(
daily_api_key=key,
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
# Create a meeting token for the given room with an expiration 1 hour in
# the future.
expiry_time: float = 60 * 60
token = await daily_rest_helper.get_token(url, expiry_time)
return (url, token)
return (url, token)

View File

@@ -1,139 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import os
import subprocess
from contextlib import asynccontextmanager
import aiohttp
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, RedirectResponse
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomParams
MAX_BOTS_PER_ROOM = 1
# Bot sub-process dict for status reporting and concurrency control
bot_procs = {}
daily_helpers = {}
load_dotenv(override=True)
def cleanup():
# Clean up function, just to be extra safe
for entry in bot_procs.values():
proc = entry[0]
proc.terminate()
proc.wait()
@asynccontextmanager
async def lifespan(app: FastAPI):
aiohttp_session = aiohttp.ClientSession()
daily_helpers["rest"] = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
yield
await aiohttp_session.close()
cleanup()
app = FastAPI(lifespan=lifespan)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/")
async def start_agent(request: Request):
print(f"!!! Creating room")
room = await daily_helpers["rest"].create_room(DailyRoomParams())
print(f"!!! Room URL: {room.url}")
# Ensure the room property is present
if not room.url:
raise HTTPException(
status_code=500,
detail="Missing 'room' property in request data. Cannot start agent without a target room!",
)
# Check if there is already an existing process running in this room
num_bots_in_room = sum(
1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None
)
if num_bots_in_room >= MAX_BOTS_PER_ROOM:
raise HTTPException(status_code=500, detail=f"Max bot limited reach for room: {room.url}")
# Get the token for the room
token = await daily_helpers["rest"].get_token(room.url)
if not token:
raise HTTPException(status_code=500, detail=f"Failed to get token for room: {room.url}")
# Spawn a new agent, and join the user session
# Note: this is mostly for demonstration purposes (refer to 'deployment' in README)
try:
proc = subprocess.Popen(
[f"python3 -m bot -u {room.url} -t {token}"],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__)),
)
bot_procs[proc.pid] = (proc, room.url)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}")
return RedirectResponse(room.url)
@app.get("/status/{pid}")
def get_status(pid: int):
# Look up the subprocess
proc = bot_procs.get(pid)
# If the subprocess doesn't exist, return an error
if not proc:
raise HTTPException(status_code=404, detail=f"Bot with process id: {pid} not found")
# Check the status of the subprocess
if proc[0].poll() is None:
status = "running"
else:
status = "finished"
return JSONResponse({"bot_id": pid, "status": status})
if __name__ == "__main__":
import uvicorn
default_host = os.getenv("HOST", "0.0.0.0")
default_port = int(os.getenv("FAST_API_PORT", "7860"))
parser = argparse.ArgumentParser(description="Daily Storyteller FastAPI server")
parser.add_argument("--host", type=str, default=default_host, help="Host address")
parser.add_argument("--port", type=int, default=default_port, help="Port number")
parser.add_argument("--reload", action="store_true", help="Reload code on change")
config = parser.parse_args()
uvicorn.run(
"server:app",
host=config.host,
port=config.port,
reload=config.reload,
)

View File

@@ -1,13 +0,0 @@
FROM python:3.11-bullseye
# Open port 7860 for http service
ENV FAST_API_PORT=7860
EXPOSE 7860
# Install Python dependencies
COPY *.py .
COPY ./requirements.txt requirements.txt
RUN pip3 install --no-cache-dir --upgrade -r requirements.txt
# Start the FastAPI server
CMD python3 bot_runner.py --port ${FAST_API_PORT}

View File

@@ -1,39 +0,0 @@
# Fly.io deployment example
This project modifies the `bot_runner.py` server to launch a new machine for each user session. This is a recommended approach for production vs. running shell processess as your deployment will quickly run out of system resources under load.
For this example, we are using Daily as a WebRTC transport and provisioning a new room and token for each session. You can use another transport, such as WebSockets, by modifying the `bot.py` and `bot_runner.py` files accordingly.
## Setting up your fly.io deployment
### Create your fly.toml file
You can copy the `example-fly.toml` as a reference. Be sure to change the app name to something unique.
### Create your .env file
Copy the base `env.example` to `.env` and enter the necessary API keys.
`FLY_APP_NAME` should match that in the `fly.toml` file.
### Launch a new fly.io project
`fly launch` or `fly launch --org your-org-name`
### Set the necessary app secrets from your .env
Note: you can do this manually via the fly.io dashboard under the "secrets" sub-section of your deployment (e.g. "https://fly.io/apps/fly-app-name/secrets") or run the following terminal command:
`cat .env | tr '\n' ' ' | xargs flyctl secrets set`
### Deploy your machine
`fly deploy`
## Connecting to your bot
Send a post request to your running fly.io instance:
`curl --location --request POST 'https://YOUR_FLY_APP_NAME/'`
This request will wait until the machine enters into a `starting` state, before returning the a room URL and token to join.

View File

@@ -1,101 +0,0 @@
import asyncio
import os
import sys
import argparse
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.frames.frames import LLMMessagesFrame, EndFrame
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai import OpenAILLMService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
daily_api_key = os.getenv("DAILY_API_KEY", "")
daily_api_url = os.getenv("DAILY_API_URL", "https://api.daily.co/v1")
async def main(room_url: str, token: str):
transport = DailyTransport(
room_url,
token,
"Chatbot",
DailyParams(
api_url=daily_api_url,
api_key=daily_api_key,
audio_in_enabled=True,
audio_out_enabled=True,
camera_out_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
transcription_enabled=True,
),
)
tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are Chatbot, a friendly, helpful robot. Your output will be converted to audio so don't include special characters other than '!' or '?' in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying hello.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(),
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
@transport.event_handler("on_call_state_updated")
async def on_call_state_updated(transport, state):
if state == "left":
await task.queue_frame(EndFrame())
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Pipecat Bot")
parser.add_argument("-u", type=str, help="Room URL")
parser.add_argument("-t", type=str, help="Token")
config = parser.parse_args()
asyncio.run(main(config.u, config.t))

View File

@@ -1,211 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import argparse
import subprocess
import os
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pipecat.transports.services.helpers.daily_rest import (
DailyRESTHelper,
DailyRoomObject,
DailyRoomProperties,
DailyRoomParams,
)
from dotenv import load_dotenv
load_dotenv(override=True)
# ------------ Configuration ------------ #
MAX_SESSION_TIME = 5 * 60 # 5 minutes
REQUIRED_ENV_VARS = [
"DAILY_API_KEY",
"OPENAI_API_KEY",
"ELEVENLABS_API_KEY",
"ELEVENLABS_VOICE_ID",
"FLY_API_KEY",
"FLY_APP_NAME",
]
FLY_API_HOST = os.getenv("FLY_API_HOST", "https://api.machines.dev/v1")
FLY_APP_NAME = os.getenv("FLY_APP_NAME", "pipecat-fly-example")
FLY_API_KEY = os.getenv("FLY_API_KEY", "")
FLY_HEADERS = {"Authorization": f"Bearer {FLY_API_KEY}", "Content-Type": "application/json"}
daily_helpers = {}
# ----------------- API ----------------- #
@asynccontextmanager
async def lifespan(app: FastAPI):
aiohttp_session = aiohttp.ClientSession()
daily_helpers["rest"] = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
yield
await aiohttp_session.close()
app = FastAPI(lifespan=lifespan)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ----------------- Main ----------------- #
async def spawn_fly_machine(room_url: str, token: str):
async with aiohttp.ClientSession() as session:
# Use the same image as the bot runner
async with session.get(
f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS
) as r:
if r.status != 200:
text = await r.text()
raise Exception(f"Unable to get machine info from Fly: {text}")
data = await r.json()
image = data[0]["config"]["image"]
# Machine configuration
cmd = f"python3 bot.py -u {room_url} -t {token}"
cmd = cmd.split()
worker_props = {
"config": {
"image": image,
"auto_destroy": True,
"init": {"cmd": cmd},
"restart": {"policy": "no"},
"guest": {"cpu_kind": "shared", "cpus": 1, "memory_mb": 1024},
},
}
# Spawn a new machine instance
async with session.post(
f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS, json=worker_props
) as r:
if r.status != 200:
text = await r.text()
raise Exception(f"Problem starting a bot worker: {text}")
data = await r.json()
# Wait for the machine to enter the started state
vm_id = data["id"]
async with session.get(
f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines/{vm_id}/wait?state=started",
headers=FLY_HEADERS,
) as r:
if r.status != 200:
text = await r.text()
raise Exception(f"Bot was unable to enter started state: {text}")
print(f"Machine joined room: {room_url}")
@app.post("/")
async def start_bot(request: Request) -> JSONResponse:
try:
data = await request.json()
# Is this a webhook creation request?
if "test" in data:
return JSONResponse({"test": True})
except Exception as e:
pass
# Use specified room URL, or create a new one if not specified
room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", "")
if not room_url:
params = DailyRoomParams(properties=DailyRoomProperties())
try:
room: DailyRoomObject = await daily_helpers["rest"].create_room(params=params)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Unable to provision room {e}")
else:
# Check passed room URL exists, we should assume that it already has a sip set up
try:
room: DailyRoomObject = await daily_helpers["rest"].get_room_from_url(room_url)
except Exception:
raise HTTPException(status_code=500, detail=f"Room not found: {room_url}")
# Give the agent a token to join the session
token = await daily_helpers["rest"].get_token(room.url, MAX_SESSION_TIME)
if not room or not token:
raise HTTPException(status_code=500, detail=f"Failed to get token for room: {room_url}")
# Launch a new fly.io machine, or run as a shell process (not recommended)
run_as_process = os.getenv("RUN_AS_PROCESS", False)
if run_as_process:
try:
subprocess.Popen(
[f"python3 -m bot -u {room.url} -t {token}"],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__)),
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}")
else:
try:
await spawn_fly_machine(room.url, token)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to spawn VM: {e}")
# Grab a token for the user to join with
user_token = await daily_helpers["rest"].get_token(room.url, MAX_SESSION_TIME)
return JSONResponse(
{
"room_url": room.url,
"token": user_token,
}
)
if __name__ == "__main__":
# Check environment variables
for env_var in REQUIRED_ENV_VARS:
if env_var not in os.environ:
raise Exception(f"Missing environment variable: {env_var}.")
parser = argparse.ArgumentParser(description="Pipecat Bot Runner")
parser.add_argument(
"--host", type=str, default=os.getenv("HOST", "0.0.0.0"), help="Host address"
)
parser.add_argument("--port", type=int, default=os.getenv("PORT", 7860), help="Port number")
parser.add_argument(
"--reload", action="store_true", default=False, help="Reload code on change"
)
config = parser.parse_args()
try:
import uvicorn
uvicorn.run("bot_runner:app", host=config.host, port=config.port, reload=config.reload)
except KeyboardInterrupt:
print("Pipecat runner shutting down...")

View File

@@ -1,8 +0,0 @@
DAILY_API_KEY=
DAILY_SAMPLE_ROOM_URL= # Enter a Daily room URL to use a set room URL each time (useful for local testing)
OPENAI_API_KEY=
ELEVENLABS_API_KEY=
ELEVENLABS_VOICE_ID=
FLY_API_KEY=
FLY_APP_NAME=
RUN_AS_PROCESS= # Spawn fly.io machine for each session or run as local process

View File

@@ -1,25 +0,0 @@
# fly.toml app configuration file generated for pipecat-fly-example on 2024-07-01T15:04:53+01:00
#
# See https://fly.io/docs/reference/configuration/ for information about how to use this file.
#
app = 'pipecat-fly-example'
primary_region = 'sjc'
[build]
[env]
FLY_APP_NAME = 'pipecat-fly-example'
[http_service]
internal_port = 7860
force_https = true
auto_stop_machines = true
auto_start_machines = true
min_machines_running = 0
processes = ['app']
[[vm]]
memory = 512
cpu_kind = 'shared'
cpus = 1

View File

@@ -1,5 +0,0 @@
pipecat-ai[daily,openai,silero]
fastapi
uvicorn
python-dotenv
loguru

View File

@@ -1,22 +1,24 @@
import asyncio
import aiohttp
import os
import sys
import argparse
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.frames.frames import LLMMessagesFrame, EndFrame
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.aggregators.llm_response import LLMAssistantResponseAggregator, LLMUserResponseAggregator
from pipecat.frames.frames import (
LLMMessagesFrame,
EndFrame
)
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport, DailyDialinSettings
from pipecat.vad.silero import SileroVADAnalyzer
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -27,70 +29,75 @@ daily_api_url = os.getenv("DAILY_API_URL", "https://api.daily.co/v1")
async def main(room_url: str, token: str, callId: str, callDomain: str):
# diallin_settings are only needed if Daily's SIP URI is used
# If you are handling this via Twilio, Telnyx, set this to None
# and handle call-forwarding when on_dialin_ready fires.
diallin_settings = DailyDialinSettings(call_id=callId, call_domain=callDomain)
async with aiohttp.ClientSession() as session:
# diallin_settings are only needed if Daily's SIP URI is used
# If you are handling this via Twilio, Telnyx, set this to None
# and handle call-forwarding when on_dialin_ready fires.
diallin_settings = DailyDialinSettings(
call_id=callId,
call_domain=callDomain
)
transport = DailyTransport(
room_url,
token,
"Chatbot",
DailyParams(
api_url=daily_api_url,
api_key=daily_api_key,
dialin_settings=diallin_settings,
audio_in_enabled=True,
audio_out_enabled=True,
camera_out_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
transcription_enabled=True,
),
)
transport = DailyTransport(
room_url,
token,
"Chatbot",
DailyParams(
api_url=daily_api_url,
api_key=daily_api_key,
dialin_settings=diallin_settings,
audio_in_enabled=True,
audio_out_enabled=True,
camera_out_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
transcription_enabled=True,
)
)
tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying 'Oh, hello! Who dares dial me at this hour?!'.",
},
]
messages = [
{
"role": "system",
"content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying 'Oh, hello! Who dares dial me at this hour?!'.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline(
[
pipeline = Pipeline([
transport.input(),
context_aggregator.user(),
tma_in,
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
tma_out,
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
runner = PipelineRunner()
runner = PipelineRunner()
await runner.run(task)
await runner.run(task)
if __name__ == "__main__":

View File

@@ -6,62 +6,40 @@ provisioning a room and starting a Pipecat bot in response.
Refer to README for more information.
"""
import aiohttp
import os
import argparse
import subprocess
from contextlib import asynccontextmanager
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomObject, DailyRoomProperties, DailyRoomSipParams, DailyRoomParams
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, PlainTextResponse
from twilio.twiml.voice_response import VoiceResponse
from pipecat.transports.services.helpers.daily_rest import (
DailyRESTHelper,
DailyRoomObject,
DailyRoomProperties,
DailyRoomSipParams,
DailyRoomParams,
)
from dotenv import load_dotenv
load_dotenv(override=True)
# ------------ Configuration ------------ #
MAX_SESSION_TIME = 5 * 60 # 5 minutes
REQUIRED_ENV_VARS = ["OPENAI_API_KEY", "DAILY_API_KEY", "ELEVENLABS_API_KEY", "ELEVENLABS_VOICE_ID"]
REQUIRED_ENV_VARS = ['OPENAI_API_KEY', 'DAILY_API_KEY',
'ELEVENLABS_API_KEY', 'ELEVENLABS_VOICE_ID']
daily_rest_helper = DailyRESTHelper(
os.getenv("DAILY_API_KEY", ""),
os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'))
daily_helpers = {}
# ----------------- API ----------------- #
@asynccontextmanager
async def lifespan(app: FastAPI):
aiohttp_session = aiohttp.ClientSession()
daily_helpers["rest"] = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
yield
await aiohttp_session.close()
app = FastAPI(lifespan=lifespan)
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
allow_headers=["*"]
)
"""
@@ -75,49 +53,61 @@ action using the Twilio Client library.
"""
async def _create_daily_room(room_url, callId, callDomain=None, vendor="daily"):
def _create_daily_room(room_url, callId, callDomain=None, vendor="daily"):
if not room_url:
params = DailyRoomParams(
properties=DailyRoomProperties(
# Note: these are the default values, except for the display name
sip=DailyRoomSipParams(
display_name="dialin-user", video=False, sip_mode="dial-in", num_endpoints=1
display_name="dialin-user",
video=False,
sip_mode="dial-in",
num_endpoints=1
)
)
)
print(f"Creating new room...")
room: DailyRoomObject = await daily_helpers["rest"].create_room(params=params)
room: DailyRoomObject = daily_rest_helper.create_room(params=params)
else:
# Check passed room URL exist (we assume that it already has a sip set up!)
try:
print(f"Joining existing room: {room_url}")
room: DailyRoomObject = await daily_helpers["rest"].get_room_from_url(room_url)
room: DailyRoomObject = daily_rest_helper.get_room_from_url(
room_url)
except Exception:
raise HTTPException(status_code=500, detail=f"Room not found: {room_url}")
raise HTTPException(
status_code=500, detail=f"Room not found: {room_url}")
print(f"Daily room: {room.url} {room.config.sip_endpoint}")
# Give the agent a token to join the session
token = await daily_helpers["rest"].get_token(room.url, MAX_SESSION_TIME)
token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME)
if not room or not token:
raise HTTPException(status_code=500, detail=f"Failed to get room or token token")
raise HTTPException(
status_code=500, detail=f"Failed to get room or token token")
# Spawn a new agent, and join the user session
# Note: this is mostly for demonstration purposes (refer to 'deployment' in docs)
if vendor == "daily":
bot_proc = f"python3 -m bot_daily -u {room.url} -t {token} -i {callId} -d {callDomain}"
bot_proc = f"python3 -m bot_daily -u {room.url} -t {token} -i {
callId} -d {callDomain}"
else:
bot_proc = f"python3 -m bot_twilio -u {room.url} -t {token} -i {callId} -s {room.config.sip_endpoint}"
bot_proc = f"python3 -m bot_twilio -u {room.url} -t {
token} -i {callId} -s {room.config.sip_endpoint}"
try:
subprocess.Popen(
[bot_proc], shell=True, bufsize=1, cwd=os.path.dirname(os.path.abspath(__file__))
[bot_proc],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__))
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}")
raise HTTPException(
status_code=500, detail=f"Failed to start subprocess: {e}")
return room
@@ -140,16 +130,18 @@ async def twilio_start_bot(request: Request):
pass
room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", None)
callId = data.get("CallSid")
callId = data.get('CallSid')
if not callId:
raise HTTPException(status_code=500, detail="Missing 'CallSid' in request")
raise HTTPException(
status_code=500, detail="Missing 'CallSid' in request")
print("CallId: %s" % callId)
# create room and tell the bot to join the created room
# note: Twilio does not require a callDomain
room: DailyRoomObject = await _create_daily_room(room_url, callId, None, "twilio")
room: DailyRoomObject = _create_daily_room(
room_url, callId, None, "twilio")
print(f"Put Twilio on hold...")
# We have the room and the SIP URI,
@@ -159,8 +151,7 @@ async def twilio_start_bot(request: Request):
# http://com.twilio.music.classical.s3.amazonaws.com/BusyStrings.mp3
resp = VoiceResponse()
resp.play(
url="http://com.twilio.sounds.music.s3.amazonaws.com/MARKOVICHAMP-Borghestral.mp3", loop=10
)
url="http://com.twilio.sounds.music.s3.amazonaws.com/MARKOVICHAMP-Borghestral.mp3", loop=10)
return str(resp)
@@ -182,14 +173,19 @@ async def daily_start_bot(request: Request) -> JSONResponse:
callId = data.get("callId", None)
callDomain = data.get("callDomain", None)
except Exception:
raise HTTPException(status_code=500, detail="Missing properties 'callId' or 'callDomain'")
raise HTTPException(
status_code=500,
detail="Missing properties 'callId' or 'callDomain'")
print(f"CallId: {callId}, CallDomain: {callDomain}")
room: DailyRoomObject = await _create_daily_room(room_url, callId, callDomain, "daily")
room: DailyRoomObject = _create_daily_room(
room_url, callId, callDomain, "daily")
# Grab a token for the user to join with
return JSONResponse({"room_url": room.url, "sipUri": room.config.sip_endpoint})
return JSONResponse({
"room_url": room.url,
"sipUri": room.config.sip_endpoint
})
# ----------------- Main ----------------- #
@@ -201,18 +197,24 @@ if __name__ == "__main__":
raise Exception(f"Missing environment variable: {env_var}.")
parser = argparse.ArgumentParser(description="Pipecat Bot Runner")
parser.add_argument(
"--host", type=str, default=os.getenv("HOST", "0.0.0.0"), help="Host address"
)
parser.add_argument("--port", type=int, default=os.getenv("PORT", 7860), help="Port number")
parser.add_argument("--reload", action="store_true", default=True, help="Reload code on change")
parser.add_argument("--host", type=str,
default=os.getenv("HOST", "0.0.0.0"), help="Host address")
parser.add_argument("--port", type=int,
default=os.getenv("PORT", 7860), help="Port number")
parser.add_argument("--reload", action="store_true",
default=True, help="Reload code on change")
config = parser.parse_args()
try:
import uvicorn
uvicorn.run("bot_runner:app", host=config.host, port=config.port, reload=config.reload)
uvicorn.run(
"bot_runner:app",
host=config.host,
port=config.port,
reload=config.reload
)
except KeyboardInterrupt:
print("Pipecat runner shutting down...")

View File

@@ -1,112 +1,117 @@
import asyncio
import aiohttp
import os
import sys
import argparse
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.frames.frames import LLMMessagesFrame, EndFrame
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.aggregators.llm_response import LLMAssistantResponseAggregator, LLMUserResponseAggregator
from pipecat.frames.frames import (
LLMMessagesFrame,
EndFrame
)
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from twilio.rest import Client
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
twilio_account_sid = os.getenv("TWILIO_ACCOUNT_SID")
twilio_auth_token = os.getenv("TWILIO_AUTH_TOKEN")
twilio_account_sid = os.getenv('TWILIO_ACCOUNT_SID')
twilio_auth_token = os.getenv('TWILIO_AUTH_TOKEN')
twilioclient = Client(twilio_account_sid, twilio_auth_token)
daily_api_key = os.getenv("DAILY_API_KEY", "")
async def main(room_url: str, token: str, callId: str, sipUri: str):
# dialin_settings are only needed if Daily's SIP URI is used
# If you are handling this via Twilio, Telnyx, set this to None
# and handle call-forwarding when on_dialin_ready fires.
transport = DailyTransport(
room_url,
token,
"Chatbot",
DailyParams(
api_key=daily_api_key,
dialin_settings=None, # Not required for Twilio
audio_in_enabled=True,
audio_out_enabled=True,
camera_out_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
transcription_enabled=True,
),
)
async with aiohttp.ClientSession() as session:
# diallin_settings are only needed if Daily's SIP URI is used
# If you are handling this via Twilio, Telnyx, set this to None
# and handle call-forwarding when on_dialin_ready fires.
transport = DailyTransport(
room_url,
token,
"Chatbot",
DailyParams(
api_key=daily_api_key,
dialin_settings=None, # Not required for Twilio
audio_in_enabled=True,
audio_out_enabled=True,
camera_out_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
transcription_enabled=True,
)
)
tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying 'Hello! Who dares dial me at this hour?!'.",
},
]
messages = [
{
"role": "system",
"content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying 'Hello! Who dares dial me at this hour?!'.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline(
[
pipeline = Pipeline([
transport.input(),
context_aggregator.user(),
tma_in,
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
tma_out,
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
@transport.event_handler("on_dialin_ready")
async def on_dialin_ready(transport, cdata):
# For Twilio, Telnyx, etc. You need to update the state of the call
# and forward it to the sip_uri..
print(f"Forwarding call: {callId} {sipUri}")
@transport.event_handler("on_dialin_ready")
async def on_dialin_ready(transport, cdata):
# For Twilio, Telnyx, etc. You need to update the state of the call
# and forward it to the sip_uri..
print(f"Forwarding call: {callId} {sipUri}")
try:
# The TwiML is updated using Twilio's client library
call = twilioclient.calls(callId).update(
twiml=f"<Response><Dial><Sip>{sipUri}</Sip></Dial></Response>"
)
except Exception as e:
raise Exception(f"Failed to forward call: {str(e)}")
try:
# The TwiML is updated using Twilio's client library
call = twilioclient.calls(callId).update(
twiml=f'<Response><Dial><Sip>{sipUri}</Sip></Dial></Response>'
)
except Exception as e:
raise Exception(f"Failed to forward call: {str(e)}")
runner = PipelineRunner()
await runner.run(task)
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":

View File

@@ -1,6 +1,7 @@
pipecat-ai[daily,elevenlabs,openai,silero]
pipecat-ai[daily,openai,silero]
fastapi
uvicorn
requests
python-dotenv
twilio
python-multipart
loguru
twilio

View File

@@ -159,3 +159,7 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
runpod.toml
# custom script to recursively upgrade items in requirements.py
upgrade_requirements.py
.DS_Store

View File

@@ -0,0 +1,164 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from loguru import logger
import argparse
import asyncio
import aiohttp
import os
import sys
import time
from typing import Optional
from pydantic import BaseModel, ValidationError
from pipecat.vad.vad_analyzer import VADParams
from pipecat.vad.silero import SileroVADAnalyzer
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.services.openai import OpenAILLMService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.pipeline import Pipeline
from pipecat.frames.frames import LLMMessagesFrame, EndFrame
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator
)
from helpers import (
ClearableDeepgramTTSService,
AudioVolumeTimer,
TranscriptionTimingLogger
)
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level=os.getenv("LOG_LEVEL", "DEBUG"))
class BotSettings(BaseModel):
room_url: str
room_token: str
bot_name: str = "Pipecat"
prompt: Optional[str] = "You are a helpful assistant."
deepgram_api_key: Optional[str] = os.getenv("DEEPGRAM_API_KEY", None)
deepgram_voice: Optional[str] = os.getenv("DEEPGRAM_VOICE", "aura-asteria-en")
deepgram_tts_base_url: Optional[str] = os.getenv(
"DEEPGRAM_TTS_BASE_URL", "https://api.deepgram.com/v1/speak")
deepgram_stt_base_url: Optional[str] = os.getenv(
"DEEPGRAM_STT_BASE_URL", "https://api.deepgram.com/v1/speak")
openai_api_key: Optional[str] = os.getenv("OPENAI_API_KEY", None),
openai_model: Optional[str] = os.getenv("OPENAI_MODEL", None),
openai_base_url: Optional[str] = os.getenv("OPENAI_BASE_URL", None)
vad_stop_secs: Optional[float] = os.getenv("VAD_STOP_SECS", 0.200)
async def main(settings: BotSettings):
async with aiohttp.ClientSession() as session:
transport = DailyTransport(
settings.room_url,
settings.room_token,
settings.bot_name,
DailyParams(
audio_out_enabled=True,
transcription_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(
stop_secs=settings.vad_stop_secs
)),
vad_audio_passthrough=True
)
)
stt = DeepgramSTTService(
name="STT",
api_key=settings.deepgram_api_key,
url=settings.deepgram_stt_base_url
)
tts = ClearableDeepgramTTSService(
name="Voice",
aiohttp_session=session,
api_key=settings.deepgram_api_key,
voice=settings.deepgram_voice,
**({'base_url': url} if (url := settings.deepgram_tts_base_url) else {})
)
llm = OpenAILLMService(
name="LLM",
api_key=settings.openai_api_key,
model=settings.openai_model,
base_url=settings.openai_base_url,
)
messages = [
{
"role": "system",
"content": settings.prompt,
},
]
avt = AudioVolumeTimer()
tl = TranscriptionTimingLogger(avt)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
avt, # Audio volume timer
stt, # Speech-to-text
tl, # Transcription timing logger
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
])
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
report_only_initial_ttfb=True
))
# When the participant leaves, we exit the bot.
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
# When the first participant joins, the bot should introduce itself.
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
# Provide some air whilst tracks subscribe
time.sleep(2)
messages.append(
{
"role": "system",
"content": "Briefly introduce yourself by saying 'hello, I'm FastBot, how can I help you today?'"})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Pipecat Bot")
parser.add_argument("-s", "--settings", type=str, required=True, help="Pipecat bot settings")
args, unknown = parser.parse_known_args()
try:
settings = BotSettings.model_validate_json(args.settings)
asyncio.run(main(settings))
except ValidationError as e:
print(e)

View File

@@ -0,0 +1,164 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
import argparse
import subprocess
from pydantic import BaseModel, ValidationError
from typing import Optional
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomObject, DailyRoomProperties, DailyRoomParams
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from bot import BotSettings
from dotenv import load_dotenv
load_dotenv(override=True)
# ------------ Configuration ------------ #
MAX_SESSION_TIME = 5 * 60 # 5 minutes
REQUIRED_ENV_VARS = ['DAILY_API_URL', 'DAILY_API_KEY', 'DEEPGRAM_API_KEY']
daily_rest_helper = DailyRESTHelper(
os.getenv("DAILY_API_KEY", ""),
os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'))
class RunnerSettings(BaseModel):
prompt: Optional[
str] = "You are a fast, low-latency chatbot. Your goal is to demonstrate voice-driven AI capabilities at human-like speeds. When introducing yourself briefly mention your goal is to showcase speed and conversational flow. The technology powering you is Daily for transport, Cerebrium for GPU hosting, Llama 3 (8-B version) LLM, and Deepgram for speech-to-text and text-to-speech. You are hosted on the east coast of the United States. Respond to what the user said in a creative and helpful way, but keep responses short and legible. Ensure responses contain only words. Check again that you have not included special characters other than '?' or '!'."
deepgram_voice: Optional[str] = os.getenv("DEEPGRAM_VOICE")
openai_model: Optional[str] = os.getenv("OPENAI_MODEL", "gpt-4o")
openai_api_key: Optional[str] = os.getenv("OPENAI_API_KEY")
test: Optional[bool] = None
# ----------------- API ----------------- #
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"]
)
# ----------------- Main ----------------- #
@app.post("/start_bot")
async def start_bot(request: Request) -> JSONResponse:
runner_settings = RunnerSettings()
try:
request_body = await request.body()
if len(request_body) > 0:
runner_settings = RunnerSettings.model_validate_json(request_body)
except ValidationError as e:
raise HTTPException(
status_code=400,
detail=f"Invalid request: {e}")
except Exception as e:
# If no data in request, pass
pass
# Is this a webhook creation request?
if runner_settings.test is not None:
return JSONResponse({"test": True})
# Use specified room URL, or create a new one if not specified
room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", "")
if not room_url:
params = DailyRoomParams(
properties=DailyRoomProperties()
)
try:
room: DailyRoomObject = daily_rest_helper.create_room(params=params)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Unable to provision room {e}")
else:
# Check passed room URL exists, we should assume that it already has a sip set up
try:
room: DailyRoomObject = daily_rest_helper.get_room_from_url(room_url)
except Exception:
raise HTTPException(
status_code=500, detail=f"Room not found: {room_url}")
# Give the agent a token to join the session
token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME)
if not room or not token:
raise HTTPException(
status_code=500, detail=f"Failed to get token for room: {room_url}")
# Spawn a new agent, and join the user session
try:
bot_settings = BotSettings(
room_url=room.url,
room_token=token,
prompt=runner_settings.prompt,
deepgram_voice=runner_settings.deepgram_voice,
openai_model=runner_settings.openai_model,
openai_api_key=runner_settings.openai_api_key,
)
bot_settings_str = bot_settings.model_dump_json(exclude_none=True)
subprocess.Popen(
[f"python3 -m bot -s '{bot_settings_str}'"],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__)))
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to start subprocess: {e}")
# Grab a token for the user to join with
user_token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME)
return JSONResponse({
"room_url": room.url,
"token": user_token,
})
if __name__ == "__main__":
# Check environment variables
for env_var in REQUIRED_ENV_VARS:
if env_var not in os.environ:
raise Exception(f"Missing environment variable: {env_var}.")
parser = argparse.ArgumentParser(description="Pipecat Bot Runner")
parser.add_argument("--host", type=str,
default=os.getenv("HOST", "0.0.0.0"), help="Host address")
parser.add_argument("--port", type=int,
default=os.getenv("PORT", 7860), help="Port number")
parser.add_argument("--reload", action="store_true",
default=True, help="Reload code on change")
config = parser.parse_args()
try:
import uvicorn
uvicorn.run(
"bot_runner:app",
host=config.host,
port=config.port,
reload=config.reload
)
except KeyboardInterrupt:
print("Pipecat runner shutting down...")

View File

@@ -0,0 +1,12 @@
DAILY_SAMPLE_ROOM_URL= #optional: use the same room each time, or create a new one if unset
DAILY_API_KEY=
DAILY_API_URL=
DEEPGRAM_API_KEY=
DEEPGRAM_VOICE=
DEEPGRAM_STT_URL=
DEEPGRAM_TTS_BASE_URL=
OPENAI_API_KEY=
OPENAI_MODEL=
OPENAI_BASE_URL=

View File

@@ -0,0 +1,267 @@
from loguru import logger
import asyncio
import math
import struct
import time
from dataclasses import dataclass, field
from typing import List
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.frames.frames import (
Frame,
AudioRawFrame,
InterimTranscriptionFrame,
TranscriptionFrame,
TextFrame,
StartInterruptionFrame,
LLMFullResponseStartFrame,
TTSStoppedFrame,
MetricsFrame
)
from pipecat.vad.vad_analyzer import VADAnalyzer, VADState
from pipecat.services.deepgram import DeepgramTTSService
from pipecat.services.openai import OpenAILLMContext, OpenAILLMContextFrame
class GreedyLLMAggregator(FrameProcessor):
def __init__(self, context: OpenAILLMContext = None, **kwargs):
super().__init__(**kwargs)
self.context: OpenAILLMContext = context if context else OpenAILLMContext()
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
logger.debug(f"{frame}")
try:
if isinstance(frame, InterimTranscriptionFrame):
return
if isinstance(frame, TranscriptionFrame):
# append transcribed text to last "user" frame
if self.context.messages and self.context.messages[-1]["role"] == "user":
last_frame = self.context.messages.pop()
else:
last_frame = {"role": "user", "content": ""}
last_frame["content"] += " " + frame.text
self.context.messages.append(last_frame)
oai_context_frame = OpenAILLMContextFrame(context=self.context)
logger.debug(f"pushing frame {oai_context_frame}")
await self.push_frame(oai_context_frame)
return
await self.push_frame(frame, direction)
except Exception as e:
logger.debug(f"error: {e}")
class ClearableDeepgramTTSService(DeepgramTTSService):
def __init___(self, **kwargs):
super().__init(**kwargs)
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, StartInterruptionFrame):
self._current_sentence = ""
@dataclass
class BufferedSentence:
audio_frames: List[AudioRawFrame] = field(default_factory=list)
text_frame: TextFrame = None
class VADGate(FrameProcessor):
def __init__(
self,
vad_analyzer: VADAnalyzer = None,
context: OpenAILLMContext = None,
**kwargs):
super().__init__(**kwargs)
self.vad_analyzer = vad_analyzer
self.context = context
self._audio_pusher_task = None
self._expect_text_frame_next = False
self._sentences: List[BufferedSentence] = []
# queue output from tts one sentence at a time. associate a buffer of audio frames with the content of
# each text frame.
#
# start a coroutine to service the queue and send sentences down the pipeline when possible.
# 1. do not send anything when we are not in VADState.QUIET
# 2. if we are in VADState.QUIET, send a sentence, estimate how long it will take for that sentence
# to output, sleep until it's time to send another sentence
# 3. each time we send a sentence, append it to the conversation context
# 3. when the sentence buffer becomes empty, cancel the coroutine
# 4. if we get a new LLMFullResponse, treat that as a cancellation, too
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
try:
# A TTSService will emit a series of AudioRawFrame objects, then a TTSStoppedFrame,
# then a TextFrame.
if self._expect_text_frame_next:
self._expect_text_frame_next = False
if isinstance(frame, TextFrame):
self._sentences[-1].text_frame = frame
else:
logger.debug(f"expected a text frame, but received {frame}")
await self.push_frame(frame, direction)
return
else:
if isinstance(frame, TextFrame):
logger.error(f"XXXXXXXXXXXXXXXXXXX received a text frame, wasn't expecting it.")
if isinstance(frame, AudioRawFrame):
# if our buffer is empty or has a "finished" sentence at the end,
# then we need to start buffering a new sentence
if not self._sentences or self._sentences[-1].text_frame:
self._sentences.append(BufferedSentence())
self._sentences[-1].audio_frames.append(frame)
await self.maybe_start_audio_pusher_task()
return
if isinstance(frame, TTSStoppedFrame):
self._expect_text_frame_next = True
await self.push_frame(frame, direction)
return
# There are two ways we can be interrupted. During greedy inference, a new
# LLM response can start. Or, during playout, we can get a traditional
# user interruption frame.
if (isinstance(frame, LLMFullResponseStartFrame) or
isinstance(frame, StartInterruptionFrame)):
logger.debug(f"{frame} - Handle interruption in VADGate")
self._sentences = []
if self._audio_pusher_task:
self._audio_pusher_task.cancel()
self._audio_pusher_task = None
await self.push_frame(frame, direction)
return
await self.push_frame(frame, direction)
except Exception as e:
logger.debug(f"error: {e}")
async def maybe_start_audio_pusher_task(self):
try:
if self._audio_pusher_task:
return
self._audio_pusher_task = self.get_event_loop().create_task(self.push_audio())
except Exception as e:
logger.debug(f"Exception {e}")
async def push_audio(self):
try:
while True:
if not self._sentences:
await asyncio.sleep(0.01)
continue
if self.vad_analyzer._vad_state != VADState.QUIET:
await asyncio.sleep(0.01)
continue
# we only want to push completed sentence buffers
if not self._sentences[0].text_frame:
await asyncio.sleep(0.01)
continue
s = self._sentences.pop(0)
if not s.audio_frames:
continue
sample_rate = s.audio_frames[0].sample_rate
duration = 0
logger.debug(f"Pushing {len(s.audio_frames)} audio frames")
for frame in s.audio_frames:
await self.push_frame(frame)
# assume linear16 encoding (2 bytes per sample). todo: add some more
# metadata to AudioRawFrame, maybe
duration += (len(frame.audio) / 2 / frame.num_channels) / sample_rate
await asyncio.sleep(duration - 20 / 1000)
if self.context:
logger.debug(f"Appending assistant message to context: [{s.text_frame.text}]")
self.context.messages.append(
{"role": "assistant", "content": s.text_frame.text}
)
await self.push_frame(s.text_frame)
except Exception as e:
logger.debug(f"Exception {e}")
class TranscriptionTimingLogger(FrameProcessor):
def __init__(self, avt):
super().__init__()
self.name = "Transcription"
self._avt = avt
async def process_frame(self, frame: Frame, direction: FrameDirection):
try:
await super().process_frame(frame, direction)
if isinstance(frame, TranscriptionFrame):
elapsed = time.time() - self._avt.last_transition_ts
logger.debug(f"Transcription TTF: {elapsed}")
await self.push_frame(MetricsFrame(ttfb={self.name: elapsed}))
await self.push_frame(frame, direction)
except Exception as e:
logger.debug(f"Exception {e}")
class AudioVolumeTimer(FrameProcessor):
def __init__(self):
super().__init__()
self.last_transition_ts = 0
self._prev_volume = -80
self._speech_volume_threshold = -50
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, AudioRawFrame):
volume = self.calculate_volume(frame)
# print(f"Audio volume: {volume:.2f} dB")
if (volume >= self._speech_volume_threshold and
self._prev_volume < self._speech_volume_threshold):
# logger.debug("transition above speech volume threshold")
self.last_transition_ts = time.time()
elif (volume < self._speech_volume_threshold and
self._prev_volume >= self._speech_volume_threshold):
# logger.debug("transition below non-speech volume threshold")
self.last_transition_ts = time.time()
self._prev_volume = volume
await self.push_frame(frame, direction)
def calculate_volume(self, frame: AudioRawFrame) -> float:
if frame.num_channels != 1:
raise ValueError(f"Expected 1 channel, got {frame.num_channels}")
# Unpack audio data into 16-bit integers
fmt = f"{len(frame.audio) // 2}h"
audio_samples = struct.unpack(fmt, frame.audio)
# Calculate RMS
sum_squares = sum(sample**2 for sample in audio_samples)
rms = math.sqrt(sum_squares / len(audio_samples))
# Convert RMS to decibels (dB)
# Reference: maximum value for 16-bit audio is 32767
if rms > 0:
db = 20 * math.log10(rms / 32767)
else:
db = -96 # Minimum value (almost silent)
return db

View File

@@ -0,0 +1,6 @@
pipecat-ai[daily,openai,silero,deepgram]
fastapi
uvicorn
requests
python-dotenv
loguru

View File

@@ -13,7 +13,7 @@ from pipecat.frames.frames import EndFrame, TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.runner import PipelineRunner
from pipecat.services.cartesia import CartesiaHttpTTSService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from runner import configure
@@ -21,24 +21,21 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async def main(room_url):
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url, None, "Say One Thing", DailyParams(audio_out_enabled=True)
)
room_url, None, "Say One Thing", DailyParams(audio_out_enabled=True))
tts = CartesiaHttpTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
runner = PipelineRunner()
@@ -47,18 +44,13 @@ async def main():
# Register an event handler so we can play the audio when the
# participant joins.
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
participant_name = participant.get("info", {}).get("userName", "")
await task.queue_frame(TextFrame(f"Hello there, {participant_name}!"))
# Register an event handler to exit the application when the user leaves.
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
@transport.event_handler("on_participant_joined")
async def on_new_participant_joined(transport, participant):
participant_name = participant["info"]["userName"] or ''
await task.queue_frames([TextFrame(f"Hello there, {participant_name}!"), EndFrame()])
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url))

View File

@@ -9,18 +9,17 @@ import aiohttp
import os
import sys
from pipecat.frames.frames import TextFrame
from pipecat.frames.frames import EndFrame, TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.local.audio import LocalAudioTransport
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -31,9 +30,10 @@ async def main():
async with aiohttp.ClientSession() as session:
transport = LocalAudioTransport(TransportParams(audio_out_enabled=True))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
pipeline = Pipeline([tts, transport.output()])
@@ -42,7 +42,7 @@ async def main():
async def say_something():
await asyncio.sleep(1)
await task.queue_frame(TextFrame("Hello there!"))
await task.queue_frames([TextFrame("Hello there!"), EndFrame()])
runner = PipelineRunner()

View File

@@ -1,111 +0,0 @@
import argparse
import asyncio
import os
import sys
import aiohttp
from pipecat.frames.frames import TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.transports.services.livekit import LiveKitParams, LiveKitTransport
from livekit import api
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
def generate_token(room_name: str, participant_name: str, api_key: str, api_secret: str) -> str:
token = api.AccessToken(api_key, api_secret)
token.with_identity(participant_name).with_name(participant_name).with_grants(
api.VideoGrants(
room_join=True,
room=room_name,
)
)
return token.to_jwt()
async def configure_livekit():
parser = argparse.ArgumentParser(description="LiveKit AI SDK Bot Sample")
parser.add_argument(
"-r", "--room", type=str, required=False, help="Name of the LiveKit room to join"
)
parser.add_argument("-u", "--url", type=str, required=False, help="URL of the LiveKit server")
args, unknown = parser.parse_known_args()
room_name = args.room or os.getenv("LIVEKIT_ROOM_NAME")
url = args.url or os.getenv("LIVEKIT_URL")
api_key = os.getenv("LIVEKIT_API_KEY")
api_secret = os.getenv("LIVEKIT_API_SECRET")
if not room_name:
raise Exception(
"No LiveKit room specified. Use the -r/--room option from the command line, or set LIVEKIT_ROOM_NAME in your environment."
)
if not url:
raise Exception(
"No LiveKit server URL specified. Use the -u/--url option from the command line, or set LIVEKIT_URL in your environment."
)
if not api_key or not api_secret:
raise Exception(
"LIVEKIT_API_KEY and LIVEKIT_API_SECRET must be set in environment variables."
)
token = generate_token(room_name, "Say One Thing", api_key, api_secret)
user_token = generate_token(room_name, "User", api_key, api_secret)
logger.info(f"User token: {user_token}")
return (url, token, room_name)
async def main():
async with aiohttp.ClientSession() as session:
(url, token, room_name) = await configure_livekit()
transport = LiveKitTransport(
url=url,
token=token,
room_name=room_name,
params=LiveKitParams(audio_out_enabled=True, audio_out_sample_rate=16000),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
runner = PipelineRunner()
task = PipelineTask(Pipeline([tts, transport.output()]))
# Register an event handler so we can play the audio when the
# participant joins.
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant_id):
await asyncio.sleep(1)
await task.queue_frame(
TextFrame(
"Hello there! How are you doing today? Would you like to talk about the weather?"
)
)
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -13,7 +13,7 @@ from pipecat.frames.frames import EndFrame, LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.services.cartesia import CartesiaHttpTTSService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -22,34 +22,35 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async def main(room_url):
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url, None, "Say One Thing From an LLM", DailyParams(audio_out_enabled=True)
room_url,
None,
"Say One Thing From an LLM",
DailyParams(audio_out_enabled=True))
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
tts = CartesiaHttpTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are an LLM in a WebRTC session, and this is a 'hello world' demo. Say hello to the world.",
}
]
}]
runner = PipelineRunner()
@@ -57,14 +58,11 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await task.queue_frame(LLMMessagesFrame(messages))
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
await task.queue_frames([LLMMessagesFrame(messages), EndFrame()])
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url))

View File

@@ -9,7 +9,7 @@ import aiohttp
import os
import sys
from pipecat.frames.frames import EndFrame, TextFrame
from pipecat.frames.frames import TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
@@ -21,26 +21,29 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async def main(room_url):
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url,
None,
"Show a still frame image",
DailyParams(camera_out_enabled=True, camera_out_width=1024, camera_out_height=1024),
DailyParams(
camera_out_enabled=True,
camera_out_width=1024,
camera_out_height=1024
)
)
imagegen = FalImageGenService(
params=FalImageGenService.InputParams(image_size="square_hd"),
params=FalImageGenService.InputParams(
image_size="square_hd"
),
aiohttp_session=session,
key=os.getenv("FAL_KEY"),
)
@@ -51,14 +54,15 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await task.queue_frame(TextFrame("a cat in the style of picasso"))
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
# Note that we do not put an EndFrame() item in the pipeline for this demo.
# This means that the bot will stay in the channel until it times out.
# An EndFrame() in the pipeline would cause the transport to shut
# down.
await task.queue_frames([TextFrame("a cat in the style of picasso")])
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url))

View File

@@ -22,7 +22,6 @@ from pipecat.transports.local.tk import TkLocalTransport
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -36,11 +35,15 @@ async def main():
transport = TkLocalTransport(
tk_root,
TransportParams(camera_out_enabled=True, camera_out_width=1024, camera_out_height=1024),
)
TransportParams(
camera_out_enabled=True,
camera_out_width=1024,
camera_out_height=1024))
imagegen = FalImageGenService(
params=FalImageGenService.InputParams(image_size="square_hd"),
params=FalImageGenService.InputParams(
image_size="square_hd"
),
aiohttp_session=session,
key=os.getenv("FAL_KEY"),
)
@@ -53,7 +56,7 @@ async def main():
runner = PipelineRunner()
async def run_tk():
while not task.has_finished():
while runner.is_active():
tk_root.update()
tk_root.update_idletasks()
await asyncio.sleep(0.1)

View File

@@ -4,10 +4,6 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
#
# This example broken on latest pipecat and needs updating.
#
import aiohttp
import asyncio
import os
@@ -28,17 +24,14 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async def main(room_url: str):
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
transport = DailyTransport(room_url, None, "Static And Dynamic Speech")
meeting = TransportServiceOutput(transport, mic_enabled=True)
@@ -59,7 +52,8 @@ async def main():
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
messages = [{"role": "system", "content": "tell the user a joke about llamas"}]
messages = [{"role": "system",
"content": "tell the user a joke about llamas"}]
# Start a task to run the LLM to create a joke, and convert the LLM
# output to audio frames. This task will run in parallel with generating
@@ -77,7 +71,8 @@ async def main():
]
)
merge_pipeline = SequentialMergePipeline([simple_tts_pipeline, llm_pipeline])
merge_pipeline = SequentialMergePipeline(
[simple_tts_pipeline, llm_pipeline])
await asyncio.gather(
transport.run(merge_pipeline),
@@ -87,4 +82,5 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url))

View File

@@ -13,19 +13,23 @@ from dataclasses import dataclass
from pipecat.frames.frames import (
AppFrame,
EndFrame,
Frame,
ImageRawFrame,
LLMFullResponseStartFrame,
LLMMessagesFrame,
TextFrame,
TextFrame
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.parallel_task import ParallelTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.aggregators.gated import GatedAggregator
from pipecat.processors.aggregators.llm_response import LLMFullResponseAggregator
from pipecat.processors.aggregators.sentence import SentenceAggregator
from pipecat.services.cartesia import CartesiaHttpTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.fal import FalImageGenService
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -34,7 +38,6 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -70,10 +73,8 @@ class MonthPrepender(FrameProcessor):
await self.push_frame(frame, direction)
async def main():
async def main(room_url):
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url,
None,
@@ -82,46 +83,48 @@ async def main():
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_width=1024,
camera_out_height=1024,
),
camera_out_height=1024
)
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
tts = CartesiaHttpTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
imagegen = FalImageGenService(
params=FalImageGenService.InputParams(image_size="square_hd"),
params=FalImageGenService.InputParams(
image_size="square_hd"
),
aiohttp_session=session,
key=os.getenv("FAL_KEY"),
)
gated_aggregator = GatedAggregator(
gate_open_fn=lambda frame: isinstance(frame, ImageRawFrame),
gate_close_fn=lambda frame: isinstance(frame, LLMFullResponseStartFrame),
start_open=False
)
sentence_aggregator = SentenceAggregator()
month_prepender = MonthPrepender()
llm_full_response_aggregator = LLMFullResponseAggregator()
# With `SyncParallelPipeline` we synchronize audio and images by pushing
# them basically in order (e.g. I1 A1 A1 A1 I2 A2 A2 A2 A2 I3 A3). To do
# that, each pipeline runs concurrently and `SyncParallelPipeline` will
# wait for the input frame to be processed.
#
# Note that `SyncParallelPipeline` requires the last processor in each
# of the pipelines to be synchronous. In this case, we use
# `CartesiaHttpTTSService` and `FalImageGenService` which make HTTP
# requests and wait for the response.
pipeline = Pipeline(
[
llm, # LLM
sentence_aggregator, # Aggregates LLM output into full sentences
SyncParallelPipeline( # Run pipelines in parallel aggregating the result
[month_prepender, tts], # Create "Month: sentence" and output audio
[imagegen], # Generate image
),
transport.output(), # Transport output
]
)
pipeline = Pipeline([
llm, # LLM
sentence_aggregator, # Aggregates LLM output into full sentences
ParallelTask( # Run pipelines in parallel aggregating the result
[month_prepender, tts], # Create "Month: sentence" and output audio
[llm_full_response_aggregator, imagegen] # Aggregate full LLM response
),
gated_aggregator, # Queues everything until an image is available
transport.output() # Transport output
])
frames = []
for month in [
@@ -147,6 +150,8 @@ async def main():
frames.append(MonthFrame(month=month))
frames.append(LLMMessagesFrame(messages))
frames.append(EndFrame())
runner = PipelineRunner()
task = PipelineTask(pipeline)
@@ -157,4 +162,5 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url))

View File

@@ -11,25 +11,18 @@ import sys
import tkinter as tk
from pipecat.frames.frames import (
Frame,
OutputAudioRawFrame,
TTSAudioRawFrame,
URLImageRawFrame,
LLMMessagesFrame,
TextFrame,
)
from pipecat.frames.frames import AudioRawFrame, Frame, URLImageRawFrame, LLMMessagesFrame, TextFrame
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.sentence import SentenceAggregator
from pipecat.processors.aggregators.llm_response import LLMFullResponseAggregator
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.cartesia import CartesiaHttpTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.fal import FalImageGenService
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.local.tk import TkLocalTransport, TkOutputTransport
from pipecat.transports.local.tk import TkLocalTransport
from loguru import logger
@@ -49,12 +42,7 @@ async def main():
runner = PipelineRunner()
async def get_month_data(month):
messages = [
{
"role": "system",
"content": f"Describe a nature photograph suitable for use in a calendar, for the month of {month}. Include only the image description with no preamble. Limit the description to one sentence, please.",
}
]
messages = [{"role": "system", "content": f"Describe a nature photograph suitable for use in a calendar, for the month of {month}. Include only the image description with no preamble. Limit the description to one sentence, please.", }]
class ImageDescription(FrameProcessor):
def __init__(self):
@@ -72,17 +60,14 @@ async def main():
def __init__(self):
super().__init__()
self.audio = bytearray()
self.frame = None
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TTSAudioRawFrame):
if isinstance(frame, AudioRawFrame):
self.audio.extend(frame.audio)
self.frame = OutputAudioRawFrame(
bytes(self.audio), frame.sample_rate, frame.num_channels
)
await self.push_frame(frame, direction)
self.frame = AudioRawFrame(
bytes(self.audio), frame.sample_rate, frame.num_channels)
class ImageGrabber(FrameProcessor):
def __init__(self):
@@ -94,22 +79,24 @@ async def main():
if isinstance(frame, URLImageRawFrame):
self.frame = frame
await self.push_frame(frame, direction)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
tts = CartesiaHttpTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"))
imagegen = FalImageGenService(
params=FalImageGenService.InputParams(image_size="square_hd"),
params=FalImageGenService.InputParams(
image_size="square_hd"
),
aiohttp_session=session,
key=os.getenv("FAL_KEY"),
)
key=os.getenv("FAL_KEY"))
sentence_aggregator = SentenceAggregator()
aggregator = LLMFullResponseAggregator()
description = ImageDescription()
@@ -117,27 +104,13 @@ async def main():
image_grabber = ImageGrabber()
# With `SyncParallelPipeline` we synchronize audio and images by
# pushing them basically in order (e.g. I1 A1 A1 A1 I2 A2 A2 A2 A2
# I3 A3). To do that, each pipeline runs concurrently and
# `SyncParallelPipeline` will wait for the input frame to be
# processed.
#
# Note that `SyncParallelPipeline` requires the last processor in
# each of the pipelines to be synchronous. In this case, we use
# `CartesiaHttpTTSService` and `FalImageGenService` which make HTTP
# requests and wait for the response.
pipeline = Pipeline(
[
llm, # LLM
sentence_aggregator, # Aggregates LLM output into full sentences
description, # Store sentence
SyncParallelPipeline(
[tts, audio_grabber], # Generate and store audio for the given sentence
[imagegen, image_grabber], # Generate and storeimage for the given sentence
),
]
)
pipeline = Pipeline([
llm,
aggregator,
description,
ParallelPipeline([tts, audio_grabber],
[imagegen, image_grabber])
])
task = PipelineTask(pipeline)
await task.queue_frame(LLMMessagesFrame(messages))
@@ -158,19 +131,20 @@ async def main():
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_width=1024,
camera_out_height=1024,
),
)
camera_out_height=1024))
pipeline = Pipeline([transport.output()])
task = PipelineTask(pipeline)
# We only specify a few months as we create tasks all at once and we
# might get rate limited otherwise.
# We only specify 5 months as we create tasks all at once and we might
# get rate limited otherwise.
months: list[str] = [
"January",
"February",
# "March",
# "April",
# "May",
]
# We create one task per month. This will be executed concurrently.

View File

@@ -9,59 +9,33 @@ import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import Frame, LLMMessagesFrame, MetricsFrame
from pipecat.metrics.metrics import (
TTFBMetricsData,
ProcessingMetricsData,
LLMUsageMetricsData,
TTSUsageMetricsData,
)
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.processors.logger import FrameLogger
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
class MetricsLogger(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, MetricsFrame):
for d in frame.data:
if isinstance(d, TTFBMetricsData):
print(f"!!! MetricsFrame: {frame}, ttfb: {d.value}")
elif isinstance(d, ProcessingMetricsData):
print(f"!!! MetricsFrame: {frame}, processing: {d.value}")
elif isinstance(d, LLMUsageMetricsData):
tokens = d.value
print(
f"!!! MetricsFrame: {frame}, tokens: {
tokens.prompt_tokens}, characters: {
tokens.completion_tokens}"
)
elif isinstance(d, TTSUsageMetricsData):
print(f"!!! MetricsFrame: {frame}, characters: {d.value}")
await self.push_frame(frame, direction)
async def main():
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
@@ -70,18 +44,23 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
vad_analyzer=SileroVADAnalyzer()
)
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
ml = MetricsLogger()
fl = FrameLogger("!!! after LLM", "red")
fltts = FrameLogger("@@@ out of tts", "green")
flend = FrameLogger("### out of the end", "magenta")
messages = [
{
@@ -89,21 +68,20 @@ async def main():
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(),
context_aggregator.user(),
llm,
tts,
ml,
transport.output(),
context_aggregator.assistant(),
]
)
pipeline = Pipeline([
transport.input(),
tma_in,
llm,
fl,
tts,
fltts,
transport.output(),
tma_out,
flend
])
task = PipelineTask(pipeline)
@@ -111,7 +89,8 @@ async def main():
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
@@ -120,4 +99,5 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -11,16 +11,19 @@ import sys
from PIL import Image
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import Frame, OutputImageRawFrame, SystemFrame, TextFrame
from pipecat.frames.frames import ImageRawFrame, Frame, SystemFrame, TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.cartesia import CartesiaHttpTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.transports.services.daily import DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from pipecat.transports.services.daily import DailyParams
from runner import configure
@@ -28,7 +31,6 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -49,51 +51,39 @@ class ImageSyncAggregator(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if not isinstance(frame, SystemFrame) and direction == FrameDirection.DOWNSTREAM:
await self.push_frame(
OutputImageRawFrame(
image=self._speaking_image_bytes,
size=(1024, 1024),
format=self._speaking_image_format,
)
)
if not isinstance(frame, SystemFrame):
await self.push_frame(ImageRawFrame(image=self._speaking_image_bytes, size=(1024, 1024), format=self._speaking_image_format))
await self.push_frame(frame)
await self.push_frame(
OutputImageRawFrame(
image=self._waiting_image_bytes,
size=(1024, 1024),
format=self._waiting_image_format,
)
)
await self.push_frame(ImageRawFrame(image=self._waiting_image_bytes, size=(1024, 1024), format=self._waiting_image_format))
else:
await self.push_frame(frame)
async def main():
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_width=1024,
camera_out_height=1024,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
vad_analyzer=SileroVADAnalyzer()
)
)
tts = CartesiaHttpTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
@@ -102,33 +92,31 @@ async def main():
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
image_sync_aggregator = ImageSyncAggregator(
os.path.join(os.path.dirname(__file__), "assets", "speaking.png"),
os.path.join(os.path.dirname(__file__), "assets", "waiting.png"),
)
pipeline = Pipeline(
[
transport.input(),
image_sync_aggregator,
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
pipeline = Pipeline([
transport.input(),
image_sync_aggregator,
tma_in,
llm,
tts,
transport.output(),
tma_out
])
task = PipelineTask(pipeline)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
participant_name = participant.get("info", {}).get("userName", "")
participant_name = participant["info"]["userName"] or ''
transport.capture_participant_transcription(participant["id"])
await task.queue_frames([TextFrame(f"Hi there {participant_name}!")])
await task.queue_frames([TextFrame(f"Hi, this is {participant_name}.")])
runner = PipelineRunner()
@@ -136,4 +124,5 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -1,103 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import os
import sys
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.audio.vad.silero import SileroVAD
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
transcription_enabled=True,
),
)
vad = SileroVAD()
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(),
vad,
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -9,32 +9,30 @@ import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
@@ -43,16 +41,19 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
vad_analyzer=SileroVADAnalyzer()
)
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
@@ -61,35 +62,30 @@ async def main():
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
task = PipelineTask(pipeline, PipelineParams(
allow_interruptions=True,
enable_metrics=True,
report_only_initial_ttfb=True,
))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
@@ -98,4 +94,5 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -5,34 +5,34 @@
#
import asyncio
import aiohttp
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.anthropic import AnthropicLLMService
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
@@ -41,18 +41,19 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
vad_analyzer=SileroVADAnalyzer()
)
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-opus-20240229"
)
api_key=os.getenv("ANTHROPIC_API_KEY"),
model="claude-3-opus-20240229")
# todo: think more about how to handle system prompts in a more general way. OpenAI,
# Google, and Anthropic all have slightly different approaches to providing a system
@@ -64,19 +65,17 @@ async def main():
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@@ -92,4 +91,5 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -10,18 +10,16 @@ import sys
import aiohttp
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.processors.frameworks.langchain import LangchainProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_community.chat_message_histories import ChatMessageHistory
@@ -34,7 +32,6 @@ from loguru import logger
from runner import configure
from dotenv import load_dotenv
load_dotenv(override=True)
@@ -50,10 +47,8 @@ def get_session_history(session_id: str) -> BaseChatMessageHistory:
return message_store[session_id]
async def main():
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
@@ -66,29 +61,27 @@ async def main():
),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"Be nice and helpful. Answer very briefly and without special characters like `#` or `*`. "
"Your response will be synthesized to voice and those characters will create unnatural sounds.",
),
("system",
"Be nice and helpful. Answer very briefly and without special characters like `#` or `*`. "
"Your response will be synthesized to voice and those characters will create unnatural sounds.",
),
MessagesPlaceholder("chat_history"),
("human", "{input}"),
]
)
])
chain = prompt | ChatOpenAI(model="gpt-4o", temperature=0.7)
history_chain = RunnableWithMessageHistory(
chain,
get_session_history,
history_messages_key="chat_history",
input_messages_key="input",
)
input_messages_key="input")
lc = LangchainProcessor(history_chain)
tma_in = LLMUserResponseAggregator()
@@ -96,12 +89,12 @@ async def main():
pipeline = Pipeline(
[
transport.input(), # Transport user input
tma_in, # User responses
lc, # Langchain
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
transport.input(), # Transport user input
tma_in, # User responses
lc, # Langchain
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
@@ -115,7 +108,11 @@ async def main():
# the `LLMMessagesFrame` will be picked up by the LangchainProcessor using
# only the content of the last message to inject it in the prompt defined
# above. So no role is required here.
messages = [({"content": "Please briefly introduce yourself to the user."})]
messages = [(
{
"content": "Please briefly introduce yourself to the user."
}
)]
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
@@ -124,4 +121,5 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -5,34 +5,34 @@
#
import asyncio
import aiohttp
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.services.deepgram import DeepgramSTTService, DeepgramTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
@@ -41,15 +41,21 @@ async def main():
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
vad_audio_passthrough=True
)
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-helios-en")
tts = DeepgramTTSService(
aiohttp_session=session,
api_key=os.getenv("DEEPGRAM_API_KEY"),
voice="aura-helios-en"
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
@@ -58,27 +64,27 @@ async def main():
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
pipeline = Pipeline([
transport.input(), # Transport user input
stt, # STT
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
@@ -87,4 +93,5 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -0,0 +1,94 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main(room_url: str, token):
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=44100,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_name="British Lady",
output_format="pcm_44100"
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -1,99 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -8,96 +8,86 @@ import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai import OpenAILLMService
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.services.playht import PlayHTTTSService
from pipecat.transcriptions.language import Language
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=16000,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
async def main(room_url: str, token):
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=16000,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
)
tts = PlayHTTTSService(
user_id=os.getenv("PLAYHT_USER_ID"),
api_key=os.getenv("PLAYHT_API_KEY"),
voice_url="s3://voice-cloning-zero-shot/801a663f-efd0-4254-98d0-5c175514c3e8/jennifer/manifest.json",
params=PlayHTTTSService.InputParams(language=Language.EN),
)
tts = PlayHTTTSService(
user_id=os.getenv("PLAYHT_USER_ID"),
api_key=os.getenv("PLAYHT_API_KEY"),
voice_url="s3://voice-cloning-zero-shot/801a663f-efd0-4254-98d0-5c175514c3e8/jennifer/manifest.json",
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
runner = PipelineRunner()
await runner.run(task)
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -4,19 +4,19 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import asyncio
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.services.azure import AzureLLMService, AzureSTTService, AzureTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
@@ -24,81 +24,77 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=16000,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
async def main(room_url: str, token):
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=16000,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
)
)
stt = AzureSTTService(
api_key=os.getenv("AZURE_SPEECH_API_KEY"),
region=os.getenv("AZURE_SPEECH_REGION"),
)
stt = AzureSTTService(
api_key=os.getenv("AZURE_SPEECH_API_KEY"),
region=os.getenv("AZURE_SPEECH_REGION"),
)
tts = AzureTTSService(
api_key=os.getenv("AZURE_SPEECH_API_KEY"),
region=os.getenv("AZURE_SPEECH_REGION"),
)
tts = AzureTTSService(
api_key=os.getenv("AZURE_SPEECH_API_KEY"),
region=os.getenv("AZURE_SPEECH_REGION"),
)
llm = AzureLLMService(
api_key=os.getenv("AZURE_CHATGPT_API_KEY"),
endpoint=os.getenv("AZURE_CHATGPT_ENDPOINT"),
model=os.getenv("AZURE_CHATGPT_MODEL"),
)
llm = AzureLLMService(
api_key=os.getenv("AZURE_CHATGPT_API_KEY"),
endpoint=os.getenv("AZURE_CHATGPT_ENDPOINT"),
model=os.getenv("AZURE_CHATGPT_MODEL"),
)
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
pipeline = Pipeline([
transport.input(), # Transport user input
stt, # STT
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
runner = PipelineRunner()
await runner.run(task)
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -8,81 +8,85 @@ import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.services.openai import OpenAILLMService, OpenAITTSService
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.services.openai import OpenAITTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=24000,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
async def main(room_url: str, token):
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=24000,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
)
tts = OpenAITTSService(api_key=os.getenv("OPENAI_API_KEY"), voice="alloy")
tts = OpenAITTSService(
api_key=os.getenv("OPENAI_API_KEY"),
voice="alloy"
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
runner = PipelineRunner()
await runner.run(task)
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -9,15 +9,18 @@ import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openpipe import OpenPipeLLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
@@ -25,17 +28,14 @@ from loguru import logger
import time
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
@@ -44,13 +44,14 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
vad_analyzer=SileroVADAnalyzer()
)
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
timestamp = int(time.time())
@@ -58,7 +59,9 @@ async def main():
api_key=os.getenv("OPENAI_API_KEY"),
openpipe_api_key=os.getenv("OPENPIPE_API_KEY"),
model="gpt-4o",
tags={"conversation_id": f"pipecat-{timestamp}"},
tags={
"conversation_id": f"pipecat-{timestamp}"
}
)
messages = [
@@ -67,20 +70,17 @@ async def main():
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
@@ -88,7 +88,8 @@ async def main():
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
@@ -97,4 +98,5 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -1,95 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai import OpenAILLMService
from pipecat.services.xtts import XTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = XTTSService(
aiohttp_session=session,
voice_id="Claribel Dervla",
language="en",
base_url="http://localhost:8000",
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,99 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.gladia import GladiaSTTService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
)
stt = GladiaSTTService(
api_key=os.getenv("GLADIA_API_KEY"),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,91 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import asyncio
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.lmnt import LmntTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=24000,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = LmntTTSService(api_key=os.getenv("LMNT_API_KEY"), voice_id="morgan")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User respones
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,109 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.services.ai_services import OpenAILLMContext
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.together import TogetherLLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = TogetherLLMService(
api_key=os.getenv("TOGETHER_API_KEY"),
model="meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo",
params=TogetherLLMService.InputParams(
temperature=1.0,
top_p=0.9,
top_k=40,
extra={
"frequency_penalty": 2.0,
"presence_penalty": 0.0,
},
),
)
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond in plain language. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
user_aggregator = context_aggregator.user()
assistant_aggregator = context_aggregator.assistant()
pipeline = Pipeline(
[
transport.input(), # Transport user input
user_aggregator, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
assistant_aggregator, # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True, enable_metrics=True, enable_usage_metrics=True
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,99 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.aws import AWSTTSService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=16000,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = AWSTTSService(
api_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
region=os.getenv("AWS_REGION"),
voice_id="Amy",
params=AWSTTSService.InputParams(engine="neural", language="en-GB", rate="1.05"),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,96 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.google import GoogleTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=24000,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = GoogleTTSService(
voice_id="en-US-Neural2-J",
params=GoogleTTSService.InputParams(language="en-US", rate="1.05"),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User respones
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,97 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.assemblyai import AssemblyAISTTService
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
)
stt = AssemblyAISTTService(
api_key=os.getenv("ASSEMBLYAI_API_KEY"),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -3,19 +3,18 @@ import aiohttp
import asyncio
import logging
import os
from pipecat.processors.aggregators import SentenceAggregator
from pipecat.pipeline.aggregators import SentenceAggregator
from pipecat.pipeline.pipeline import Pipeline
from pipecat.transports.services.daily import DailyTransport
from pipecat.services.azure import AzureLLMService, AzureTTSService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.fal import FalImageGenService
from pipecat.frames.frames import AudioFrame, EndFrame, ImageFrame, LLMMessagesFrame, TextFrame
from pipecat.transports.daily_transport import DailyTransport
from pipecat.services.azure_ai_services import AzureLLMService, AzureTTSService
from pipecat.services.elevenlabs_ai_services import ElevenLabsTTSService
from pipecat.services.fal_ai_services import FalImageGenService
from pipecat.pipeline.frames import AudioFrame, EndFrame, ImageFrame, LLMMessagesFrame, TextFrame
from runner import configure
from dotenv import load_dotenv
load_dotenv(override=True)
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
@@ -23,10 +22,8 @@ logger = logging.getLogger("pipecat")
logger.setLevel(logging.DEBUG)
async def main():
async def main(room_url: str):
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url,
None,
@@ -54,7 +51,9 @@ async def main():
voice_id="jBpfuIE2acCO8z3wKNLl",
)
dalle = FalImageGenService(
params=FalImageGenService.InputParams(image_size="1024x1024"),
params=FalImageGenService.InputParams(
image_size="1024x1024"
),
aiohttp_session=session,
key=os.getenv("FAL_KEY"),
)
@@ -74,11 +73,13 @@ async def main():
async def get_text_and_audio(messages) -> Tuple[str, bytearray]:
"""This function streams text from the LLM and uses the TTS service to convert
that text to speech as it's received."""
that text to speech as it's received. """
source_queue = asyncio.Queue()
sink_queue = asyncio.Queue()
sentence_aggregator = SentenceAggregator()
pipeline = Pipeline([llm, sentence_aggregator, tts1], source_queue, sink_queue)
pipeline = Pipeline(
[llm, sentence_aggregator, tts1], source_queue, sink_queue
)
await source_queue.put(LLMMessagesFrame(messages))
await source_queue.put(EndFrame())
@@ -143,4 +144,5 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url))

View File

@@ -4,21 +4,12 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import asyncio
import sys
from pipecat.frames.frames import (
Frame,
InputAudioRawFrame,
InputImageRawFrame,
OutputAudioRawFrame,
OutputImageRawFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transports.services.daily import DailyTransport, DailyParams
from runner import configure
@@ -26,63 +17,38 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
class MirrorProcessor(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, InputAudioRawFrame):
await self.push_frame(
OutputAudioRawFrame(
audio=frame.audio,
sample_rate=frame.sample_rate,
num_channels=frame.num_channels,
)
)
elif isinstance(frame, InputImageRawFrame):
await self.push_frame(
OutputImageRawFrame(image=frame.image, size=frame.size, format=frame.format)
)
else:
await self.push_frame(frame, direction)
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Test",
DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_is_live=True,
camera_out_width=1280,
camera_out_height=720,
),
async def main(room_url, token):
transport = DailyTransport(
room_url, token, "Test",
DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_is_live=True,
camera_out_width=1280,
camera_out_height=720
)
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_video(participant["id"])
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_video(participant["id"])
pipeline = Pipeline([transport.input(), MirrorProcessor(), transport.output()])
pipeline = Pipeline([transport.input(), transport.output()])
runner = PipelineRunner()
runner = PipelineRunner()
task = PipelineTask(pipeline)
task = PipelineTask(pipeline)
await runner.run(task)
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -4,23 +4,14 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import asyncio
import sys
import tkinter as tk
from pipecat.frames.frames import (
Frame,
InputAudioRawFrame,
InputImageRawFrame,
OutputAudioRawFrame,
OutputImageRawFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.local.tk import TkLocalTransport
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -30,73 +21,46 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
class MirrorProcessor(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
async def main(room_url, token):
tk_root = tk.Tk()
tk_root.title("Local Mirror")
if isinstance(frame, InputAudioRawFrame):
await self.push_frame(
OutputAudioRawFrame(
audio=frame.audio,
sample_rate=frame.sample_rate,
num_channels=frame.num_channels,
)
)
elif isinstance(frame, InputImageRawFrame):
await self.push_frame(
OutputImageRawFrame(image=frame.image, size=frame.size, format=frame.format)
)
else:
await self.push_frame(frame, direction)
daily_transport = DailyTransport(room_url, token, "Test", DailyParams(audio_in_enabled=True))
tk_transport = TkLocalTransport(
tk_root,
TransportParams(
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_is_live=True,
camera_out_width=1280,
camera_out_height=720))
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
@daily_transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_video(participant["id"])
tk_root = tk.Tk()
tk_root.title("Local Mirror")
pipeline = Pipeline([daily_transport.input(), tk_transport.output()])
daily_transport = DailyTransport(
room_url, token, "Test", DailyParams(audio_in_enabled=True)
)
task = PipelineTask(pipeline)
tk_transport = TkLocalTransport(
tk_root,
TransportParams(
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_is_live=True,
camera_out_width=1280,
camera_out_height=720,
),
)
async def run_tk():
while not task.has_finished():
tk_root.update()
tk_root.update_idletasks()
await asyncio.sleep(0.1)
@daily_transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_video(participant["id"])
runner = PipelineRunner()
pipeline = Pipeline([daily_transport.input(), MirrorProcessor(), tk_transport.output()])
task = PipelineTask(pipeline)
async def run_tk():
while not task.has_finished():
tk_root.update()
tk_root.update_idletasks()
await asyncio.sleep(0.1)
runner = PipelineRunner()
await asyncio.gather(runner.run(task), run_tk())
await asyncio.gather(runner.run(task), run_tk())
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -9,32 +9,31 @@ import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.processors.filters.wake_check_filter import WakeCheckFilter
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.filters.wake_check_filter import WakeCheckFilter
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
transport = DailyTransport(
room_url,
token,
@@ -43,16 +42,19 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
vad_analyzer=SileroVADAnalyzer()
)
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
@@ -62,21 +64,18 @@ async def main():
]
hey_robot_filter = WakeCheckFilter(["hey robot", "hey, robot"])
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
hey_robot_filter, # Filter out speech not directed at the robot
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
pipeline = Pipeline([
transport.input(), # Transport user input
hey_robot_filter, # Filter out speech not directed at the robot
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@@ -91,4 +90,5 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -10,29 +10,31 @@ import os
import sys
import wave
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
Frame,
AudioRawFrame,
LLMFullResponseEndFrame,
LLMMessagesFrame,
OutputAudioRawFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.aggregators.llm_response import (
LLMUserResponseAggregator,
LLMAssistantResponseAggregator,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.logger import FrameLogger
from pipecat.services.cartesia import CartesiaHttpTTSService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -51,12 +53,12 @@ for file in sound_files:
filename = os.path.splitext(os.path.basename(full_path))[0]
# Open the image and convert it to bytes
with wave.open(full_path) as audio_file:
sounds[file] = OutputAudioRawFrame(
audio_file.readframes(-1), audio_file.getframerate(), audio_file.getnchannels()
)
sounds[file] = AudioRawFrame(audio_file.readframes(-1),
audio_file.getframerate(), audio_file.getnchannels())
class OutboundSoundEffectWrapper(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
@@ -69,6 +71,7 @@ class OutboundSoundEffectWrapper(FrameProcessor):
class InboundSoundEffectWrapper(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
@@ -80,10 +83,8 @@ class InboundSoundEffectWrapper(FrameProcessor):
await self.push_frame(frame, direction)
async def main():
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
@@ -92,15 +93,18 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
vad_analyzer=SileroVADAnalyzer()
)
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
tts = CartesiaHttpTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id="ErXwobaYiN019PkySvjV",
)
messages = [
@@ -110,27 +114,25 @@ async def main():
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
out_sound = OutboundSoundEffectWrapper()
in_sound = InboundSoundEffectWrapper()
fl = FrameLogger("LLM Out")
fl2 = FrameLogger("Transcription In")
pipeline = Pipeline(
[
transport.input(),
context_aggregator.user(),
in_sound,
fl2,
llm,
fl,
tts,
out_sound,
transport.output(),
context_aggregator.assistant(),
]
)
pipeline = Pipeline([
transport.input(),
tma_in,
in_sound,
fl2,
llm,
fl,
tts,
out_sound,
transport.output(),
tma_out
])
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
@@ -146,4 +148,5 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -9,7 +9,6 @@ import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import Frame, TextFrame, UserImageRequestFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
@@ -17,16 +16,16 @@ from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.user_response import UserResponseAggregator
from pipecat.processors.aggregators.vision_image_frame import VisionImageFrameAggregator
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.moondream import MoondreamService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -34,6 +33,7 @@ logger.add(sys.stderr, level="DEBUG")
class UserImageRequester(FrameProcessor):
def __init__(self, participant_id: str | None = None):
super().__init__()
self._participant_id = participant_id
@@ -45,16 +45,12 @@ class UserImageRequester(FrameProcessor):
await super().process_frame(frame, direction)
if self._participant_id and isinstance(frame, TextFrame):
await self.push_frame(
UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM
)
await self.push_frame(UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM)
await self.push_frame(frame, direction)
async def main():
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
@@ -63,8 +59,14 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
vad_analyzer=SileroVADAnalyzer()
)
)
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
user_response = UserResponseAggregator()
@@ -76,9 +78,10 @@ async def main():
# If you run into weird description, try with use_cpu=True
moondream = MoondreamService()
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
@transport.event_handler("on_first_participant_joined")
@@ -88,17 +91,15 @@ async def main():
transport.capture_participant_transcription(participant["id"])
image_requester.set_participant_id(participant["id"])
pipeline = Pipeline(
[
transport.input(),
user_response,
image_requester,
vision_aggregator,
moondream,
tts,
transport.output(),
]
)
pipeline = Pipeline([
transport.input(),
user_response,
image_requester,
vision_aggregator,
moondream,
tts,
transport.output()
])
task = PipelineTask(pipeline)
@@ -106,6 +107,6 @@ async def main():
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -9,7 +9,6 @@ import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import Frame, TextFrame, UserImageRequestFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
@@ -17,16 +16,16 @@ from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.user_response import UserResponseAggregator
from pipecat.processors.aggregators.vision_image_frame import VisionImageFrameAggregator
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.google import GoogleLLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -34,6 +33,7 @@ logger.add(sys.stderr, level="DEBUG")
class UserImageRequester(FrameProcessor):
def __init__(self, participant_id: str | None = None):
super().__init__()
self._participant_id = participant_id
@@ -45,16 +45,12 @@ class UserImageRequester(FrameProcessor):
await super().process_frame(frame, direction)
if self._participant_id and isinstance(frame, TextFrame):
await self.push_frame(
UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM
)
await self.push_frame(UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM)
await self.push_frame(frame, direction)
async def main():
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
@@ -64,8 +60,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
vad_analyzer=SileroVADAnalyzer()
)
)
user_response = UserResponseAggregator()
@@ -75,12 +71,13 @@ async def main():
vision_aggregator = VisionImageFrameAggregator()
google = GoogleLLMService(
model="gemini-1.5-flash-latest", api_key=os.getenv("GOOGLE_API_KEY")
)
model="gemini-1.5-flash-latest",
api_key=os.getenv("GOOGLE_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
@transport.event_handler("on_first_participant_joined")
@@ -90,17 +87,15 @@ async def main():
transport.capture_participant_transcription(participant["id"])
image_requester.set_participant_id(participant["id"])
pipeline = Pipeline(
[
transport.input(),
user_response,
image_requester,
vision_aggregator,
google,
tts,
transport.output(),
]
)
pipeline = Pipeline([
transport.input(),
user_response,
image_requester,
vision_aggregator,
google,
tts,
transport.output()
])
task = PipelineTask(pipeline)
@@ -108,6 +103,6 @@ async def main():
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -9,7 +9,6 @@ import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import Frame, TextFrame, UserImageRequestFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
@@ -17,16 +16,16 @@ from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.user_response import UserResponseAggregator
from pipecat.processors.aggregators.vision_image_frame import VisionImageFrameAggregator
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -34,6 +33,7 @@ logger.add(sys.stderr, level="DEBUG")
class UserImageRequester(FrameProcessor):
def __init__(self, participant_id: str | None = None):
super().__init__()
self._participant_id = participant_id
@@ -45,16 +45,12 @@ class UserImageRequester(FrameProcessor):
await super().process_frame(frame, direction)
if self._participant_id and isinstance(frame, TextFrame):
await self.push_frame(
UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM
)
await self.push_frame(UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM)
await self.push_frame(frame, direction)
async def main():
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
@@ -63,8 +59,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
vad_analyzer=SileroVADAnalyzer()
)
)
user_response = UserResponseAggregator()
@@ -73,11 +69,15 @@ async def main():
vision_aggregator = VisionImageFrameAggregator()
openai = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
openai = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o"
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
@transport.event_handler("on_first_participant_joined")
@@ -87,17 +87,15 @@ async def main():
transport.capture_participant_transcription(participant["id"])
image_requester.set_participant_id(participant["id"])
pipeline = Pipeline(
[
transport.input(),
user_response,
image_requester,
vision_aggregator,
openai,
tts,
transport.output(),
]
)
pipeline = Pipeline([
transport.input(),
user_response,
image_requester,
vision_aggregator,
openai,
tts,
transport.output()
])
task = PipelineTask(pipeline)
@@ -105,6 +103,6 @@ async def main():
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -9,7 +9,6 @@ import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import Frame, TextFrame, UserImageRequestFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
@@ -17,16 +16,16 @@ from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.user_response import UserResponseAggregator
from pipecat.processors.aggregators.vision_image_frame import VisionImageFrameAggregator
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.anthropic import AnthropicLLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -34,6 +33,7 @@ logger.add(sys.stderr, level="DEBUG")
class UserImageRequester(FrameProcessor):
def __init__(self, participant_id: str | None = None):
super().__init__()
self._participant_id = participant_id
@@ -45,16 +45,12 @@ class UserImageRequester(FrameProcessor):
await super().process_frame(frame, direction)
if self._participant_id and isinstance(frame, TextFrame):
await self.push_frame(
UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM
)
await self.push_frame(UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM)
await self.push_frame(frame, direction)
async def main():
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
@@ -63,8 +59,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
vad_analyzer=SileroVADAnalyzer()
)
)
user_response = UserResponseAggregator()
@@ -73,14 +69,15 @@ async def main():
vision_aggregator = VisionImageFrameAggregator()
anthropic = AnthropicLLMService(api_key=os.getenv("ANTHROPIC_API_KEY"))
anthropic = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"),
model="claude-3-sonnet-20240229"
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
params=CartesiaTTSService.InputParams(
sample_rate=16000,
),
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
@transport.event_handler("on_first_participant_joined")
@@ -90,17 +87,15 @@ async def main():
transport.capture_participant_transcription(participant["id"])
image_requester.set_participant_id(participant["id"])
pipeline = Pipeline(
[
transport.input(),
user_response,
image_requester,
vision_aggregator,
anthropic,
tts,
transport.output(),
]
)
pipeline = Pipeline([
transport.input(),
user_response,
image_requester,
vision_aggregator,
anthropic,
tts,
transport.output()
])
task = PipelineTask(pipeline)
@@ -108,6 +103,6 @@ async def main():
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -4,7 +4,6 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import asyncio
import sys
@@ -21,7 +20,6 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -29,6 +27,7 @@ logger.add(sys.stderr, level="DEBUG")
class TranscriptionLogger(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
@@ -36,26 +35,23 @@ class TranscriptionLogger(FrameProcessor):
print(f"Transcription: {frame.text}")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
async def main(room_url: str):
transport = DailyTransport(room_url, None, "Transcription bot",
DailyParams(audio_in_enabled=True))
transport = DailyTransport(
room_url, None, "Transcription bot", DailyParams(audio_in_enabled=True)
)
stt = WhisperSTTService()
stt = WhisperSTTService()
tl = TranscriptionLogger()
tl = TranscriptionLogger()
pipeline = Pipeline([transport.input(), stt, tl])
pipeline = Pipeline([transport.input(), stt, tl])
task = PipelineTask(pipeline)
task = PipelineTask(pipeline)
runner = PipelineRunner()
runner = PipelineRunner()
await runner.run(task)
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url))

View File

@@ -19,7 +19,6 @@ from pipecat.transports.local.audio import LocalAudioTransport
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -27,6 +26,7 @@ logger.add(sys.stderr, level="DEBUG")
class TranscriptionLogger(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)

View File

@@ -4,7 +4,6 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import asyncio
import os
import sys
@@ -14,7 +13,7 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.deepgram import DeepgramSTTService, LiveOptions, Language
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from runner import configure
@@ -22,7 +21,6 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -30,6 +28,7 @@ logger.add(sys.stderr, level="DEBUG")
class TranscriptionLogger(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
@@ -37,29 +36,23 @@ class TranscriptionLogger(FrameProcessor):
print(f"Transcription: {frame.text}")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
async def main(room_url: str):
transport = DailyTransport(room_url, None, "Transcription bot",
DailyParams(audio_in_enabled=True))
transport = DailyTransport(
room_url, None, "Transcription bot", DailyParams(audio_in_enabled=True)
)
stt = DeepgramSTTService(os.getenv("DEEPGRAM_API_KEY"))
stt = DeepgramSTTService(
api_key=os.getenv("DEEPGRAM_API_KEY"),
# live_options=LiveOptions(language=Language.FR),
)
tl = TranscriptionLogger()
tl = TranscriptionLogger()
pipeline = Pipeline([transport.input(), stt, tl])
pipeline = Pipeline([transport.input(), stt, tl])
task = PipelineTask(pipeline)
task = PipelineTask(pipeline)
runner = PipelineRunner()
runner = PipelineRunner()
await runner.run(task)
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url))

View File

@@ -1,63 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.frames.frames import Frame, TranscriptionFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.gladia import GladiaSTTService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
class TranscriptionLogger(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TranscriptionFrame):
print(f"Transcription: {frame.text}")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url, None, "Transcription bot", DailyParams(audio_in_enabled=True)
)
stt = GladiaSTTService(
api_key=os.getenv("GLADIA_API_KEY"),
# live_options=LiveOptions(language=Language.FR),
)
tl = TranscriptionLogger()
pipeline = Pipeline([transport.input(), stt, tl])
task = PipelineTask(pipeline)
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,62 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.frames.frames import Frame, TranscriptionFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.assemblyai import AssemblyAISTTService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
class TranscriptionLogger(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TranscriptionFrame):
print(f"Transcription: {frame.text}")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url, None, "Transcription bot", DailyParams(audio_in_enabled=True)
)
stt = AssemblyAISTTService(
api_key=os.getenv("ASSEMBLYAI_API_KEY"),
)
tl = TranscriptionLogger()
pipeline = Pipeline([transport.input(), stt, tl])
task = PipelineTask(pipeline)
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -9,13 +9,19 @@ import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantContextAggregator,
LLMUserContextAggregator,
)
from pipecat.processors.logger import FrameLogger
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMContext, OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from openai.types.chat import ChatCompletionToolParam
@@ -24,30 +30,22 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def start_fetch_weather(function_name, llm, context):
# note: we can't push a frame to the LLM here. the bot
# can interrupt itself and/or cause audio overlapping glitches.
# possible question for Aleix and Chad about what the right way
# to trigger speech is, now, with the new queues/async/sync refactors.
# await llm.push_frame(TextFrame("Let me check on that."))
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
async def start_fetch_weather(llm):
await llm.push_frame(TextFrame("Let me think."))
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
await result_callback({"conditions": "nice", "temperature": "75"})
async def fetch_weather_from_api(llm, args):
return {"conditions": "nice", "temperature": "75"}
async def main():
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
@@ -56,19 +54,26 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
vad_analyzer=SileroVADAnalyzer()
)
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
# Register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.
llm.register_function(None, fetch_weather_from_api, start_callback=start_fetch_weather)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm.register_function(
"get_current_weather",
fetch_weather_from_api,
start_callback=start_fetch_weather)
fl_in = FrameLogger("Inner")
fl_out = FrameLogger("Outer")
tools = [
ChatCompletionToolParam(
@@ -85,15 +90,17 @@ async def main():
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"enum": [
"celsius",
"fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": ["location", "format"],
"required": [
"location",
"format"],
},
},
)
]
})]
messages = [
{
"role": "system",
@@ -102,34 +109,26 @@ async def main():
]
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
tma_in = LLMUserContextAggregator(context)
tma_out = LLMAssistantContextAggregator(context)
pipeline = Pipeline([
fl_in,
transport.input(),
tma_in,
llm,
fl_out,
tts,
transport.output(),
tma_out
])
pipeline = Pipeline(
[
transport.input(),
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(pipeline)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
@ transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])
await tts.say("Hi! Ask me about the weather in San Francisco.")
runner = PipelineRunner()
@@ -137,4 +136,5 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -1,118 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.anthropic import AnthropicLLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def get_weather(function_name, tool_call_id, arguments, llm, context, result_callback):
location = arguments["location"]
await result_callback(f"The weather in {location} is currently 72 degrees and sunny.")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-5-sonnet-20240620"
)
llm.register_function("get_weather", get_weather)
tools = [
{
"name": "get_weather",
"description": "Get the current weather in a given location",
"input_schema": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
}
},
"required": ["location"],
},
}
]
# todo: test with very short initial user message
# messages = [{"role": "system",
# "content": "You are a helpful assistant who can report the weather in any location in the universe. Respond concisely. Your response will be turned into speech so use only simple words and punctuation."},
# {"role": "user",
# "content": " Start the conversation by introducing yourself."}]
messages = [{"role": "user", "content": "Say 'hello' to start the conversation."}]
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User spoken responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses and tool context
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,173 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.anthropic import AnthropicLLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
video_participant_id = None
async def get_weather(function_name, tool_call_id, arguments, llm, context, result_callback):
location = arguments["location"]
await result_callback(f"The weather in {location} is currently 72 degrees and sunny.")
async def get_image(function_name, tool_call_id, arguments, llm, context, result_callback):
question = arguments["question"]
await llm.request_image_frame(user_id=video_participant_id, text_content=question)
async def main():
global llm
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"),
model="claude-3-5-sonnet-20240620",
enable_prompt_caching_beta=True,
)
llm.register_function("get_weather", get_weather)
llm.register_function("get_image", get_image)
tools = [
{
"name": "get_weather",
"description": "Get the current weather in a given location",
"input_schema": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
}
},
"required": ["location"],
},
},
{
"name": "get_image",
"description": "Get an image from the video stream.",
"input_schema": {
"type": "object",
"properties": {
"question": {
"type": "string",
"description": "The question that the user is asking about the image.",
}
},
"required": ["question"],
},
},
]
# todo: test with very short initial user message
system_prompt = """\
You are a helpful assistant who converses with a user and answers questions. Respond concisely to general questions.
Your response will be turned into speech so use only simple words and punctuation.
You have access to two tools: get_weather and get_image.
You can respond to questions about the weather using the get_weather tool.
You can answer questions about the user's video stream using the get_image tool. Some examples of phrases that \
indicate you should use the get_image tool are:
- What do you see?
- What's in the video?
- Can you describe the video?
- Tell me about what you see.
- Tell me something interesting about what you see.
- What's happening in the video?
If you need to use a tool, simply use the tool. Do not tell the user the tool you are using. Be brief and concise.
"""
messages = [
{
"role": "system",
"content": [
{
"type": "text",
"text": system_prompt,
}
],
},
{"role": "user", "content": "Start the conversation by introducing yourself."},
]
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User speech to text
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses and tool context
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
global video_participant_id
video_participant_id = participant["id"]
transport.capture_participant_transcription(video_participant_id)
transport.capture_participant_video(video_participant_id, framerate=0)
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,136 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMContext
from pipecat.services.together import TogetherLLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from openai.types.chat import ChatCompletionToolParam
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def start_fetch_weather(function_name, llm, context):
# note: we can't push a frame to the LLM here. the bot
# can interrupt itself and/or cause audio overlapping glitches.
# possible question for Aleix and Chad about what the right way
# to trigger speech is, now, with the new queues/async/sync refactors.
# await llm.push_frame(TextFrame("Let me check on that."))
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
await result_callback({"conditions": "nice", "temperature": "75"})
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = TogetherLLMService(
api_key=os.getenv("TOGETHER_API_KEY"),
model="meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo",
)
# Register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.
llm.register_function(None, fetch_weather_from_api, start_callback=start_fetch_weather)
tools = [
ChatCompletionToolParam(
type="function",
function={
"name": "get_current_weather",
"description": "Get the current weather",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": ["location", "format"],
},
},
)
]
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(),
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(pipeline)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
# await tts.say("Hi! Ask me about the weather in San Francisco.")
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,167 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMContext, OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from openai.types.chat import ChatCompletionToolParam
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
video_participant_id = None
async def get_weather(function_name, tool_call_id, arguments, llm, context, result_callback):
location = arguments["location"]
await result_callback(f"The weather in {location} is currently 72 degrees and sunny.")
async def get_image(function_name, tool_call_id, arguments, llm, context, result_callback):
logger.debug(f"!!! IN get_image {video_participant_id}, {arguments}")
question = arguments["question"]
await llm.request_image_frame(user_id=video_participant_id, text_content=question)
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm.register_function("get_weather", get_weather)
llm.register_function("get_image", get_image)
tools = [
ChatCompletionToolParam(
type="function",
function={
"name": "get_weather",
"description": "Get the current weather",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": ["location", "format"],
},
},
),
ChatCompletionToolParam(
type="function",
function={
"name": "get_image",
"description": "Get an image from the video stream.",
"parameters": {
"type": "object",
"properties": {
"question": {
"type": "string",
"description": "The question to ask the AI to generate an image of",
},
},
"required": ["question"],
},
},
),
]
system_prompt = """\
You are a helpful assistant who converses with a user and answers questions. Respond concisely to general questions.
Your response will be turned into speech so use only simple words and punctuation.
You have access to two tools: get_weather and get_image.
You can respond to questions about the weather using the get_weather tool.
You can answer questions about the user's video stream using the get_image tool. Some examples of phrases that \
indicate you should use the get_image tool are:
- What do you see?
- What's in the video?
- Can you describe the video?
- Tell me about what you see.
- Tell me something interesting about what you see.
- What's happening in the video?
"""
messages = [
{"role": "system", "content": system_prompt},
]
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(),
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(pipeline)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
global video_participant_id
video_participant_id = participant["id"]
transport.capture_participant_transcription(participant["id"])
transport.capture_participant_video(video_participant_id, framerate=0)
# Kick off the conversation.
await tts.say("Hi! Ask me about the weather in San Francisco.")
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -4,22 +4,26 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import asyncio
import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantContextAggregator,
LLMUserContextAggregator
)
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.filters.function_filter import FunctionFilter
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from openai.types.chat import ChatCompletionToolParam
@@ -28,7 +32,6 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -37,14 +40,10 @@ logger.add(sys.stderr, level="DEBUG")
current_voice = "News Lady"
async def switch_voice(function_name, tool_call_id, args, llm, context, result_callback):
async def switch_voice(llm, args):
global current_voice
current_voice = args["voice"]
await result_callback(
{
"voice": f"You are now using your {current_voice} voice. Your responses should now be as if you were a {current_voice}."
}
)
return {"voice": f"You are now using your {current_voice} voice. Your responses should now be as if you were a {current_voice}."}
async def news_lady_filter(frame) -> bool:
@@ -59,38 +58,42 @@ async def barbershop_man_filter(frame) -> bool:
return current_voice == "Barbershop Man"
async def main():
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Pipecat",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=44100,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
vad_analyzer=SileroVADAnalyzer()
)
)
news_lady = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="bf991597-6c13-47e4-8411-91ec2de5c466", # Newslady
voice_name="Newslady",
output_format="pcm_44100"
)
british_lady = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
voice_name="British Lady",
output_format="pcm_44100"
)
barbershop_man = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="a0e99841-438c-4a64-b679-ae501e7d6091", # Barbershop Man
voice_name="Barbershop Man",
output_format="pcm_44100"
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm.register_function("switch_voice", switch_voice)
tools = [
@@ -109,9 +112,7 @@ async def main():
},
"required": ["voice"],
},
},
)
]
})]
messages = [
{
"role": "system",
@@ -120,22 +121,21 @@ async def main():
]
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
tma_in = LLMUserContextAggregator(context)
tma_out = LLMAssistantContextAggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User responses
llm, # LLM
ParallelPipeline( # TTS (one of the following vocies)
[FunctionFilter(news_lady_filter), news_lady], # News Lady voice
[FunctionFilter(british_lady_filter), british_lady], # British Lady voice
[FunctionFilter(barbershop_man_filter), barbershop_man], # Barbershop Man voice
),
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
ParallelPipeline( # TTS (one of the following vocies)
[FunctionFilter(news_lady_filter), news_lady], # News Lady voice
[FunctionFilter(british_lady_filter), british_lady], # British Lady voice
[FunctionFilter(barbershop_man_filter), barbershop_man], # Barbershop Man voice
),
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@@ -146,9 +146,7 @@ async def main():
messages.append(
{
"role": "system",
"content": f"Please introduce yourself to the user and let them know the voices you can do. Your initial responses should be as if you were a {current_voice}.",
}
)
"content": f"Please introduce yourself to the user and let them know the voices you can do. Your initial responses should be as if you were a {current_voice}."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
@@ -157,4 +155,5 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -9,18 +9,22 @@ import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantContextAggregator,
LLMUserContextAggregator
)
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.filters.function_filter import FunctionFilter
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.whisper import Model, WhisperSTTService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from openai.types.chat import ChatCompletionToolParam
@@ -29,7 +33,6 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -38,10 +41,10 @@ logger.add(sys.stderr, level="DEBUG")
current_language = "English"
async def switch_language(function_name, tool_call_id, args, llm, context, result_callback):
async def switch_language(llm, args):
global current_language
current_language = args["language"]
await result_callback({"voice": f"Your answers from now on should be in {current_language}."})
return {"voice": f"Your answers from now on should be in {current_language}."}
async def english_filter(frame) -> bool:
@@ -52,10 +55,8 @@ async def spanish_filter(frame) -> bool:
return current_language == "Spanish"
async def main():
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
@@ -65,23 +66,28 @@ async def main():
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
vad_audio_passthrough=True
)
)
stt = WhisperSTTService(model=Model.LARGE)
english_tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
english_tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id="pNInz6obpgDQGcFmaJgB",
)
spanish_tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="846d6cb0-2301-48b6-9683-48f5618ea2f6", # Spanish-speaking Lady
spanish_tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
model="eleven_multilingual_v2",
voice_id="9F4C8ztpNUmXkdDDbz3J",
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm.register_function("switch_language", switch_language)
tools = [
@@ -100,9 +106,7 @@ async def main():
},
"required": ["language"],
},
},
)
]
})]
messages = [
{
"role": "system",
@@ -111,22 +115,21 @@ async def main():
]
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
tma_in = LLMUserContextAggregator(context)
tma_out = LLMAssistantContextAggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
ParallelPipeline( # TTS (bot will speak the chosen language)
[FunctionFilter(english_filter), english_tts], # English
[FunctionFilter(spanish_filter), spanish_tts], # Spanish
),
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
pipeline = Pipeline([
transport.input(), # Transport user input
stt, # STT
tma_in, # User responses
llm, # LLM
ParallelPipeline( # TTS (bot will speak the chosen language)
[FunctionFilter(english_filter), english_tts], # English
[FunctionFilter(spanish_filter), spanish_tts], # Spanish
),
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@@ -137,9 +140,7 @@ async def main():
messages.append(
{
"role": "system",
"content": f"Please introduce yourself to the user and let them know the languages you speak. Your initial responses should be in {current_language}.",
}
)
"content": f"Please introduce yourself to the user and let them know the languages you speak. Your initial responses should be in {current_language}."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
@@ -148,4 +149,5 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -5,38 +5,35 @@
#
import asyncio
import aiohttp
import os
import sys
import json
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.services.deepgram import DeepgramTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import (
DailyParams,
DailyTransport,
DailyTransportMessageFrame,
)
from pipecat.transports.services.daily import DailyParams, DailyTransport, DailyTransportMessageFrame
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
@@ -45,15 +42,15 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
vad_analyzer=SileroVADAnalyzer()
)
)
tts = DeepgramTTSService(
aiohttp_session=session,
api_key=os.getenv("DEEPGRAM_API_KEY"),
voice="aura-asteria-en",
base_url="http://0.0.0.0:8080/v1/speak",
base_url="http://0.0.0.0:8080/v1/speak"
)
llm = OpenAILLMService(
@@ -62,7 +59,7 @@ async def main():
# model="gpt-4o"
# Or, to use a local vLLM (or similar) api server
model="meta-llama/Meta-Llama-3-8B-Instruct",
base_url="http://0.0.0.0:8000/v1",
base_url="http://0.0.0.0:8000/v1"
)
messages = [
@@ -72,19 +69,17 @@ async def main():
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(),
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(),
]
)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
@@ -97,7 +92,8 @@ async def main():
# When the first participant joins, the bot should introduce itself.
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
# Handle "latency-ping" messages. The client will send app messages that look like
@@ -114,18 +110,14 @@ async def main():
logger.debug(f"Received latency ping app message: {message}")
ts = message["latency-ping"]["ts"]
# Send immediately
transport.output().send_message(
DailyTransportMessageFrame(
message={"latency-pong-msg-handler": {"ts": ts}}, participant_id=sender
)
)
transport.output().send_message(DailyTransportMessageFrame(
message={"latency-pong-msg-handler": {"ts": ts}},
participant_id=sender))
# And push to the pipeline for the Daily transport.output to send
await task.queue_frame(
await tma_in.push_frame(
DailyTransportMessageFrame(
message={"latency-pong-pipeline-delivery": {"ts": ts}},
participant_id=sender,
)
)
participant_id=sender))
except Exception as e:
logger.debug(f"message handling error: {e} - {message}")
@@ -134,4 +126,5 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -1,113 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.user_idle_processor import UserIdleProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
async def user_idle_callback(user_idle: UserIdleProcessor):
messages.append(
{
"role": "system",
"content": "Ask the user if they are still there and try to prompt for some input, but be short.",
}
)
await user_idle.push_frame(LLMMessagesFrame(messages))
user_idle = UserIdleProcessor(callback=user_idle_callback, timeout=5.0)
pipeline = Pipeline(
[
transport.input(), # Transport user input
user_idle, # Idle user check-in
context_aggregator.user(),
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,76 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import argparse
import sys
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.gstreamer.pipeline_source import GStreamerPipelineSource
from pipecat.transports.services.daily import DailyParams, DailyTransport
from runner import configure_with_args
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument("-i", "--input", type=str, required=True, help="Input video file")
(room_url, _, args) = await configure_with_args(session, parser)
transport = DailyTransport(
room_url,
None,
"GStreamer",
DailyParams(
audio_out_enabled=True,
audio_out_is_live=True,
camera_out_enabled=True,
camera_out_width=1280,
camera_out_height=720,
camera_out_is_live=True,
),
)
gst = GStreamerPipelineSource(
pipeline=f"filesrc location={args.input}",
out_params=GStreamerPipelineSource.OutputParams(
video_width=1280,
video_height=720,
audio_sample_rate=16000,
audio_channels=1,
),
)
pipeline = Pipeline(
[
gst, # GStreamer file source
transport.output(), # Transport bot output
]
)
task = PipelineTask(pipeline)
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,67 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import sys
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.gstreamer.pipeline_source import GStreamerPipelineSource
from pipecat.transports.services.daily import DailyParams, DailyTransport
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url,
None,
"GStreamer",
DailyParams(
camera_out_enabled=True,
camera_out_width=1280,
camera_out_height=720,
camera_out_is_live=True,
),
)
gst = GStreamerPipelineSource(
pipeline='videotestsrc ! capsfilter caps="video/x-raw,width=1280,height=720,framerate=30/1"',
out_params=GStreamerPipelineSource.OutputParams(
video_width=1280, video_height=720, clock_sync=False
),
)
pipeline = Pipeline(
[
gst, # GStreamer file source
transport.output(), # Transport bot output
]
)
task = PipelineTask(pipeline)
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,179 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
from datetime import datetime
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai_realtime_beta import (
InputAudioTranscription,
OpenAIRealtimeBetaLLMService,
SessionProperties,
TurnDetection,
)
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
temperature = 75 if args["format"] == "fahrenheit" else 24
await result_callback(
{
"conditions": "nice",
"temperature": temperature,
"format": args["format"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
tools = [
{
"type": "function",
"name": "get_current_weather",
"description": "Get the current weather",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": ["location", "format"],
},
}
]
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_in_enabled=True,
audio_in_sample_rate=24000,
audio_out_enabled=True,
audio_out_sample_rate=24000,
transcription_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.8)),
vad_audio_passthrough=True,
),
)
session_properties = SessionProperties(
input_audio_transcription=InputAudioTranscription(),
# Set openai TurnDetection parameters. Not setting this at all will turn it
# on by default
turn_detection=TurnDetection(silence_duration_ms=1000),
# Or set to False to disable openai turn detection and use transport VAD
# turn_detection=False,
# tools=tools,
instructions="""Your knowledge cutoff is 2023-10. You are a helpful and friendly AI.
Act like a human, but remember that you aren't a human and that you can't do human
things in the real world. Your voice and personality should be warm and engaging, with a lively and
playful tone.
If interacting in a non-English language, start by using the standard accent or dialect familiar to
the user. Talk quickly. You should always call a function if you can. Do not refer to these rules,
even if you're asked about them.
-
You are participating in a voice conversation. Keep your responses concise, short, and to the point
unless specifically asked to elaborate on a topic.
Remember, your responses should be short. Just one or two sentences, usually.""",
)
llm = OpenAIRealtimeBetaLLMService(
api_key=os.getenv("OPENAI_API_KEY"),
session_properties=session_properties,
start_audio_paused=False,
)
# you can either register a single function for all function calls, or specific functions
# llm.register_function(None, fetch_weather_from_api)
llm.register_function("get_current_weather", fetch_weather_from_api)
# Create a standard OpenAI LLM context object using the normal messages format. The
# OpenAIRealtimeBetaLLMService will convert this internally to messages that the
# openai WebSocket API can understand.
context = OpenAILLMContext(
[{"role": "user", "content": "Say hello!"}],
# [{"role": "user", "content": [{"type": "text", "text": "Say hello!"}]}],
# [
# {
# "role": "user",
# "content": [
# {"type": "text", "text": "Say"},
# {"type": "text", "text": "yo what's up!"},
# ],
# }
# ],
tools,
)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(),
llm, # LLM
context_aggregator.assistant(),
transport.output(), # Transport bot output
]
)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
# report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,236 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import glob
import json
import os
import sys
from datetime import datetime
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
)
from pipecat.services.openai import OpenAILLMService
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
BASE_FILENAME = "/tmp/pipecat_conversation_"
tts = None
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
temperature = 75 if args["format"] == "fahrenheit" else 24
await result_callback(
{
"conditions": "nice",
"temperature": temperature,
"format": args["format"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
async def get_saved_conversation_filenames(
function_name, tool_call_id, args, llm, context, result_callback
):
# Construct the full pattern including the BASE_FILENAME
full_pattern = f"{BASE_FILENAME}*.json"
# Use glob to find all matching files
matching_files = glob.glob(full_pattern)
logger.debug(f"matching files: {matching_files}")
await result_callback({"filenames": matching_files})
async def save_conversation(function_name, tool_call_id, args, llm, context, result_callback):
timestamp = datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
filename = f"{BASE_FILENAME}{timestamp}.json"
logger.debug(f"writing conversation to {filename}\n{json.dumps(context.messages, indent=4)}")
try:
with open(filename, "w") as file:
messages = context.get_messages_for_persistent_storage()
# remove the last message, which is the instruction we just gave to save the conversation
messages.pop()
json.dump(messages, file, indent=2)
await result_callback({"success": True})
except Exception as e:
await result_callback({"success": False, "error": str(e)})
async def load_conversation(function_name, tool_call_id, args, llm, context, result_callback):
global tts
filename = args["filename"]
logger.debug(f"loading conversation from {filename}")
try:
with open(filename, "r") as file:
context.set_messages(json.load(file))
logger.debug(
f"loaded conversation from {filename}\n{json.dumps(context.messages, indent=4)}"
)
await tts.say("Ok, I've loaded that conversation.")
except Exception as e:
await result_callback({"success": False, "error": str(e)})
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tools = [
{
"type": "function",
"function": {
"name": "get_current_weather",
"description": "Get the current weather",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": ["location", "format"],
},
},
},
{
"type": "function",
"function": {
"name": "save_conversation",
"description": "Save the current conversatione. Use this function to persist the current conversation to external storage.",
"parameters": {
"type": "object",
"properties": {},
"required": [],
},
},
},
{
"type": "function",
"function": {
"name": "get_saved_conversation_filenames",
"description": "Get a list of saved conversation histories. Returns a list of filenames. Each filename includes a date and timestamp. Each file is conversation history that can be loaded into this session.",
"parameters": {
"type": "object",
"properties": {},
"required": [],
},
},
},
{
"type": "function",
"function": {
"name": "load_conversation",
"description": "Load a conversation history. Use this function to load a conversation history into the current session.",
"parameters": {
"type": "object",
"properties": {
"filename": {
"type": "string",
"description": "The filename of the conversation history to load.",
}
},
"required": ["filename"],
},
},
},
]
async def main():
global tts
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.8)),
),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
# you can either register a single function for all function calls, or specific functions
# llm.register_function(None, fetch_weather_from_api)
llm.register_function("get_current_weather", fetch_weather_from_api)
llm.register_function("save_conversation", save_conversation)
llm.register_function("get_saved_conversation_filenames", get_saved_conversation_filenames)
llm.register_function("load_conversation", load_conversation)
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(),
llm, # LLM
tts,
context_aggregator.assistant(),
transport.output(), # Transport bot output
]
)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
# report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,262 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import glob
import json
import os
import sys
from datetime import datetime
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
)
from pipecat.services.openai_realtime_beta import (
InputAudioTranscription,
OpenAIRealtimeBetaLLMService,
SessionProperties,
TurnDetection,
)
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
BASE_FILENAME = "/tmp/pipecat_conversation_"
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
temperature = 75 if args["format"] == "fahrenheit" else 24
await result_callback(
{
"conditions": "nice",
"temperature": temperature,
"format": args["format"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
async def get_saved_conversation_filenames(
function_name, tool_call_id, args, llm, context, result_callback
):
# Construct the full pattern including the BASE_FILENAME
full_pattern = f"{BASE_FILENAME}*.json"
# Use glob to find all matching files
matching_files = glob.glob(full_pattern)
logger.debug(f"matching files: {matching_files}")
await result_callback({"filenames": matching_files})
# async def get_saved_conversation_filenames(
# function_name, tool_call_id, args, llm, context, result_callback
# ):
# pattern = re.compile(re.escape(BASE_FILENAME) + "\\d{8}_\\d{6}\\.json$")
# matching_files = []
# for filename in os.listdir("."):
# if pattern.match(filename):
# matching_files.append(filename)
# await result_callback({"filenames": matching_files})
async def save_conversation(function_name, tool_call_id, args, llm, context, result_callback):
timestamp = datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
filename = f"{BASE_FILENAME}{timestamp}.json"
logger.debug(f"writing conversation to {filename}\n{json.dumps(context.messages, indent=4)}")
try:
with open(filename, "w") as file:
messages = context.get_messages_for_persistent_storage()
# remove the last message, which is the instruction we just gave to save the conversation
messages.pop()
json.dump(messages, file, indent=2)
await result_callback({"success": True})
except Exception as e:
await result_callback({"success": False, "error": str(e)})
async def load_conversation(function_name, tool_call_id, args, llm, context, result_callback):
async def _reset():
filename = args["filename"]
logger.debug(f"loading conversation from {filename}")
try:
with open(filename, "r") as file:
context.set_messages(json.load(file))
await llm.reset_conversation()
await llm._create_response()
except Exception as e:
await result_callback({"success": False, "error": str(e)})
asyncio.create_task(_reset())
tools = [
{
"type": "function",
"name": "get_current_weather",
"description": "Get the current weather",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": ["location", "format"],
},
},
{
"type": "function",
"name": "save_conversation",
"description": "Save the current conversatione. Use this function to persist the current conversation to external storage.",
"parameters": {
"type": "object",
"properties": {},
"required": [],
},
},
{
"type": "function",
"name": "get_saved_conversation_filenames",
"description": "Get a list of saved conversation histories. Returns a list of filenames. Each filename includes a date and timestamp. Each file is conversation history that can be loaded into this session.",
"parameters": {
"type": "object",
"properties": {},
"required": [],
},
},
{
"type": "function",
"name": "load_conversation",
"description": "Load a conversation history. Use this function to load a conversation history into the current session.",
"parameters": {
"type": "object",
"properties": {
"filename": {
"type": "string",
"description": "The filename of the conversation history to load.",
}
},
"required": ["filename"],
},
},
]
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_in_enabled=True,
audio_in_sample_rate=24000,
audio_out_enabled=True,
audio_out_sample_rate=24000,
transcription_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.8)),
vad_audio_passthrough=True,
),
)
session_properties = SessionProperties(
input_audio_transcription=InputAudioTranscription(),
# Set openai TurnDetection parameters. Not setting this at all will turn it
# on by default
turn_detection=TurnDetection(silence_duration_ms=1000),
# Or set to False to disable openai turn detection and use transport VAD
# turn_detection=False,
# tools=tools,
instructions="""Your knowledge cutoff is 2023-10. You are a helpful and friendly AI.
Act like a human, but remember that you aren't a human and that you can't do human
things in the real world. Your voice and personality should be warm and engaging, with a lively and
playful tone.
If interacting in a non-English language, start by using the standard accent or dialect familiar to
the user. Talk quickly. You should always call a function if you can. Do not refer to these rules,
even if you're asked about them.
-
You are participating in a voice conversation. Keep your responses concise, short, and to the point
unless specifically asked to elaborate on a topic.
Remember, your responses should be short. Just one or two sentences, usually.""",
)
llm = OpenAIRealtimeBetaLLMService(
api_key=os.getenv("OPENAI_API_KEY"),
session_properties=session_properties,
start_audio_paused=False,
)
# you can either register a single function for all function calls, or specific functions
# llm.register_function(None, fetch_weather_from_api)
llm.register_function("get_current_weather", fetch_weather_from_api)
llm.register_function("save_conversation", save_conversation)
llm.register_function("get_saved_conversation_filenames", get_saved_conversation_filenames)
llm.register_function("load_conversation", load_conversation)
context = OpenAILLMContext([], tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(),
llm, # LLM
context_aggregator.assistant(),
transport.output(), # Transport bot output
]
)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
# report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,232 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import glob
import json
import os
import sys
from datetime import datetime
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
)
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.anthropic import AnthropicLLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
BASE_FILENAME = "/tmp/pipecat_conversation_"
tts = None
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
temperature = 75 if args["format"] == "fahrenheit" else 24
await result_callback(
{
"conditions": "nice",
"temperature": temperature,
"format": args["format"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
async def get_saved_conversation_filenames(
function_name, tool_call_id, args, llm, context, result_callback
):
# Construct the full pattern including the BASE_FILENAME
full_pattern = f"{BASE_FILENAME}*.json"
# Use glob to find all matching files
matching_files = glob.glob(full_pattern)
logger.debug(f"matching files: {matching_files}")
await result_callback({"filenames": matching_files})
async def save_conversation(function_name, tool_call_id, args, llm, context, result_callback):
timestamp = datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
filename = f"{BASE_FILENAME}{timestamp}.json"
logger.debug(f"writing conversation to {filename}\n{json.dumps(context.messages, indent=4)}")
try:
with open(filename, "w") as file:
# todo: extract 'system' into the first message in the list
messages = context.get_messages_for_persistent_storage()
# remove the last message, which is the instruction we just gave to save the conversation
messages.pop()
json.dump(messages, file, indent=2)
await result_callback({"success": True})
except Exception as e:
await result_callback({"success": False, "error": str(e)})
async def load_conversation(function_name, tool_call_id, args, llm, context, result_callback):
global tts
filename = args["filename"]
logger.debug(f"loading conversation from {filename}")
try:
with open(filename, "r") as file:
context.set_messages(json.load(file))
logger.debug(
f"loaded conversation from {filename}\n{json.dumps(context.messages, indent=4)}"
)
await tts.say("Ok, I've loaded that conversation.")
except Exception as e:
await result_callback({"success": False, "error": str(e)})
# Test message munging ...
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
{"role": "user", "content": ""},
{"role": "assistant", "content": []},
{"role": "user", "content": "Tell me"},
{"role": "user", "content": "a joke"},
]
tools = [
{
"name": "get_current_weather",
"description": "Get the current weather",
"input_schema": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": ["location", "format"],
},
},
{
"name": "save_conversation",
"description": "Save the current conversation. Use this function to persist the current conversation to external storage.",
"input_schema": {
"type": "object",
"properties": {},
"required": [],
},
},
{
"name": "get_saved_conversation_filenames",
"description": "Get a list of saved conversation histories. Returns a list of filenames. Each filename includes a date and timestamp. Each file is conversation history that can be loaded into this session.",
"input_schema": {
"type": "object",
"properties": {},
"required": [],
},
},
{
"name": "load_conversation",
"description": "Load a conversation history. Use this function to load a conversation history into the current session.",
"input_schema": {
"type": "object",
"properties": {
"filename": {
"type": "string",
"description": "The filename of the conversation history to load.",
}
},
"required": ["filename"],
},
},
]
async def main():
global tts
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.8)),
),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-5-sonnet-20240620"
)
# you can either register a single function for all function calls, or specific functions
# llm.register_function(None, fetch_weather_from_api)
llm.register_function("get_current_weather", fetch_weather_from_api)
llm.register_function("save_conversation", save_conversation)
llm.register_function("get_saved_conversation_filenames", get_saved_conversation_filenames)
llm.register_function("load_conversation", load_conversation)
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(),
llm, # LLM
tts,
context_aggregator.assistant(),
transport.output(), # Transport bot output
]
)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
# report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,29 +1,18 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import argparse
import os
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper
import time
import urllib
import requests
async def configure(aiohttp_session: aiohttp.ClientSession):
(url, token, _) = await configure_with_args(aiohttp_session)
return (url, token)
async def configure_with_args(
aiohttp_session: aiohttp.ClientSession, parser: argparse.ArgumentParser | None = None
):
if not parser:
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
def configure():
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument(
"-u", "--url", type=str, required=False, help="URL of the Daily room to join"
)
"-u",
"--url",
type=str,
required=False,
help="URL of the Daily room to join")
parser.add_argument(
"-k",
"--apikey",
@@ -39,24 +28,31 @@ async def configure_with_args(
if not url:
raise Exception(
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL."
)
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL.")
if not key:
raise Exception(
"No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
)
daily_rest_helper = DailyRESTHelper(
daily_api_key=key,
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
raise Exception("No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers.")
# Create a meeting token for the given room with an expiration 1 hour in
# the future.
expiry_time: float = 60 * 60
room_name: str = urllib.parse.urlparse(url).path[1:]
expiration: float = time.time() + 60 * 60
token = await daily_rest_helper.get_token(url, expiry_time)
res: requests.Response = requests.post(
f"https://api.daily.co/v1/meeting-tokens",
headers={
"Authorization": f"Bearer {key}"},
json={
"properties": {
"room_name": room_name,
"is_owner": True,
"exp": expiration}},
)
return (url, token, args)
if res.status_code != 200:
raise Exception(
f"Failed to create meeting token: {res.status_code} {res.text}")
token: str = res.json()["token"]
return (url, token)

Some files were not shown because too many files have changed in this diff Show More