Compare commits

..

1 Commits

Author SHA1 Message Date
Kwindla Hultman Kramer
81c2c5adfa working on a fastapi http transport 2024-09-17 09:52:19 -07:00
250 changed files with 9529 additions and 14374 deletions

View File

@@ -1,4 +1,4 @@
name: format
name: lint
on:
workflow_dispatch:
@@ -12,12 +12,12 @@ on:
- "docs/**"
concurrency:
group: build-format-${{ github.event.pull_request.number || github.ref }}
group: build-lint-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true
jobs:
ruff-format:
name: "Formatting checker"
autopep8:
name: "Formatting lints"
runs-on: ubuntu-latest
steps:
- name: Checkout repo
@@ -25,7 +25,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: "3.10"
python-version: '3.10'
- name: Setup virtual environment
run: |
python -m venv .venv
@@ -34,8 +34,11 @@ jobs:
source .venv/bin/activate
python -m pip install --upgrade pip
pip install -r dev-requirements.txt
- name: Ruff formatter
id: ruff
- name: autopep8
id: autopep8
run: |
source .venv/bin/activate
ruff format --config line-length=100 --diff --exclude "*_pb2.py"
autopep8 --max-line-length 100 --exit-code -r -d --exclude "*_pb2.py" -a -a src/
- name: Fail if autopep8 requires changes
if: steps.autopep8.outputs.exit-code == 2
run: exit 1

View File

@@ -20,24 +20,14 @@ jobs:
name: "Unit and Integration Tests"
runs-on: ubuntu-latest
steps:
- name: Checkout repo
uses: actions/checkout@v4
- uses: actions/checkout@v4
- name: Set up Python
id: setup_python
uses: actions/setup-python@v4
with:
python-version: "3.10"
- name: Cache virtual environment
uses: actions/cache@v3
with:
# We are hashing dev-requirements.txt and test-requirements.txt which
# contain all dependencies needed to run the tests.
key: venv-${{ runner.os }}-${{ steps.setup_python.outputs.python-version}}-${{ hashFiles('dev-requirements.txt') }}-${{ hashFiles('test-requirements.txt') }}
path: .venv
- name: Install system packages
id: install_system_packages
run: |
sudo apt-get install -y portaudio19-dev
run: sudo apt-get install -y portaudio19-dev
- name: Setup virtual environment
run: |
python -m venv .venv
@@ -45,8 +35,8 @@ jobs:
run: |
source .venv/bin/activate
python -m pip install --upgrade pip
pip install -r dev-requirements.txt -r test-requirements.txt
pip install -r dev-requirements.txt
- name: Test with pytest
run: |
source .venv/bin/activate
pytest --ignore-glob="*to_be_updated*" --ignore-glob=*pipeline_source* src tests
pytest --doctest-modules --ignore-glob="*to_be_updated*" src tests

View File

@@ -1,6 +1,6 @@
# Changelog
All notable changes to **Pipecat** will be documented in this file.
All notable changes to **pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
@@ -9,208 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added `AssemblyAISTTService` and corresponding foundational examples
`07o-interruptible-assemblyai.py` and `13d-assemblyai-transcription.py`.
- Added a foundational example for Gladia transcription:
`13c-gladia-transcription.py`
### Fixed
- Fixed `enable_usage_metrics` to control LLM/TTS usage metrics separately
from `enable_metrics`.
## [0.0.46] - 2024-10-19
### Added
- Added `audio_passthrough` parameter to `STTService`. If enabled it allows
audio frames to be pushed downstream in case other processors need them.
- Added input parameter options for `PlayHTTTSService` and
`PlayHTHttpTTSService`.
### Changed
- Moved `SileroVAD` audio processor to `processors.audio.vad`.
- Module `utils.audio` is now `audio.utils`. A new `resample_audio` function has
been added.
- `PlayHTTTSService` now uses PlayHT websockets instead of HTTP requests.
- The previous `PlayHTTTSService` HTTP implementation is now
`PlayHTHttpTTSService`.
- `PlayHTTTSService` and `PlayHTHttpTTSService` now use a `voice_engine` of
`PlayHT3.0-mini`, which allows for multi-lingual support.
- Renamed `OpenAILLMServiceRealtimeBeta` to `OpenAIRealtimeBetaLLMService` to
match other services.
### Deprecated
- `LLMUserResponseAggregator` and `LLMAssistantResponseAggregator` are
mostly deprecated, use `OpenAILLMContext` instead.
- The `vad` package is now deprecated and `audio.vad` should be used
instead. The `avd` package will get removed in a future release.
### Fixed
- Fixed an issue that would cause an error if no VAD analyzer was passed to
`LiveKitTransport` params.
- Fixed `SileroVAD` processor to support interruptions properly.
### Other
- Added `examples/foundational/07-interruptible-vad.py`. This is the same as
`07-interruptible.py` but using the `SileroVAD` processor instead of passing
the `VADAnalyzer` in the transport.
## [0.0.45] - 2024-10-16
### Changed
- Metrics messages have moved out from the transport's base output into RTVI.
## [0.0.44] - 2024-10-15
### Added
- Added support for OpenAI Realtime API with the new
`OpenAILLMServiceRealtimeBeta` processor.
(see https://platform.openai.com/docs/guides/realtime/overview)
- Added `RTVIBotTranscriptionProcessor` which will send the RTVI
`bot-transcription` protocol message. These are TTS text aggregated (into
sentences) messages.
- Added new input params to the `MarkdownTextFilter` utility. You can set
`filter_code` to filter code from text and `filter_tables` to filter tables
from text.
- Added `CanonicalMetricsService`. This processor uses the new
`AudioBufferProcessor` to capture conversation audio and later send it to
Canonical AI.
(see https://canonical.chat/)
- Added `AudioBufferProcessor`. This processor can be used to buffer mixed user and
bot audio. This can later be saved into an audio file or processed by some
audio analyzer.
- Added `on_first_participant_joined` event to `LiveKitTransport`.
### Changed
- LLM text responses are now logged properly as unicode characters.
- `UserStartedSpeakingFrame`, `UserStoppedSpeakingFrame`,
`BotStartedSpeakingFrame`, `BotStoppedSpeakingFrame`, `BotSpeakingFrame` and
`UserImageRequestFrame` are now based from `SystemFrame`
### Fixed
- Merge `RTVIBotLLMProcessor`/`RTVIBotLLMTextProcessor` and
`RTVIBotTTSProcessor`/`RTVIBotTTSTextProcessor` to avoid out of order issues.
- Fixed an issue in RTVI protocol that could cause a `bot-llm-stopped` or
`bot-tts-stopped` message to be sent before a `bot-llm-text` or `bot-tts-text`
message.
- Fixed `DeepgramSTTService` constructor settings not being merged with default
ones.
- Fixed an issue in Daily transport that would cause tasks to be hanging if
urgent transport messages were being sent from a transport event handler.
- Fixed an issue in `BaseOutputTransport` that would cause `EndFrame` to be
pushed downed too early and call `FrameProcessor.cleanup()` before letting the
transport stop properly.
## [0.0.43] - 2024-10-10
### Added
- Added a new util called `MarkdownTextFilter` which is a subclass of a new
base class called `BaseTextFilter`. This is a configurable utility which
is intended to filter text received by TTS services.
- Added new `RTVIUserLLMTextProcessor`. This processor will send an RTVI
`user-llm-text` message with the user content's that was sent to the LLM.
### Changed
- `TransportMessageFrame` doesn't have an `urgent` field anymore, instead
there's now a `TransportMessageUrgentFrame` which is a `SystemFrame` and
therefore skip all internal queuing.
- For TTS services, convert inputted languages to match each service's language
format
### Fixed
- Fixed an issue where changing a language with the Deepgram STT service
wouldn't apply the change. This was fixed by disconnecting and reconnecting
when the language changes.
## [0.0.42] - 2024-10-02
### Added
- `SentryMetrics` has been added to report frame processor metrics to
Sentry. This is now possible because `FrameProcessorMetrics` can now be passed
to `FrameProcessor`.
- Added Google TTS service and corresponding foundational example
`07n-interruptible-google.py`
- Added AWS Polly TTS support and `07m-interruptible-aws.py` as an example.
- Added InputParams to Azure TTS service.
- Added `LivekitTransport` (audio-only for now).
- RTVI 0.2.0 is now supported.
- All `FrameProcessors` can now register event handlers.
```
tts = SomeTTSService(...)
@tts.event_handler("on_connected"):
async def on_connected(processor):
...
```
- Added `AsyncGeneratorProcessor`. This processor can be used together with a
`FrameSerializer` as an async generator. It provides a `generator()` function
that returns an `AsyncGenerator` and that yields serialized frames.
- Added `EndTaskFrame` and `CancelTaskFrame`. These are new frames that are
meant to be pushed upstream to tell the pipeline task to stop nicely or
immediately respectively.
- Added configurable LLM parameters (e.g., temperature, top_p, max_tokens, seed)
for OpenAI, Anthropic, and Together AI services along with corresponding
setter functions.
- Added `sample_rate` as a constructor parameter for TTS services.
- Pipecat has a pipeline-based architecture. The pipeline consists of frame
processors linked to each other. The elements traveling across the pipeline
are called frames.
To have a deterministic behavior the frames traveling through the pipeline
should always be ordered, except system frames which are out-of-band
frames. To achieve that, each frame processor should only output frames from a
single task.
In this version all the frame processors have their own task to push
frames. That is, when `push_frame()` is called the given frame will be put
into an internal queue (with the exception of system frames) and a frame
processor task will push it out.
- A clock can now be specified to `PipelineTask` (defaults to
`SystemClock`). This clock will be passed to each frame processor via the
`StartFrame`.
- Added pipeline clocks. A pipeline clock is used by the output transport to
know when a frame needs to be presented. For that, all frames now have an
@@ -218,12 +19,6 @@ async def on_connected(processor):
clock implementation `SystemClock` and the `pts` field is currently only used
for `TextFrame`s (audio and image frames will be next).
- A clock can now be specified to `PipelineTask` (defaults to
`SystemClock`). This clock will be passed to each frame processor via the
`StartFrame`.
- Added `CartesiaHttpTTSService`.
- `DailyTransport` now supports setting the audio bitrate to improve audio
quality through the `DailyParams.audio_out_bitrate` parameter. The new
default is 96kbps.
@@ -245,37 +40,6 @@ async def on_connected(processor):
### Changed
- Context frames are now pushed downstream from assistant context aggregators.
- Removed Silero VAD torch dependency.
- Updated individual update settings frame classes into a single
`ServiceUpdateSettingsFrame` class.
- We now distinguish between input and output audio and image frames. We
introduce `InputAudioRawFrame`, `OutputAudioRawFrame`, `InputImageRawFrame`
and `OutputImageRawFrame` (and other subclasses of those). The input frames
usually come from an input transport and are meant to be processed inside the
pipeline to generate new frames. However, the input frames will not be sent
through an output transport. The output frames can also be processed by any
frame processor in the pipeline and they are allowed to be sent by the output
transport.
- `ParallelTask` has been renamed to `SyncParallelPipeline`. A
`SyncParallelPipeline` is a frame processor that contains a list of different
pipelines to be executed concurrently. The difference between a
`SyncParallelPipeline` and a `ParallelPipeline` is that, given an input frame,
the `SyncParallelPipeline` will wait for all the internal pipelines to
complete. This is achieved by making sure the last processor in each of the
pipelines is synchronous (e.g. an HTTP-based service that waits for the
response).
- `StartFrame` is back a system frame to make sure it's processed immediately by
all processors. `EndFrame` stays a control frame since it needs to be ordered
allowing the frames in the pipeline to be processed.
- Updated `MoondreamService` revision to `2024-08-26`.
- `CartesiaTTSService` and `ElevenLabsTTSService` now add presentation
timestamps to their text output. This allows the output transport to push the
text frames downstream at almost the same time the words are spoken. We say
@@ -296,15 +60,6 @@ async def on_connected(processor):
### Fixed
- Fixed OpenAI multiple function calls.
- Fixed a Cartesia TTS issue that would cause audio to be truncated in some
cases.
- Fixed a `BaseOutputTransport` issue that would stop audio and video rendering
tasks (after receiving and `EndFrame`) before the internal queue was emptied,
causing the pipeline to finish prematurely.
- `StartFrame` should be the first frame every processor receives to avoid
situations where things are not initialized (because initialization happens on
`StartFrame`) and other frames come in resulting in undesired behavior.
@@ -314,10 +69,6 @@ async def on_connected(processor):
- `obj_id()` and `obj_count()` now use `itertools.count` avoiding the need of
`threading.Lock`.
### Other
- Pipecat now uses Ruff as its formatter (https://github.com/astral-sh/ruff).
## [0.0.41] - 2024-08-22
### Added
@@ -542,7 +293,7 @@ async def on_connected(processor):
- It is now possible to specify a Silero VAD version when using `SileroVADAnalyzer`
or `SileroVAD`.
- Added `AysncFrameProcessor` and `AsyncAIService`. Some services like
- Added `AysncFrameProcessor` and `AsyncAIService`. Some services like
`DeepgramSTTService` need to process things asynchronously. For example, audio
is sent to Deepgram but transcriptions are not returned immediately. In these
cases we still require all frames (except system frames) to be pushed
@@ -559,7 +310,7 @@ async def on_connected(processor):
- `WhisperSTTService` model can now also be a string.
- Added missing \* keyword separators in services.
- Added missing * keyword separators in services.
### Fixed
@@ -636,7 +387,7 @@ async def on_connected(processor):
- Added new `TwilioFrameSerializer`. This is a new serializer that knows how to
serialize and deserialize audio frames from Twilio.
- Added Daily transport event: `on_dialout_answered`. See
- Added Daily transport event: `on_dialout_answered`. See
https://reference-python.daily.co/api_reference.html#daily.EventHandler
- Added new `AzureSTTService`. This allows you to use Azure Speech-To-Text.
@@ -876,7 +627,7 @@ async def on_connected(processor):
- Added Daily transport support for dial-in use cases.
- Added Daily transport events: `on_dialout_connected`, `on_dialout_stopped`,
`on_dialout_error` and `on_dialout_warning`. See
`on_dialout_error` and `on_dialout_warning`. See
https://reference-python.daily.co/api_reference.html#daily.EventHandler
## [0.0.21] - 2024-05-22

View File

@@ -38,7 +38,7 @@ pip install "pipecat-ai[option,...]"
Your project may or may not need these, so they're made available as optional requirements. Here is a list:
- **AI services**: `anthropic`, `assemblyai`, `aws`, `azure`, `deepgram`, `gladia`, `google`, `fal`, `lmnt`, `moondream`, `openai`, `openpipe`, `playht`, `silero`, `whisper`, `xtts`
- **AI services**: `anthropic`, `azure`, `deepgram`, `gladia`, `google`, `fal`, `lmnt`, `moondream`, `openai`, `openpipe`, `playht`, `silero`, `whisper`, `xtts`
- **Transports**: `local`, `websocket`, `daily`
## Code examples
@@ -51,7 +51,10 @@ Your project may or may not need these, so they're made available as optional re
Here is a very basic Pipecat bot that greets a user when they join a real-time session. We'll use [Daily](https://daily.co) for real-time media transport, and [Cartesia](https://cartesia.ai/) for text-to-speech.
```python
#app.py
import asyncio
import aiohttp
from pipecat.frames.frames import EndFrame, TextFrame
from pipecat.pipeline.pipeline import Pipeline
@@ -61,43 +64,39 @@ from pipecat.services.cartesia import CartesiaTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
async def main():
# Use Daily as a real-time media transport (WebRTC)
transport = DailyTransport(
room_url=...,
token=...,
bot_name="Bot Name",
params=DailyParams(audio_out_enabled=True))
async with aiohttp.ClientSession() as session:
# Use Daily as a real-time media transport (WebRTC)
transport = DailyTransport(
room_url=...,
token=...,
bot_name="Bot Name",
params=DailyParams(audio_out_enabled=True))
# Use Cartesia for Text-to-Speech
tts = CartesiaTTSService(
api_key=...,
voice_id=...
)
# Use Cartesia for Text-to-Speech
tts = CartesiaTTSService(
api_key=...,
voice_id=...
)
# Simple pipeline that will process text to speech and output the result
pipeline = Pipeline([tts, transport.output()])
# Simple pipeline that will process text to speech and output the result
pipeline = Pipeline([tts, transport.output()])
# Create Pipecat processor that can run one or more pipelines tasks
runner = PipelineRunner()
# Create Pipecat processor that can run one or more pipelines tasks
runner = PipelineRunner()
# Assign the task callable to run the pipeline
task = PipelineTask(pipeline)
# Assign the task callable to run the pipeline
task = PipelineTask(pipeline)
# Register an event handler to play audio when a
# participant joins the transport WebRTC session
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
participant_name = participant.get("info", {}).get("userName", "")
# Queue a TextFrame that will get spoken by the TTS service (Cartesia)
await task.queue_frame(TextFrame(f"Hello there, {participant_name}!"))
# Register an event handler to play audio when a
# participant joins the transport WebRTC session
@transport.event_handler("on_participant_joined")
async def on_new_participant_joined(transport, participant):
participant_name = participant["info"]["userName"] or ''
# Queue a TextFrame that will get spoken by the TTS service (Cartesia)
await task.queue_frames([TextFrame(f"Hello there, {participant_name}!"), EndFrame()])
# Register an event handler to exit the application when the user leaves.
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
# Run the pipeline task
await runner.run(task)
# Run the pipeline task
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())
@@ -111,6 +110,7 @@ python app.py
Daily provides a prebuilt WebRTC user interface. Whilst the app is running, you can visit at `https://<yourdomain>.daily.co/<room_url>` and listen to the bot say hello!
## WebRTC for production use
WebSockets are fine for server-to-server communication or for initial development. But for production use, youll need client-server audio to use a protocol designed for real-time media transport. (For an explanation of the difference between WebSockets and WebRTC, see [this post.](https://www.daily.co/blog/how-to-talk-to-an-llm-with-your-voice/#webrtc))
@@ -129,6 +129,9 @@ Pipecat makes use of WebRTC VAD by default when using a WebRTC transport layer.
pip install pipecat-ai[silero]
```
The first time your run your bot with Silero, startup may take a while whilst it downloads and caches the model in the background. You can check the progress of this in the console.
## Hacking on the framework itself
_Note that you may need to set up a virtual environment before following the instructions below. For instance, you might need to run the following from the root of the repo:_
@@ -162,29 +165,27 @@ pip install "path_to_this_repo[option,...]"
From the root directory, run:
```shell
pytest --doctest-modules --ignore-glob="*to_be_updated*" --ignore-glob=*pipeline_source* src tests
pytest --doctest-modules --ignore-glob="*to_be_updated*" src tests
```
## Setting up your editor
This project uses strict [PEP 8](https://peps.python.org/pep-0008/) formatting via [Ruff](https://github.com/astral-sh/ruff).
This project uses strict [PEP 8](https://peps.python.org/pep-0008/) formatting.
### Emacs
You can use [use-package](https://github.com/jwiegley/use-package) to install [emacs-lazy-ruff](https://github.com/christophermadsen/emacs-lazy-ruff) package and configure `ruff` arguments:
You can use [use-package](https://github.com/jwiegley/use-package) to install [py-autopep8](https://codeberg.org/ideasman42/emacs-py-autopep8) package and configure `autopep8` arguments:
```elisp
(use-package lazy-ruff
(use-package py-autopep8
:ensure t
:hook ((python-mode . lazy-ruff-mode))
:defer t
:hook ((python-mode . py-autopep8-mode))
:config
(setq lazy-ruff-format-command "ruff format --config line-length=100")
(setq lazy-ruff-only-format-block t)
(setq lazy-ruff-only-format-region t)
(setq lazy-ruff-only-format-buffer t))
(setq py-autopep8-options '("-a" "-a", "--max-line-length=100")))
```
`ruff` was installed in the `venv` environment described before, so you should be able to use [pyvenv-auto](https://github.com/ryotaro612/pyvenv-auto) to automatically load that environment inside Emacs.
`autopep8` was installed in the `venv` environment described before, so you should be able to use [pyvenv-auto](https://github.com/ryotaro612/pyvenv-auto) to automatically load that environment inside Emacs.
```elisp
(use-package pyvenv-auto
@@ -197,14 +198,18 @@ You can use [use-package](https://github.com/jwiegley/use-package) to install [e
### Visual Studio Code
Install the
[Ruff](https://marketplace.visualstudio.com/items?itemName=charliermarsh.ruff) extension. Then edit the user settings (_Ctrl-Shift-P_ `Open User Settings (JSON)`) and set it as the default Python formatter, enable formatting on save and configure `ruff` arguments:
[autopep8](https://marketplace.visualstudio.com/items?itemName=ms-python.autopep8) extension. Then edit the user settings (_Ctrl-Shift-P_ `Open User Settings (JSON)`) and set it as the default Python formatter, enable formatting on save and configure `autopep8` arguments:
```json
"[python]": {
"editor.defaultFormatter": "charliermarsh.ruff",
"editor.defaultFormatter": "ms-python.autopep8",
"editor.formatOnSave": true
},
"ruff.format.args": ["--config", "line-length=100"]
"autopep8.args": [
"-a",
"-a",
"--max-line-length=100"
],
```
## Getting help

View File

@@ -1,8 +1,8 @@
autopep8~=2.3.1
build~=1.2.1
grpcio-tools~=1.62.2
pip-tools~=7.4.1
pyright~=1.1.376
pytest~=8.3.2
ruff~=0.6.7
setuptools~=72.2.0
setuptools_scm~=8.1.0

View File

@@ -1,11 +1,6 @@
# Anthropic
ANTHROPIC_API_KEY=...
# AWS
AWS_SECRET_ACCESS_KEY=...
AWS_ACCESS_KEY_ID=...
AWS_REGION=...
# Azure
AZURE_SPEECH_REGION=...
AZURE_SPEECH_API_KEY=...

View File

@@ -1,161 +0,0 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
recordings/
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
runpod.toml

View File

@@ -1,10 +0,0 @@
FROM python:3.10-bullseye
RUN mkdir /app
COPY *.py /app/
COPY requirements.txt /app/
WORKDIR /app
RUN pip3 install -r requirements.txt
EXPOSE 7860
CMD ["python3", "server.py"]

View File

@@ -1,37 +0,0 @@
# Simple Chatbot
<img src="image.png" width="420px">
This app connects you to a chatbot powered by GPT-4, complete with animations generated by Stable Video Diffusion.
See a video of it in action: https://x.com/kwindla/status/1778628911817183509
And a quick video walkthrough of the code: https://www.loom.com/share/13df1967161f4d24ade054e7f8753416
The first time, things might take extra time to get started since VAD (Voice Activity Detection) model needs to be downloaded.
## Get started
```python
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
cp env.example .env # and add your credentials
```
## Run the server
```bash
python server.py
```
Then, visit `http://localhost:7860/` in your browser to start a chatbot session.
## Build and test the Docker image
```
docker build -t chatbot .
docker run --env-file .env -p 7860:7860 chatbot
```

View File

@@ -1,146 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import uuid
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame, LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
from pipecat.services.canonical import CanonicalMetricsService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Chatbot",
DailyParams(
audio_out_enabled=True,
audio_in_enabled=True,
camera_out_enabled=False,
vad_enabled=True,
vad_audio_passthrough=True,
vad_analyzer=SileroVADAnalyzer(),
transcription_enabled=True,
#
# Spanish
#
# transcription_settings=DailyTranscriptionSettings(
# language="es",
# tier="nova",
# model="2-general"
# )
),
)
tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY"),
#
# English
#
voice_id="cgSgspJ2msm6clMCkdW9",
aiohttp_session=session,
#
# Spanish
#
# model="eleven_multilingual_v2",
# voice_id="gD1IexrzCvsXPHUuT0s3",
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
#
# English
#
"content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by introducing yourself. Keep all your responses to 12 words or fewer.",
#
# Spanish
#
# "content": "Eres Chatbot, un amigable y útil robot. Tu objetivo es demostrar tus capacidades de una manera breve. Tus respuestas se convertiran a audio así que nunca no debes incluir caracteres especiales. Contesta a lo que el usuario pregunte de una manera creativa, útil y breve. Empieza por presentarte a ti mismo.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
"""
CanonicalMetrics uses AudioBufferProcessor under the hood to buffer the audio. On
call completion, CanonicalMetrics will send the audio buffer to Canonical for
analysis. Visit https://voice.canonical.chat to learn more.
"""
audio_buffer_processor = AudioBufferProcessor()
canonical = CanonicalMetricsService(
audio_buffer_processor=audio_buffer_processor,
aiohttp_session=session,
api_key=os.getenv("CANONICAL_API_KEY"),
api_url=os.getenv("CANONICAL_API_URL"),
call_id=str(uuid.uuid4()),
assistant="pipecat-chatbot",
assistant_speaks_first=True,
)
pipeline = Pipeline(
[
transport.input(), # microphone
context_aggregator.user(),
llm,
tts,
transport.output(),
audio_buffer_processor, # captures audio into a buffer
canonical, # uploads audio buffer to Canonical AI for metrics
context_aggregator.assistant(),
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
print(f"Participant left: {participant}")
await task.queue_frame(EndFrame())
@transport.event_handler("on_call_state_updated")
async def on_call_state_updated(transport, state):
if state == "left":
await task.queue_frame(EndFrame())
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,6 +0,0 @@
DAILY_SAMPLE_ROOM_URL=https://yourdomain.daily.co/yourroom # (for joining the bot to the same room repeatedly for local dev)
DAILY_API_KEY=7df...
OPENAI_API_KEY=sk-PL...
ELEVENLABS_API_KEY=aeb...
CANONICAL_API_KEY=can...
CANONICAL_API_URL=

View File

@@ -1,5 +0,0 @@
python-dotenv
fastapi[all]
uvicorn
pipecat-ai[daily,openai,silero,elevenlabs,canonical]

View File

@@ -1,56 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import os
import aiohttp
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper
async def configure(aiohttp_session: aiohttp.ClientSession):
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument(
"-u", "--url", type=str, required=False, help="URL of the Daily room to join"
)
parser.add_argument(
"-k",
"--apikey",
type=str,
required=False,
help="Daily API Key (needed to create an owner token for the room)",
)
args, unknown = parser.parse_known_args()
url = args.url or os.getenv("DAILY_SAMPLE_ROOM_URL")
key = args.apikey or os.getenv("DAILY_API_KEY")
if not url:
raise Exception(
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL."
)
if not key:
raise Exception(
"No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
)
daily_rest_helper = DailyRESTHelper(
daily_api_key=key,
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
# Create a meeting token for the given room with an expiration 1 hour in
# the future.
expiry_time: float = 60 * 60
token = await daily_rest_helper.get_token(url, expiry_time)
return (url, token)
return (url, token)

View File

@@ -1,139 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import os
import subprocess
from contextlib import asynccontextmanager
import aiohttp
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, RedirectResponse
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomParams
MAX_BOTS_PER_ROOM = 1
# Bot sub-process dict for status reporting and concurrency control
bot_procs = {}
daily_helpers = {}
load_dotenv(override=True)
def cleanup():
# Clean up function, just to be extra safe
for entry in bot_procs.values():
proc = entry[0]
proc.terminate()
proc.wait()
@asynccontextmanager
async def lifespan(app: FastAPI):
aiohttp_session = aiohttp.ClientSession()
daily_helpers["rest"] = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
yield
await aiohttp_session.close()
cleanup()
app = FastAPI(lifespan=lifespan)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/")
async def start_agent(request: Request):
print(f"!!! Creating room")
room = await daily_helpers["rest"].create_room(DailyRoomParams())
print(f"!!! Room URL: {room.url}")
# Ensure the room property is present
if not room.url:
raise HTTPException(
status_code=500,
detail="Missing 'room' property in request data. Cannot start agent without a target room!",
)
# Check if there is already an existing process running in this room
num_bots_in_room = sum(
1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None
)
if num_bots_in_room >= MAX_BOTS_PER_ROOM:
raise HTTPException(status_code=500, detail=f"Max bot limited reach for room: {room.url}")
# Get the token for the room
token = await daily_helpers["rest"].get_token(room.url)
if not token:
raise HTTPException(status_code=500, detail=f"Failed to get token for room: {room.url}")
# Spawn a new agent, and join the user session
# Note: this is mostly for demonstration purposes (refer to 'deployment' in README)
try:
proc = subprocess.Popen(
[f"python3 -m bot -u {room.url} -t {token}"],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__)),
)
bot_procs[proc.pid] = (proc, room.url)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}")
return RedirectResponse(room.url)
@app.get("/status/{pid}")
def get_status(pid: int):
# Look up the subprocess
proc = bot_procs.get(pid)
# If the subprocess doesn't exist, return an error
if not proc:
raise HTTPException(status_code=404, detail=f"Bot with process id: {pid} not found")
# Check the status of the subprocess
if proc[0].poll() is None:
status = "running"
else:
status = "finished"
return JSONResponse({"bot_id": pid, "status": status})
if __name__ == "__main__":
import uvicorn
default_host = os.getenv("HOST", "0.0.0.0")
default_port = int(os.getenv("FAST_API_PORT", "7860"))
parser = argparse.ArgumentParser(description="Daily Storyteller FastAPI server")
parser.add_argument("--host", type=str, default=default_host, help="Host address")
parser.add_argument("--port", type=int, default=default_port, help="Port number")
parser.add_argument("--reload", action="store_true", help="Reload code on change")
config = parser.parse_args()
uvicorn.run(
"server:app",
host=config.host,
port=config.port,
reload=config.reload,
)

View File

@@ -1,161 +0,0 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
runpod.toml

View File

@@ -1,15 +0,0 @@
FROM python:3.10-bullseye
RUN mkdir /app
RUN mkdir /app/assets
RUN mkdir /app/utils
COPY *.py /app/
COPY requirements.txt /app/
WORKDIR /app
RUN pip3 install -r requirements.txt
EXPOSE 7860
CMD ["python3", "server.py"]

View File

@@ -1,37 +0,0 @@
# Simple Chatbot
<img src="image.png" width="420px">
This app connects you to a chatbot powered by GPT-4, complete with animations generated by Stable Video Diffusion.
See a video of it in action: https://x.com/kwindla/status/1778628911817183509
And a quick video walkthrough of the code: https://www.loom.com/share/13df1967161f4d24ade054e7f8753416
The first time, things might take extra time to get started since VAD (Voice Activity Detection) model needs to be downloaded.
## Get started
```python
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
cp env.example .env # and add your credentials
```
## Run the server
```bash
python server.py
```
Then, visit `http://localhost:7860/` in your browser to start a chatbot session.
## Build and test the Docker image
```
docker build -t chatbot .
docker run --env-file .env -p 7860:7860 chatbot
```

View File

@@ -1,141 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
import datetime
import wave
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame, LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def save_audio(audiobuffer):
if audiobuffer.has_audio():
merged_audio = audiobuffer.merge_audio_buffers()
filename = f"conversation_recording{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.wav"
with wave.open(filename, "wb") as wf:
wf.setnchannels(2)
wf.setsampwidth(2)
wf.setframerate(audiobuffer._sample_rate)
wf.writeframes(merged_audio)
print(f"Merged audio saved to {filename}")
else:
print("No audio data to save")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Chatbot",
DailyParams(
audio_out_enabled=True,
audio_in_enabled=True,
camera_out_enabled=False,
vad_enabled=True,
vad_audio_passthrough=True,
vad_analyzer=SileroVADAnalyzer(),
transcription_enabled=True,
#
# Spanish
#
# transcription_settings=DailyTranscriptionSettings(
# language="es",
# tier="nova",
# model="2-general"
# )
),
)
tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY"),
#
# English
#
voice_id="cgSgspJ2msm6clMCkdW9",
aiohttp_session=session,
#
# Spanish
#
# model="eleven_multilingual_v2",
# voice_id="gD1IexrzCvsXPHUuT0s3",
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
#
# English
#
"content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by introducing yourself. Keep all your response to 12 words or fewer.",
#
# Spanish
#
# "content": "Eres Chatbot, un amigable y útil robot. Tu objetivo es demostrar tus capacidades de una manera breve. Tus respuestas se convertiran a audio así que nunca no debes incluir caracteres especiales. Contesta a lo que el usuario pregunte de una manera creativa, útil y breve. Empieza por presentarte a ti mismo.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
audiobuffer = AudioBufferProcessor()
pipeline = Pipeline(
[
transport.input(), # microphone
context_aggregator.user(),
llm,
tts,
transport.output(),
audiobuffer, # used to buffer the audio in the pipeline
context_aggregator.assistant(),
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
print(f"Participant left: {participant}")
await task.queue_frame(EndFrame())
await save_audio(audiobuffer)
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,4 +0,0 @@
DAILY_SAMPLE_ROOM_URL=https://yourdomain.daily.co/yourroom # (for joining the bot to the same room repeatedly for local dev)
DAILY_API_KEY=7df...
OPENAI_API_KEY=sk-PL...
ELEVENLABS_API_KEY=aeb...

View File

@@ -1,4 +0,0 @@
python-dotenv
fastapi[all]
uvicorn
pipecat-ai[daily,openai,silero,elevenlabs]

View File

@@ -1,56 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import os
import aiohttp
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper
async def configure(aiohttp_session: aiohttp.ClientSession):
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument(
"-u", "--url", type=str, required=False, help="URL of the Daily room to join"
)
parser.add_argument(
"-k",
"--apikey",
type=str,
required=False,
help="Daily API Key (needed to create an owner token for the room)",
)
args, unknown = parser.parse_known_args()
url = args.url or os.getenv("DAILY_SAMPLE_ROOM_URL")
key = args.apikey or os.getenv("DAILY_API_KEY")
if not url:
raise Exception(
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL."
)
if not key:
raise Exception(
"No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
)
daily_rest_helper = DailyRESTHelper(
daily_api_key=key,
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
# Create a meeting token for the given room with an expiration 1 hour in
# the future.
expiry_time: float = 60 * 60
token = await daily_rest_helper.get_token(url, expiry_time)
return (url, token)
return (url, token)

View File

@@ -1,139 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import os
import subprocess
from contextlib import asynccontextmanager
import aiohttp
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, RedirectResponse
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomParams
MAX_BOTS_PER_ROOM = 1
# Bot sub-process dict for status reporting and concurrency control
bot_procs = {}
daily_helpers = {}
load_dotenv(override=True)
def cleanup():
# Clean up function, just to be extra safe
for entry in bot_procs.values():
proc = entry[0]
proc.terminate()
proc.wait()
@asynccontextmanager
async def lifespan(app: FastAPI):
aiohttp_session = aiohttp.ClientSession()
daily_helpers["rest"] = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
yield
await aiohttp_session.close()
cleanup()
app = FastAPI(lifespan=lifespan)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/")
async def start_agent(request: Request):
print(f"!!! Creating room")
room = await daily_helpers["rest"].create_room(DailyRoomParams())
print(f"!!! Room URL: {room.url}")
# Ensure the room property is present
if not room.url:
raise HTTPException(
status_code=500,
detail="Missing 'room' property in request data. Cannot start agent without a target room!",
)
# Check if there is already an existing process running in this room
num_bots_in_room = sum(
1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None
)
if num_bots_in_room >= MAX_BOTS_PER_ROOM:
raise HTTPException(status_code=500, detail=f"Max bot limited reach for room: {room.url}")
# Get the token for the room
token = await daily_helpers["rest"].get_token(room.url)
if not token:
raise HTTPException(status_code=500, detail=f"Failed to get token for room: {room.url}")
# Spawn a new agent, and join the user session
# Note: this is mostly for demonstration purposes (refer to 'deployment' in README)
try:
proc = subprocess.Popen(
[f"python3 -m bot -u {room.url} -t {token}"],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__)),
)
bot_procs[proc.pid] = (proc, room.url)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}")
return RedirectResponse(room.url)
@app.get("/status/{pid}")
def get_status(pid: int):
# Look up the subprocess
proc = bot_procs.get(pid)
# If the subprocess doesn't exist, return an error
if not proc:
raise HTTPException(status_code=404, detail=f"Bot with process id: {pid} not found")
# Check the status of the subprocess
if proc[0].poll() is None:
status = "running"
else:
status = "finished"
return JSONResponse({"bot_id": pid, "status": status})
if __name__ == "__main__":
import uvicorn
default_host = os.getenv("HOST", "0.0.0.0")
default_port = int(os.getenv("FAST_API_PORT", "7860"))
parser = argparse.ArgumentParser(description="Daily Storyteller FastAPI server")
parser.add_argument("--host", type=str, default=default_host, help="Host address")
parser.add_argument("--port", type=int, default=default_port, help="Port number")
parser.add_argument("--reload", action="store_true", help="Reload code on change")
config = parser.parse_args()
uvicorn.run(
"server:app",
host=config.host,
port=config.port,
reload=config.reload,
)

View File

@@ -34,6 +34,6 @@ Note: you can do this manually via the fly.io dashboard under the "secrets" sub-
Send a post request to your running fly.io instance:
`curl --location --request POST 'https://YOUR_FLY_APP_NAME/'`
`curl --location --request POST 'https://YOUR_FLY_APP_NAME/start_bot'`
This request will wait until the machine enters into a `starting` state, before returning the a room URL and token to join.

View File

@@ -3,20 +3,19 @@ import os
import sys
import argparse
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import LLMAssistantResponseAggregator, LLMUserResponseAggregator
from pipecat.frames.frames import LLMMessagesFrame, EndFrame
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai import OpenAILLMService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -40,7 +39,7 @@ async def main(room_url: str, token: str):
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
transcription_enabled=True,
),
)
)
tts = ElevenLabsTTSService(
@@ -48,7 +47,9 @@ async def main(room_url: str, token: str):
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
@@ -57,19 +58,17 @@ async def main(room_url: str, token: str):
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline(
[
transport.input(),
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
pipeline = Pipeline([
transport.input(),
tma_in,
llm,
tts,
transport.output(),
tma_out,
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

View File

@@ -16,14 +16,9 @@ from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pipecat.transports.services.helpers.daily_rest import (
DailyRESTHelper,
DailyRoomObject,
DailyRoomProperties,
DailyRoomParams,
)
DailyRESTHelper, DailyRoomObject, DailyRoomProperties, DailyRoomParams)
from dotenv import load_dotenv
load_dotenv(override=True)
@@ -31,37 +26,37 @@ load_dotenv(override=True)
MAX_SESSION_TIME = 5 * 60 # 5 minutes
REQUIRED_ENV_VARS = [
"DAILY_API_KEY",
"OPENAI_API_KEY",
"ELEVENLABS_API_KEY",
"ELEVENLABS_VOICE_ID",
"FLY_API_KEY",
"FLY_APP_NAME",
]
'DAILY_API_KEY',
'OPENAI_API_KEY',
'ELEVENLABS_API_KEY',
'ELEVENLABS_VOICE_ID',
'FLY_API_KEY',
'FLY_APP_NAME',]
FLY_API_HOST = os.getenv("FLY_API_HOST", "https://api.machines.dev/v1")
FLY_APP_NAME = os.getenv("FLY_APP_NAME", "pipecat-fly-example")
FLY_API_KEY = os.getenv("FLY_API_KEY", "")
FLY_HEADERS = {"Authorization": f"Bearer {FLY_API_KEY}", "Content-Type": "application/json"}
FLY_HEADERS = {
'Authorization': f"Bearer {FLY_API_KEY}",
'Content-Type': 'application/json'
}
daily_helpers = {}
# ----------------- API ----------------- #
@asynccontextmanager
async def lifespan(app: FastAPI):
aiohttp_session = aiohttp.ClientSession()
daily_helpers["rest"] = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
daily_api_url=os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'),
aiohttp_session=aiohttp_session
)
yield
await aiohttp_session.close()
app = FastAPI(lifespan=lifespan)
app.add_middleware(
@@ -69,7 +64,7 @@ app.add_middleware(
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
allow_headers=["*"]
)
# ----------------- Main ----------------- #
@@ -78,15 +73,13 @@ app.add_middleware(
async def spawn_fly_machine(room_url: str, token: str):
async with aiohttp.ClientSession() as session:
# Use the same image as the bot runner
async with session.get(
f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS
) as r:
async with session.get(f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS) as r:
if r.status != 200:
text = await r.text()
raise Exception(f"Unable to get machine info from Fly: {text}")
data = await r.json()
image = data[0]["config"]["image"]
image = data[0]['config']['image']
# Machine configuration
cmd = f"python3 bot.py -u {room_url} -t {token}"
@@ -95,28 +88,31 @@ async def spawn_fly_machine(room_url: str, token: str):
"config": {
"image": image,
"auto_destroy": True,
"init": {"cmd": cmd},
"restart": {"policy": "no"},
"guest": {"cpu_kind": "shared", "cpus": 1, "memory_mb": 1024},
"init": {
"cmd": cmd
},
"restart": {
"policy": "no"
},
"guest": {
"cpu_kind": "shared",
"cpus": 1,
"memory_mb": 1024
}
},
}
# Spawn a new machine instance
async with session.post(
f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS, json=worker_props
) as r:
async with session.post(f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS, json=worker_props) as r:
if r.status != 200:
text = await r.text()
raise Exception(f"Problem starting a bot worker: {text}")
data = await r.json()
# Wait for the machine to enter the started state
vm_id = data["id"]
vm_id = data['id']
async with session.get(
f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines/{vm_id}/wait?state=started",
headers=FLY_HEADERS,
) as r:
async with session.get(f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines/{vm_id}/wait?state=started", headers=FLY_HEADERS) as r:
if r.status != 200:
text = await r.text()
raise Exception(f"Bot was unable to enter started state: {text}")
@@ -124,7 +120,7 @@ async def spawn_fly_machine(room_url: str, token: str):
print(f"Machine joined room: {room_url}")
@app.post("/")
@app.post("/start_bot")
async def start_bot(request: Request) -> JSONResponse:
try:
data = await request.json()
@@ -138,23 +134,29 @@ async def start_bot(request: Request) -> JSONResponse:
room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", "")
if not room_url:
params = DailyRoomParams(properties=DailyRoomProperties())
params = DailyRoomParams(
properties=DailyRoomProperties()
)
try:
room: DailyRoomObject = await daily_helpers["rest"].create_room(params=params)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Unable to provision room {e}")
raise HTTPException(
status_code=500,
detail=f"Unable to provision room {e}")
else:
# Check passed room URL exists, we should assume that it already has a sip set up
try:
room: DailyRoomObject = await daily_helpers["rest"].get_room_from_url(room_url)
except Exception:
raise HTTPException(status_code=500, detail=f"Room not found: {room_url}")
raise HTTPException(
status_code=500, detail=f"Room not found: {room_url}")
# Give the agent a token to join the session
token = await daily_helpers["rest"].get_token(room.url, MAX_SESSION_TIME)
if not room or not token:
raise HTTPException(status_code=500, detail=f"Failed to get token for room: {room_url}")
raise HTTPException(
status_code=500, detail=f"Failed to get token for room: {room_url}")
# Launch a new fly.io machine, or run as a shell process (not recommended)
run_as_process = os.getenv("RUN_AS_PROCESS", False)
@@ -165,26 +167,24 @@ async def start_bot(request: Request) -> JSONResponse:
[f"python3 -m bot -u {room.url} -t {token}"],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__)),
)
cwd=os.path.dirname(os.path.abspath(__file__)))
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}")
raise HTTPException(
status_code=500, detail=f"Failed to start subprocess: {e}")
else:
try:
await spawn_fly_machine(room.url, token)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to spawn VM: {e}")
raise HTTPException(
status_code=500, detail=f"Failed to spawn VM: {e}")
# Grab a token for the user to join with
user_token = await daily_helpers["rest"].get_token(room.url, MAX_SESSION_TIME)
return JSONResponse(
{
"room_url": room.url,
"token": user_token,
}
)
return JSONResponse({
"room_url": room.url,
"token": user_token,
})
if __name__ == "__main__":
# Check environment variables
@@ -193,19 +193,23 @@ if __name__ == "__main__":
raise Exception(f"Missing environment variable: {env_var}.")
parser = argparse.ArgumentParser(description="Pipecat Bot Runner")
parser.add_argument(
"--host", type=str, default=os.getenv("HOST", "0.0.0.0"), help="Host address"
)
parser.add_argument("--port", type=int, default=os.getenv("PORT", 7860), help="Port number")
parser.add_argument(
"--reload", action="store_true", default=False, help="Reload code on change"
)
parser.add_argument("--host", type=str,
default=os.getenv("HOST", "0.0.0.0"), help="Host address")
parser.add_argument("--port", type=int,
default=os.getenv("PORT", 7860), help="Port number")
parser.add_argument("--reload", action="store_true",
default=False, help="Reload code on change")
config = parser.parse_args()
try:
import uvicorn
uvicorn.run("bot_runner:app", host=config.host, port=config.port, reload=config.reload)
uvicorn.run(
"bot_runner:app",
host=config.host,
port=config.port,
reload=config.reload
)
except KeyboardInterrupt:
print("Pipecat runner shutting down...")

View File

@@ -3,20 +3,21 @@ import os
import sys
import argparse
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.frames.frames import LLMMessagesFrame, EndFrame
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.aggregators.llm_response import LLMAssistantResponseAggregator, LLMUserResponseAggregator
from pipecat.frames.frames import (
LLMMessagesFrame,
EndFrame
)
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport, DailyDialinSettings
from pipecat.vad.silero import SileroVADAnalyzer
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -30,7 +31,10 @@ async def main(room_url: str, token: str, callId: str, callDomain: str):
# diallin_settings are only needed if Daily's SIP URI is used
# If you are handling this via Twilio, Telnyx, set this to None
# and handle call-forwarding when on_dialin_ready fires.
diallin_settings = DailyDialinSettings(call_id=callId, call_domain=callDomain)
diallin_settings = DailyDialinSettings(
call_id=callId,
call_domain=callDomain
)
transport = DailyTransport(
room_url,
@@ -46,7 +50,7 @@ async def main(room_url: str, token: str, callId: str, callDomain: str):
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
transcription_enabled=True,
),
)
)
tts = ElevenLabsTTSService(
@@ -54,7 +58,10 @@ async def main(room_url: str, token: str, callId: str, callDomain: str):
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o"
)
messages = [
{
@@ -63,19 +70,17 @@ async def main(room_url: str, token: str, callId: str, callDomain: str):
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline(
[
transport.input(),
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
pipeline = Pipeline([
transport.input(),
tma_in,
llm,
tts,
transport.output(),
tma_out,
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

View File

@@ -7,6 +7,7 @@ provisioning a room and starting a Pipecat bot in response.
Refer to README for more information.
"""
import aiohttp
import os
import argparse
@@ -24,18 +25,17 @@ from pipecat.transports.services.helpers.daily_rest import (
DailyRoomObject,
DailyRoomProperties,
DailyRoomSipParams,
DailyRoomParams,
)
DailyRoomParams)
from dotenv import load_dotenv
load_dotenv(override=True)
# ------------ Configuration ------------ #
MAX_SESSION_TIME = 5 * 60 # 5 minutes
REQUIRED_ENV_VARS = ["OPENAI_API_KEY", "DAILY_API_KEY", "ELEVENLABS_API_KEY", "ELEVENLABS_VOICE_ID"]
REQUIRED_ENV_VARS = ['OPENAI_API_KEY', 'DAILY_API_KEY',
'ELEVENLABS_API_KEY', 'ELEVENLABS_VOICE_ID']
daily_helpers = {}
@@ -47,13 +47,12 @@ async def lifespan(app: FastAPI):
aiohttp_session = aiohttp.ClientSession()
daily_helpers["rest"] = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
daily_api_url=os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'),
aiohttp_session=aiohttp_session
)
yield
await aiohttp_session.close()
app = FastAPI(lifespan=lifespan)
app.add_middleware(
@@ -61,7 +60,7 @@ app.add_middleware(
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
allow_headers=["*"]
)
"""
@@ -81,7 +80,10 @@ async def _create_daily_room(room_url, callId, callDomain=None, vendor="daily"):
properties=DailyRoomProperties(
# Note: these are the default values, except for the display name
sip=DailyRoomSipParams(
display_name="dialin-user", video=False, sip_mode="dial-in", num_endpoints=1
display_name="dialin-user",
video=False,
sip_mode="dial-in",
num_endpoints=1
)
)
)
@@ -95,7 +97,8 @@ async def _create_daily_room(room_url, callId, callDomain=None, vendor="daily"):
print(f"Joining existing room: {room_url}")
room: DailyRoomObject = await daily_helpers["rest"].get_room_from_url(room_url)
except Exception:
raise HTTPException(status_code=500, detail=f"Room not found: {room_url}")
raise HTTPException(
status_code=500, detail=f"Room not found: {room_url}")
print(f"Daily room: {room.url} {room.config.sip_endpoint}")
@@ -103,21 +106,28 @@ async def _create_daily_room(room_url, callId, callDomain=None, vendor="daily"):
token = await daily_helpers["rest"].get_token(room.url, MAX_SESSION_TIME)
if not room or not token:
raise HTTPException(status_code=500, detail=f"Failed to get room or token token")
raise HTTPException(
status_code=500, detail=f"Failed to get room or token token")
# Spawn a new agent, and join the user session
# Note: this is mostly for demonstration purposes (refer to 'deployment' in docs)
if vendor == "daily":
bot_proc = f"python3 -m bot_daily -u {room.url} -t {token} -i {callId} -d {callDomain}"
bot_proc = f"python3 - m bot_daily - u {room.url} - t {token} - i {
callId} - d {callDomain}"
else:
bot_proc = f"python3 -m bot_twilio -u {room.url} -t {token} -i {callId} -s {room.config.sip_endpoint}"
bot_proc = f"python3 - m bot_twilio - u {room.url} - t {
token} - i {callId} - s {room.config.sip_endpoint}"
try:
subprocess.Popen(
[bot_proc], shell=True, bufsize=1, cwd=os.path.dirname(os.path.abspath(__file__))
[bot_proc],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__))
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}")
raise HTTPException(
status_code=500, detail=f"Failed to start subprocess: {e}")
return room
@@ -140,10 +150,11 @@ async def twilio_start_bot(request: Request):
pass
room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", None)
callId = data.get("CallSid")
callId = data.get('CallSid')
if not callId:
raise HTTPException(status_code=500, detail="Missing 'CallSid' in request")
raise HTTPException(
status_code=500, detail="Missing 'CallSid' in request")
print("CallId: %s" % callId)
@@ -159,8 +170,7 @@ async def twilio_start_bot(request: Request):
# http://com.twilio.music.classical.s3.amazonaws.com/BusyStrings.mp3
resp = VoiceResponse()
resp.play(
url="http://com.twilio.sounds.music.s3.amazonaws.com/MARKOVICHAMP-Borghestral.mp3", loop=10
)
url="http://com.twilio.sounds.music.s3.amazonaws.com/MARKOVICHAMP-Borghestral.mp3", loop=10)
return str(resp)
@@ -182,14 +192,18 @@ async def daily_start_bot(request: Request) -> JSONResponse:
callId = data.get("callId", None)
callDomain = data.get("callDomain", None)
except Exception:
raise HTTPException(status_code=500, detail="Missing properties 'callId' or 'callDomain'")
raise HTTPException(
status_code=500,
detail="Missing properties 'callId' or 'callDomain'")
print(f"CallId: {callId}, CallDomain: {callDomain}")
room: DailyRoomObject = await _create_daily_room(room_url, callId, callDomain, "daily")
# Grab a token for the user to join with
return JSONResponse({"room_url": room.url, "sipUri": room.config.sip_endpoint})
return JSONResponse({
"room_url": room.url,
"sipUri": room.config.sip_endpoint
})
# ----------------- Main ----------------- #
@@ -201,18 +215,24 @@ if __name__ == "__main__":
raise Exception(f"Missing environment variable: {env_var}.")
parser = argparse.ArgumentParser(description="Pipecat Bot Runner")
parser.add_argument(
"--host", type=str, default=os.getenv("HOST", "0.0.0.0"), help="Host address"
)
parser.add_argument("--port", type=int, default=os.getenv("PORT", 7860), help="Port number")
parser.add_argument("--reload", action="store_true", default=True, help="Reload code on change")
parser.add_argument("--host", type=str,
default=os.getenv("HOST", "0.0.0.0"), help="Host address")
parser.add_argument("--port", type=int,
default=os.getenv("PORT", 7860), help="Port number")
parser.add_argument("--reload", action="store_true",
default=True, help="Reload code on change")
config = parser.parse_args()
try:
import uvicorn
uvicorn.run("bot_runner:app", host=config.host, port=config.port, reload=config.reload)
uvicorn.run(
"bot_runner:app",
host=config.host,
port=config.port,
reload=config.reload
)
except KeyboardInterrupt:
print("Pipecat runner shutting down...")

View File

@@ -3,30 +3,32 @@ import os
import sys
import argparse
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.frames.frames import LLMMessagesFrame, EndFrame
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.aggregators.llm_response import LLMAssistantResponseAggregator, LLMUserResponseAggregator
from pipecat.frames.frames import (
LLMMessagesFrame,
EndFrame
)
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from twilio.rest import Client
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
twilio_account_sid = os.getenv("TWILIO_ACCOUNT_SID")
twilio_auth_token = os.getenv("TWILIO_AUTH_TOKEN")
twilio_account_sid = os.getenv('TWILIO_ACCOUNT_SID')
twilio_auth_token = os.getenv('TWILIO_AUTH_TOKEN')
twilioclient = Client(twilio_account_sid, twilio_auth_token)
daily_api_key = os.getenv("DAILY_API_KEY", "")
@@ -49,7 +51,7 @@ async def main(room_url: str, token: str, callId: str, sipUri: str):
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
transcription_enabled=True,
),
)
)
tts = ElevenLabsTTSService(
@@ -57,7 +59,10 @@ async def main(room_url: str, token: str, callId: str, sipUri: str):
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o"
)
messages = [
{
@@ -66,19 +71,17 @@ async def main(room_url: str, token: str, callId: str, sipUri: str):
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline(
[
transport.input(),
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
pipeline = Pipeline([
transport.input(),
tma_in,
llm,
tts,
transport.output(),
tma_out,
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@@ -100,7 +103,7 @@ async def main(room_url: str, token: str, callId: str, sipUri: str):
try:
# The TwiML is updated using Twilio's client library
call = twilioclient.calls(callId).update(
twiml=f"<Response><Dial><Sip>{sipUri}</Sip></Dial></Response>"
twiml=f'<Response><Dial><Sip>{sipUri}</Sip></Dial></Response>'
)
except Exception as e:
raise Exception(f"Failed to forward call: {str(e)}")

View File

@@ -1,4 +1,4 @@
pipecat-ai[daily,elevenlabs,openai,silero]
pipecat-ai[daily,openai,silero]
fastapi
uvicorn
python-dotenv

View File

@@ -9,11 +9,11 @@ import aiohttp
import os
import sys
from pipecat.frames.frames import EndFrame, TextFrame
from pipecat.frames.frames import TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.runner import PipelineRunner
from pipecat.services.cartesia import CartesiaHttpTTSService
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from runner import configure
@@ -21,7 +21,6 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -33,10 +32,9 @@ async def main():
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url, None, "Say One Thing", DailyParams(audio_out_enabled=True)
)
room_url, None, "Say One Thing", DailyParams(audio_out_enabled=True))
tts = CartesiaHttpTTSService(
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
@@ -47,18 +45,12 @@ async def main():
# Register an event handler so we can play the audio when the
# participant joins.
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
participant_name = participant.get("info", {}).get("userName", "")
@transport.event_handler("on_participant_joined")
async def on_new_participant_joined(transport, participant):
participant_name = participant["info"]["userName"] or ''
await task.queue_frame(TextFrame(f"Hello there, {participant_name}!"))
# Register an event handler to exit the application when the user leaves.
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -20,7 +20,6 @@ from pipecat.transports.local.audio import LocalAudioTransport
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)

View File

@@ -1,111 +0,0 @@
import argparse
import asyncio
import os
import sys
import aiohttp
from pipecat.frames.frames import TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.transports.services.livekit import LiveKitParams, LiveKitTransport
from livekit import api
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
def generate_token(room_name: str, participant_name: str, api_key: str, api_secret: str) -> str:
token = api.AccessToken(api_key, api_secret)
token.with_identity(participant_name).with_name(participant_name).with_grants(
api.VideoGrants(
room_join=True,
room=room_name,
)
)
return token.to_jwt()
async def configure_livekit():
parser = argparse.ArgumentParser(description="LiveKit AI SDK Bot Sample")
parser.add_argument(
"-r", "--room", type=str, required=False, help="Name of the LiveKit room to join"
)
parser.add_argument("-u", "--url", type=str, required=False, help="URL of the LiveKit server")
args, unknown = parser.parse_known_args()
room_name = args.room or os.getenv("LIVEKIT_ROOM_NAME")
url = args.url or os.getenv("LIVEKIT_URL")
api_key = os.getenv("LIVEKIT_API_KEY")
api_secret = os.getenv("LIVEKIT_API_SECRET")
if not room_name:
raise Exception(
"No LiveKit room specified. Use the -r/--room option from the command line, or set LIVEKIT_ROOM_NAME in your environment."
)
if not url:
raise Exception(
"No LiveKit server URL specified. Use the -u/--url option from the command line, or set LIVEKIT_URL in your environment."
)
if not api_key or not api_secret:
raise Exception(
"LIVEKIT_API_KEY and LIVEKIT_API_SECRET must be set in environment variables."
)
token = generate_token(room_name, "Say One Thing", api_key, api_secret)
user_token = generate_token(room_name, "User", api_key, api_secret)
logger.info(f"User token: {user_token}")
return (url, token, room_name)
async def main():
async with aiohttp.ClientSession() as session:
(url, token, room_name) = await configure_livekit()
transport = LiveKitTransport(
url=url,
token=token,
room_name=room_name,
params=LiveKitParams(audio_out_enabled=True, audio_out_sample_rate=16000),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
runner = PipelineRunner()
task = PipelineTask(Pipeline([tts, transport.output()]))
# Register an event handler so we can play the audio when the
# participant joins.
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant_id):
await asyncio.sleep(1)
await task.queue_frame(
TextFrame(
"Hello there! How are you doing today? Would you like to talk about the weather?"
)
)
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -9,11 +9,11 @@ import aiohttp
import os
import sys
from pipecat.frames.frames import EndFrame, LLMMessagesFrame
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.services.cartesia import CartesiaHttpTTSService
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -22,7 +22,6 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -34,22 +33,25 @@ async def main():
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url, None, "Say One Thing From an LLM", DailyParams(audio_out_enabled=True)
)
room_url,
None,
"Say One Thing From an LLM",
DailyParams(audio_out_enabled=True))
tts = CartesiaHttpTTSService(
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are an LLM in a WebRTC session, and this is a 'hello world' demo. Say hello to the world.",
}
]
}]
runner = PipelineRunner()
@@ -59,10 +61,6 @@ async def main():
async def on_first_participant_joined(transport, participant):
await task.queue_frame(LLMMessagesFrame(messages))
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
await runner.run(task)

View File

@@ -9,7 +9,7 @@ import aiohttp
import os
import sys
from pipecat.frames.frames import EndFrame, TextFrame
from pipecat.frames.frames import TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
@@ -21,7 +21,6 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -36,11 +35,17 @@ async def main():
room_url,
None,
"Show a still frame image",
DailyParams(camera_out_enabled=True, camera_out_width=1024, camera_out_height=1024),
DailyParams(
camera_out_enabled=True,
camera_out_width=1024,
camera_out_height=1024
)
)
imagegen = FalImageGenService(
params=FalImageGenService.InputParams(image_size="square_hd"),
params=FalImageGenService.InputParams(
image_size="square_hd"
),
aiohttp_session=session,
key=os.getenv("FAL_KEY"),
)
@@ -51,11 +56,11 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await task.queue_frame(TextFrame("a cat in the style of picasso"))
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
# Note that we do not put an EndFrame() item in the pipeline for this demo.
# This means that the bot will stay in the channel until it times out.
# An EndFrame() in the pipeline would cause the transport to shut
# down.
await task.queue_frames([TextFrame("a cat in the style of picasso")])
await runner.run(task)

View File

@@ -22,7 +22,6 @@ from pipecat.transports.local.tk import TkLocalTransport
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -36,11 +35,15 @@ async def main():
transport = TkLocalTransport(
tk_root,
TransportParams(camera_out_enabled=True, camera_out_width=1024, camera_out_height=1024),
)
TransportParams(
camera_out_enabled=True,
camera_out_width=1024,
camera_out_height=1024))
imagegen = FalImageGenService(
params=FalImageGenService.InputParams(image_size="square_hd"),
params=FalImageGenService.InputParams(
image_size="square_hd"
),
aiohttp_session=session,
key=os.getenv("FAL_KEY"),
)

View File

@@ -4,10 +4,6 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
#
# This example broken on latest pipecat and needs updating.
#
import aiohttp
import asyncio
import os
@@ -28,7 +24,6 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -59,7 +54,8 @@ async def main():
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
messages = [{"role": "system", "content": "tell the user a joke about llamas"}]
messages = [{"role": "system",
"content": "tell the user a joke about llamas"}]
# Start a task to run the LLM to create a joke, and convert the LLM
# output to audio frames. This task will run in parallel with generating
@@ -77,7 +73,8 @@ async def main():
]
)
merge_pipeline = SequentialMergePipeline([simple_tts_pipeline, llm_pipeline])
merge_pipeline = SequentialMergePipeline(
[simple_tts_pipeline, llm_pipeline])
await asyncio.gather(
transport.run(merge_pipeline),

View File

@@ -14,18 +14,21 @@ from dataclasses import dataclass
from pipecat.frames.frames import (
AppFrame,
Frame,
ImageRawFrame,
LLMFullResponseStartFrame,
LLMMessagesFrame,
TextFrame,
TextFrame
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.parallel_task import ParallelTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.aggregators.gated import GatedAggregator
from pipecat.processors.aggregators.llm_response import LLMFullResponseAggregator
from pipecat.processors.aggregators.sentence import SentenceAggregator
from pipecat.services.cartesia import CartesiaHttpTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.fal import FalImageGenService
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -34,7 +37,6 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -82,46 +84,47 @@ async def main():
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_width=1024,
camera_out_height=1024,
),
camera_out_height=1024
)
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
tts = CartesiaHttpTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
imagegen = FalImageGenService(
params=FalImageGenService.InputParams(image_size="square_hd"),
params=FalImageGenService.InputParams(
image_size="square_hd"
),
aiohttp_session=session,
key=os.getenv("FAL_KEY"),
)
gated_aggregator = GatedAggregator(
gate_open_fn=lambda frame: isinstance(frame, ImageRawFrame),
gate_close_fn=lambda frame: isinstance(frame, LLMFullResponseStartFrame),
start_open=False
)
sentence_aggregator = SentenceAggregator()
month_prepender = MonthPrepender()
llm_full_response_aggregator = LLMFullResponseAggregator()
# With `SyncParallelPipeline` we synchronize audio and images by pushing
# them basically in order (e.g. I1 A1 A1 A1 I2 A2 A2 A2 A2 I3 A3). To do
# that, each pipeline runs concurrently and `SyncParallelPipeline` will
# wait for the input frame to be processed.
#
# Note that `SyncParallelPipeline` requires the last processor in each
# of the pipelines to be synchronous. In this case, we use
# `CartesiaHttpTTSService` and `FalImageGenService` which make HTTP
# requests and wait for the response.
pipeline = Pipeline(
[
llm, # LLM
sentence_aggregator, # Aggregates LLM output into full sentences
SyncParallelPipeline( # Run pipelines in parallel aggregating the result
[month_prepender, tts], # Create "Month: sentence" and output audio
[imagegen], # Generate image
),
transport.output(), # Transport output
]
)
pipeline = Pipeline([
llm, # LLM
sentence_aggregator, # Aggregates LLM output into full sentences
ParallelTask( # Run pipelines in parallel aggregating the result
[month_prepender, tts], # Create "Month: sentence" and output audio
[llm_full_response_aggregator, imagegen] # Aggregate full LLM response
),
gated_aggregator, # Queues everything until an image is available
transport.output() # Transport output
])
frames = []
for month in [

View File

@@ -11,25 +11,18 @@ import sys
import tkinter as tk
from pipecat.frames.frames import (
Frame,
OutputAudioRawFrame,
TTSAudioRawFrame,
URLImageRawFrame,
LLMMessagesFrame,
TextFrame,
)
from pipecat.frames.frames import AudioRawFrame, Frame, URLImageRawFrame, LLMMessagesFrame, TextFrame
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.sentence import SentenceAggregator
from pipecat.processors.aggregators.llm_response import LLMFullResponseAggregator
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.cartesia import CartesiaHttpTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.fal import FalImageGenService
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.local.tk import TkLocalTransport, TkOutputTransport
from pipecat.transports.local.tk import TkLocalTransport
from loguru import logger
@@ -49,12 +42,7 @@ async def main():
runner = PipelineRunner()
async def get_month_data(month):
messages = [
{
"role": "system",
"content": f"Describe a nature photograph suitable for use in a calendar, for the month of {month}. Include only the image description with no preamble. Limit the description to one sentence, please.",
}
]
messages = [{"role": "system", "content": f"Describe a nature photograph suitable for use in a calendar, for the month of {month}. Include only the image description with no preamble. Limit the description to one sentence, please.", }]
class ImageDescription(FrameProcessor):
def __init__(self):
@@ -72,17 +60,14 @@ async def main():
def __init__(self):
super().__init__()
self.audio = bytearray()
self.frame = None
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TTSAudioRawFrame):
if isinstance(frame, AudioRawFrame):
self.audio.extend(frame.audio)
self.frame = OutputAudioRawFrame(
bytes(self.audio), frame.sample_rate, frame.num_channels
)
await self.push_frame(frame, direction)
self.frame = AudioRawFrame(
bytes(self.audio), frame.sample_rate, frame.num_channels)
class ImageGrabber(FrameProcessor):
def __init__(self):
@@ -94,22 +79,23 @@ async def main():
if isinstance(frame, URLImageRawFrame):
self.frame = frame
await self.push_frame(frame, direction)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
tts = CartesiaHttpTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"))
imagegen = FalImageGenService(
params=FalImageGenService.InputParams(image_size="square_hd"),
params=FalImageGenService.InputParams(
image_size="square_hd"
),
aiohttp_session=session,
key=os.getenv("FAL_KEY"),
)
key=os.getenv("FAL_KEY"))
sentence_aggregator = SentenceAggregator()
aggregator = LLMFullResponseAggregator()
description = ImageDescription()
@@ -117,27 +103,13 @@ async def main():
image_grabber = ImageGrabber()
# With `SyncParallelPipeline` we synchronize audio and images by
# pushing them basically in order (e.g. I1 A1 A1 A1 I2 A2 A2 A2 A2
# I3 A3). To do that, each pipeline runs concurrently and
# `SyncParallelPipeline` will wait for the input frame to be
# processed.
#
# Note that `SyncParallelPipeline` requires the last processor in
# each of the pipelines to be synchronous. In this case, we use
# `CartesiaHttpTTSService` and `FalImageGenService` which make HTTP
# requests and wait for the response.
pipeline = Pipeline(
[
llm, # LLM
sentence_aggregator, # Aggregates LLM output into full sentences
description, # Store sentence
SyncParallelPipeline(
[tts, audio_grabber], # Generate and store audio for the given sentence
[imagegen, image_grabber], # Generate and storeimage for the given sentence
),
]
)
pipeline = Pipeline([
llm,
aggregator,
description,
ParallelPipeline([tts, audio_grabber],
[imagegen, image_grabber])
])
task = PipelineTask(pipeline)
await task.queue_frame(LLMMessagesFrame(messages))
@@ -158,9 +130,7 @@ async def main():
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_width=1024,
camera_out_height=1024,
),
)
camera_out_height=1024))
pipeline = Pipeline([transport.output()])

View File

@@ -9,29 +9,25 @@ import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import Frame, LLMMessagesFrame, MetricsFrame
from pipecat.metrics.metrics import (
TTFBMetricsData,
ProcessingMetricsData,
LLMUsageMetricsData,
TTSUsageMetricsData,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -41,20 +37,8 @@ logger.add(sys.stderr, level="DEBUG")
class MetricsLogger(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, MetricsFrame):
for d in frame.data:
if isinstance(d, TTFBMetricsData):
print(f"!!! MetricsFrame: {frame}, ttfb: {d.value}")
elif isinstance(d, ProcessingMetricsData):
print(f"!!! MetricsFrame: {frame}, processing: {d.value}")
elif isinstance(d, LLMUsageMetricsData):
tokens = d.value
print(
f"!!! MetricsFrame: {frame}, tokens: {
tokens.prompt_tokens}, characters: {
tokens.completion_tokens}"
)
elif isinstance(d, TTSUsageMetricsData):
print(f"!!! MetricsFrame: {frame}, characters: {d.value}")
print(
f"!!! MetricsFrame: {frame}, ttfb: {frame.ttfb}, processing: {frame.processing}, tokens: {frame.tokens}, characters: {frame.characters}")
await self.push_frame(frame, direction)
@@ -70,8 +54,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
vad_analyzer=SileroVADAnalyzer()
)
)
tts = CartesiaTTSService(
@@ -79,7 +63,10 @@ async def main():
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o"
)
ml = MetricsLogger()
@@ -89,29 +76,32 @@ async def main():
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(),
context_aggregator.user(),
llm,
tts,
ml,
transport.output(),
context_aggregator.assistant(),
]
)
pipeline = Pipeline([
transport.input(),
tma_in,
llm,
tts,
ml,
transport.output(),
tma_out,
])
task = PipelineTask(pipeline)
task = PipelineTask(pipeline, PipelineParams(
allow_interruptions=True,
enable_metrics=True,
report_only_initial_ttfb=False,
))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

View File

@@ -11,16 +11,19 @@ import sys
from PIL import Image
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import Frame, OutputImageRawFrame, SystemFrame, TextFrame
from pipecat.frames.frames import ImageRawFrame, Frame, SystemFrame, TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.cartesia import CartesiaHttpTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.transports.services.daily import DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from pipecat.transports.services.daily import DailyParams
from runner import configure
@@ -28,7 +31,6 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -50,21 +52,9 @@ class ImageSyncAggregator(FrameProcessor):
await super().process_frame(frame, direction)
if not isinstance(frame, SystemFrame) and direction == FrameDirection.DOWNSTREAM:
await self.push_frame(
OutputImageRawFrame(
image=self._speaking_image_bytes,
size=(1024, 1024),
format=self._speaking_image_format,
)
)
await self.push_frame(ImageRawFrame(image=self._speaking_image_bytes, size=(1024, 1024), format=self._speaking_image_format))
await self.push_frame(frame)
await self.push_frame(
OutputImageRawFrame(
image=self._waiting_image_bytes,
size=(1024, 1024),
format=self._waiting_image_format,
)
)
await self.push_frame(ImageRawFrame(image=self._waiting_image_bytes, size=(1024, 1024), format=self._waiting_image_format))
else:
await self.push_frame(frame)
@@ -85,15 +75,17 @@ async def main():
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
)
tts = CartesiaHttpTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
@@ -102,31 +94,29 @@ async def main():
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
image_sync_aggregator = ImageSyncAggregator(
os.path.join(os.path.dirname(__file__), "assets", "speaking.png"),
os.path.join(os.path.dirname(__file__), "assets", "waiting.png"),
)
pipeline = Pipeline(
[
transport.input(),
image_sync_aggregator,
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
pipeline = Pipeline([
transport.input(),
image_sync_aggregator,
tma_in,
llm,
tts,
transport.output(),
tma_out
])
task = PipelineTask(pipeline)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
participant_name = participant.get("info", {}).get("userName", "")
participant_name = participant["info"]["userName"] or ''
transport.capture_participant_transcription(participant["id"])
await task.queue_frames([TextFrame(f"Hi there {participant_name}!")])

View File

@@ -1,103 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import os
import sys
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.audio.vad.silero import SileroVAD
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
transcription_enabled=True,
),
)
vad = SileroVAD()
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(),
vad,
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -9,22 +9,22 @@ import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -43,8 +43,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
vad_analyzer=SileroVADAnalyzer()
)
)
tts = CartesiaTTSService(
@@ -52,7 +52,9 @@ async def main():
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
@@ -61,35 +63,31 @@ async def main():
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
task = PipelineTask(pipeline, PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

View File

@@ -5,24 +5,26 @@
#
import asyncio
import aiohttp
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.anthropic import AnthropicLLMService
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.anthropic import AnthropicLLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -41,8 +43,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
vad_analyzer=SileroVADAnalyzer()
)
)
tts = CartesiaTTSService(
@@ -51,8 +53,8 @@ async def main():
)
llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-opus-20240229"
)
api_key=os.getenv("ANTHROPIC_API_KEY"),
model="claude-3-opus-20240229")
# todo: think more about how to handle system prompts in a more general way. OpenAI,
# Google, and Anthropic all have slightly different approaches to providing a system
@@ -64,19 +66,17 @@ async def main():
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

View File

@@ -10,18 +10,16 @@ import sys
import aiohttp
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.processors.frameworks.langchain import LangchainProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_community.chat_message_histories import ChatMessageHistory
@@ -34,7 +32,6 @@ from loguru import logger
from runner import configure
from dotenv import load_dotenv
load_dotenv(override=True)
@@ -73,22 +70,19 @@ async def main():
prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"Be nice and helpful. Answer very briefly and without special characters like `#` or `*`. "
"Your response will be synthesized to voice and those characters will create unnatural sounds.",
),
("system",
"Be nice and helpful. Answer very briefly and without special characters like `#` or `*`. "
"Your response will be synthesized to voice and those characters will create unnatural sounds.",
),
MessagesPlaceholder("chat_history"),
("human", "{input}"),
]
)
])
chain = prompt | ChatOpenAI(model="gpt-4o", temperature=0.7)
history_chain = RunnableWithMessageHistory(
chain,
get_session_history,
history_messages_key="chat_history",
input_messages_key="input",
)
input_messages_key="input")
lc = LangchainProcessor(history_chain)
tma_in = LLMUserResponseAggregator()
@@ -96,12 +90,12 @@ async def main():
pipeline = Pipeline(
[
transport.input(), # Transport user input
tma_in, # User responses
lc, # Langchain
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
transport.input(), # Transport user input
tma_in, # User responses
lc, # Langchain
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
@@ -115,7 +109,11 @@ async def main():
# the `LLMMessagesFrame` will be picked up by the LangchainProcessor using
# only the content of the last message to inject it in the prompt defined
# above. So no role is required here.
messages = [({"content": "Please briefly introduce yourself to the user."})]
messages = [(
{
"content": "Please briefly introduce yourself to the user."
}
)]
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

View File

@@ -5,24 +5,26 @@
#
import asyncio
import aiohttp
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.services.deepgram import DeepgramSTTService, DeepgramTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -41,15 +43,21 @@ async def main():
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
vad_audio_passthrough=True
)
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-helios-en")
tts = DeepgramTTSService(
aiohttp_session=session,
api_key=os.getenv("DEEPGRAM_API_KEY"),
voice="aura-helios-en"
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
@@ -58,27 +66,27 @@ async def main():
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
pipeline = Pipeline([
transport.input(), # Transport user input
stt, # STT
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

View File

@@ -5,24 +5,26 @@
#
import asyncio
import aiohttp
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -41,8 +43,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
vad_analyzer=SileroVADAnalyzer()
)
)
tts = ElevenLabsTTSService(
@@ -50,7 +52,9 @@ async def main():
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
@@ -59,35 +63,31 @@ async def main():
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
task = PipelineTask(pipeline, PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

View File

@@ -4,26 +4,27 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai import OpenAILLMService
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.services.playht import PlayHTTTSService
from pipecat.transcriptions.language import Language
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -43,18 +44,19 @@ async def main():
audio_out_sample_rate=16000,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
vad_analyzer=SileroVADAnalyzer()
)
)
tts = PlayHTTTSService(
user_id=os.getenv("PLAYHT_USER_ID"),
api_key=os.getenv("PLAYHT_API_KEY"),
voice_url="s3://voice-cloning-zero-shot/801a663f-efd0-4254-98d0-5c175514c3e8/jennifer/manifest.json",
params=PlayHTTTSService.InputParams(language=Language.EN),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
@@ -63,35 +65,26 @@ async def main():
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

View File

@@ -9,14 +9,15 @@ import asyncio
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.services.azure import AzureLLMService, AzureSTTService, AzureTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
@@ -24,7 +25,6 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -45,7 +45,7 @@ async def main():
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
)
)
stt = AzureSTTService(
@@ -71,20 +71,18 @@ async def main():
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
pipeline = Pipeline([
transport.input(), # Transport user input
stt, # STT
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@@ -92,7 +90,8 @@ async def main():
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

View File

@@ -4,24 +4,27 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.services.openai import OpenAILLMService, OpenAITTSService
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.services.openai import OpenAITTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -41,13 +44,18 @@ async def main():
audio_out_sample_rate=24000,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
vad_analyzer=SileroVADAnalyzer()
)
)
tts = OpenAITTSService(api_key=os.getenv("OPENAI_API_KEY"), voice="alloy")
tts = OpenAITTSService(
api_key=os.getenv("OPENAI_API_KEY"),
voice="alloy"
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
@@ -56,19 +64,17 @@ async def main():
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@@ -76,7 +82,8 @@ async def main():
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

View File

@@ -9,15 +9,18 @@ import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openpipe import OpenPipeLLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
@@ -25,7 +28,6 @@ from loguru import logger
import time
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -44,8 +46,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
vad_analyzer=SileroVADAnalyzer()
)
)
tts = CartesiaTTSService(
@@ -58,7 +60,9 @@ async def main():
api_key=os.getenv("OPENAI_API_KEY"),
openpipe_api_key=os.getenv("OPENPIPE_API_KEY"),
model="gpt-4o",
tags={"conversation_id": f"pipecat-{timestamp}"},
tags={
"conversation_id": f"pipecat-{timestamp}"
}
)
messages = [
@@ -67,20 +71,17 @@ async def main():
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
@@ -88,7 +89,8 @@ async def main():
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

View File

@@ -9,22 +9,23 @@ import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.services.deepgram import DeepgramSTTService, DeepgramTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.xtts import XTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -44,17 +45,19 @@ async def main():
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
)
tts = XTTSService(
aiohttp_session=session,
voice_id="Claribel Dervla",
language="en",
base_url="http://localhost:8000",
base_url="http://localhost:8000"
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
@@ -63,19 +66,17 @@ async def main():
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@@ -83,7 +84,8 @@ async def main():
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

View File

@@ -9,23 +9,23 @@ import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.gladia import GladiaSTTService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -45,7 +45,7 @@ async def main():
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
)
)
stt = GladiaSTTService(
@@ -57,7 +57,9 @@ async def main():
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
@@ -66,20 +68,18 @@ async def main():
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
pipeline = Pipeline([
transport.input(), # Transport user input
stt, # STT
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@@ -87,7 +87,8 @@ async def main():
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

View File

@@ -9,22 +9,22 @@ import asyncio
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.services.lmnt import LmntTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -44,13 +44,18 @@ async def main():
audio_out_sample_rate=24000,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
vad_analyzer=SileroVADAnalyzer()
)
)
tts = LmntTTSService(api_key=os.getenv("LMNT_API_KEY"), voice_id="morgan")
tts = LmntTTSService(
api_key=os.getenv("LMNT_API_KEY"),
voice_id="morgan"
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
@@ -59,19 +64,17 @@ async def main():
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User respones
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@@ -79,7 +82,8 @@ async def main():
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

View File

@@ -1,109 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.services.ai_services import OpenAILLMContext
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.together import TogetherLLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = TogetherLLMService(
api_key=os.getenv("TOGETHER_API_KEY"),
model="meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo",
params=TogetherLLMService.InputParams(
temperature=1.0,
top_p=0.9,
top_k=40,
extra={
"frequency_penalty": 2.0,
"presence_penalty": 0.0,
},
),
)
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond in plain language. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
user_aggregator = context_aggregator.user()
assistant_aggregator = context_aggregator.assistant()
pipeline = Pipeline(
[
transport.input(), # Transport user input
user_aggregator, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
assistant_aggregator, # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True, enable_metrics=True, enable_usage_metrics=True
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,99 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.aws import AWSTTSService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=16000,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = AWSTTSService(
api_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
region=os.getenv("AWS_REGION"),
voice_id="Amy",
params=AWSTTSService.InputParams(engine="neural", language="en-GB", rate="1.05"),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,96 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.google import GoogleTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=24000,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = GoogleTTSService(
voice_id="en-US-Neural2-J",
params=GoogleTTSService.InputParams(language="en-US", rate="1.05"),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User respones
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,97 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.assemblyai import AssemblyAISTTService
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
)
stt = AssemblyAISTTService(
api_key=os.getenv("ASSEMBLYAI_API_KEY"),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -3,19 +3,18 @@ import aiohttp
import asyncio
import logging
import os
from pipecat.processors.aggregators import SentenceAggregator
from pipecat.pipeline.aggregators import SentenceAggregator
from pipecat.pipeline.pipeline import Pipeline
from pipecat.transports.services.daily import DailyTransport
from pipecat.services.azure import AzureLLMService, AzureTTSService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.fal import FalImageGenService
from pipecat.frames.frames import AudioFrame, EndFrame, ImageFrame, LLMMessagesFrame, TextFrame
from pipecat.transports.daily_transport import DailyTransport
from pipecat.services.azure_ai_services import AzureLLMService, AzureTTSService
from pipecat.services.elevenlabs_ai_services import ElevenLabsTTSService
from pipecat.services.fal_ai_services import FalImageGenService
from pipecat.pipeline.frames import AudioFrame, EndFrame, ImageFrame, LLMMessagesFrame, TextFrame
from runner import configure
from dotenv import load_dotenv
load_dotenv(override=True)
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
@@ -54,7 +53,9 @@ async def main():
voice_id="jBpfuIE2acCO8z3wKNLl",
)
dalle = FalImageGenService(
params=FalImageGenService.InputParams(image_size="1024x1024"),
params=FalImageGenService.InputParams(
image_size="1024x1024"
),
aiohttp_session=session,
key=os.getenv("FAL_KEY"),
)
@@ -74,11 +75,13 @@ async def main():
async def get_text_and_audio(messages) -> Tuple[str, bytearray]:
"""This function streams text from the LLM and uses the TTS service to convert
that text to speech as it's received."""
that text to speech as it's received. """
source_queue = asyncio.Queue()
sink_queue = asyncio.Queue()
sentence_aggregator = SentenceAggregator()
pipeline = Pipeline([llm, sentence_aggregator, tts1], source_queue, sink_queue)
pipeline = Pipeline(
[llm, sentence_aggregator, tts1], source_queue, sink_queue
)
await source_queue.put(LLMMessagesFrame(messages))
await source_queue.put(EndFrame())

View File

@@ -8,17 +8,9 @@ import aiohttp
import asyncio
import sys
from pipecat.frames.frames import (
Frame,
InputAudioRawFrame,
InputImageRawFrame,
OutputAudioRawFrame,
OutputImageRawFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transports.services.daily import DailyTransport, DailyParams
from runner import configure
@@ -26,56 +18,33 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
class MirrorProcessor(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, InputAudioRawFrame):
await self.push_frame(
OutputAudioRawFrame(
audio=frame.audio,
sample_rate=frame.sample_rate,
num_channels=frame.num_channels,
)
)
elif isinstance(frame, InputImageRawFrame):
await self.push_frame(
OutputImageRawFrame(image=frame.image, size=frame.size, format=frame.format)
)
else:
await self.push_frame(frame, direction)
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Test",
room_url, token, "Test",
DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_is_live=True,
camera_out_width=1280,
camera_out_height=720,
),
camera_out_height=720
)
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_video(participant["id"])
pipeline = Pipeline([transport.input(), MirrorProcessor(), transport.output()])
pipeline = Pipeline([transport.input(), transport.output()])
runner = PipelineRunner()

View File

@@ -10,17 +10,9 @@ import sys
import tkinter as tk
from pipecat.frames.frames import (
Frame,
InputAudioRawFrame,
InputImageRawFrame,
OutputAudioRawFrame,
OutputImageRawFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.local.tk import TkLocalTransport
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -30,33 +22,12 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
class MirrorProcessor(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, InputAudioRawFrame):
await self.push_frame(
OutputAudioRawFrame(
audio=frame.audio,
sample_rate=frame.sample_rate,
num_channels=frame.num_channels,
)
)
elif isinstance(frame, InputImageRawFrame):
await self.push_frame(
OutputImageRawFrame(image=frame.image, size=frame.size, format=frame.format)
)
else:
await self.push_frame(frame, direction)
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
@@ -65,8 +36,8 @@ async def main():
tk_root.title("Local Mirror")
daily_transport = DailyTransport(
room_url, token, "Test", DailyParams(audio_in_enabled=True)
)
room_url, token, "Test", DailyParams(
audio_in_enabled=True))
tk_transport = TkLocalTransport(
tk_root,
@@ -75,15 +46,13 @@ async def main():
camera_out_enabled=True,
camera_out_is_live=True,
camera_out_width=1280,
camera_out_height=720,
),
)
camera_out_height=720))
@daily_transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_video(participant["id"])
pipeline = Pipeline([daily_transport.input(), MirrorProcessor(), tk_transport.output()])
pipeline = Pipeline([daily_transport.input(), tk_transport.output()])
task = PipelineTask(pipeline)

View File

@@ -9,22 +9,22 @@ import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.processors.filters.wake_check_filter import WakeCheckFilter
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.filters.wake_check_filter import WakeCheckFilter
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -43,8 +43,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
vad_analyzer=SileroVADAnalyzer()
)
)
tts = CartesiaTTSService(
@@ -52,7 +52,9 @@ async def main():
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
@@ -62,21 +64,18 @@ async def main():
]
hey_robot_filter = WakeCheckFilter(["hey robot", "hey, robot"])
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
hey_robot_filter, # Filter out speech not directed at the robot
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
pipeline = Pipeline([
transport.input(), # Transport user input
hey_robot_filter, # Filter out speech not directed at the robot
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

View File

@@ -10,29 +10,31 @@ import os
import sys
import wave
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
Frame,
AudioRawFrame,
LLMFullResponseEndFrame,
LLMMessagesFrame,
OutputAudioRawFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.aggregators.llm_response import (
LLMUserResponseAggregator,
LLMAssistantResponseAggregator,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.logger import FrameLogger
from pipecat.services.cartesia import CartesiaHttpTTSService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -51,12 +53,12 @@ for file in sound_files:
filename = os.path.splitext(os.path.basename(full_path))[0]
# Open the image and convert it to bytes
with wave.open(full_path) as audio_file:
sounds[file] = OutputAudioRawFrame(
audio_file.readframes(-1), audio_file.getframerate(), audio_file.getnchannels()
)
sounds[file] = AudioRawFrame(audio_file.readframes(-1),
audio_file.getframerate(), audio_file.getnchannels())
class OutboundSoundEffectWrapper(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
@@ -69,6 +71,7 @@ class OutboundSoundEffectWrapper(FrameProcessor):
class InboundSoundEffectWrapper(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
@@ -92,15 +95,17 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
vad_analyzer=SileroVADAnalyzer()
)
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
tts = CartesiaHttpTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id="ErXwobaYiN019PkySvjV",
)
messages = [
@@ -110,27 +115,25 @@ async def main():
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
out_sound = OutboundSoundEffectWrapper()
in_sound = InboundSoundEffectWrapper()
fl = FrameLogger("LLM Out")
fl2 = FrameLogger("Transcription In")
pipeline = Pipeline(
[
transport.input(),
context_aggregator.user(),
in_sound,
fl2,
llm,
fl,
tts,
out_sound,
transport.output(),
context_aggregator.assistant(),
]
)
pipeline = Pipeline([
transport.input(),
tma_in,
in_sound,
fl2,
llm,
fl,
tts,
out_sound,
transport.output(),
tma_out
])
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -9,7 +9,6 @@ import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import Frame, TextFrame, UserImageRequestFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
@@ -20,13 +19,13 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.moondream import MoondreamService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -34,6 +33,7 @@ logger.add(sys.stderr, level="DEBUG")
class UserImageRequester(FrameProcessor):
def __init__(self, participant_id: str | None = None):
super().__init__()
self._participant_id = participant_id
@@ -45,9 +45,7 @@ class UserImageRequester(FrameProcessor):
await super().process_frame(frame, direction)
if self._participant_id and isinstance(frame, TextFrame):
await self.push_frame(
UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM
)
await self.push_frame(UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM)
await self.push_frame(frame, direction)
@@ -63,8 +61,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
vad_analyzer=SileroVADAnalyzer()
)
)
user_response = UserResponseAggregator()
@@ -88,17 +86,15 @@ async def main():
transport.capture_participant_transcription(participant["id"])
image_requester.set_participant_id(participant["id"])
pipeline = Pipeline(
[
transport.input(),
user_response,
image_requester,
vision_aggregator,
moondream,
tts,
transport.output(),
]
)
pipeline = Pipeline([
transport.input(),
user_response,
image_requester,
vision_aggregator,
moondream,
tts,
transport.output()
])
task = PipelineTask(pipeline)
@@ -106,6 +102,5 @@ async def main():
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -9,7 +9,6 @@ import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import Frame, TextFrame, UserImageRequestFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
@@ -20,13 +19,13 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.google import GoogleLLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -34,6 +33,7 @@ logger.add(sys.stderr, level="DEBUG")
class UserImageRequester(FrameProcessor):
def __init__(self, participant_id: str | None = None):
super().__init__()
self._participant_id = participant_id
@@ -45,9 +45,7 @@ class UserImageRequester(FrameProcessor):
await super().process_frame(frame, direction)
if self._participant_id and isinstance(frame, TextFrame):
await self.push_frame(
UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM
)
await self.push_frame(UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM)
await self.push_frame(frame, direction)
@@ -64,8 +62,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
vad_analyzer=SileroVADAnalyzer()
)
)
user_response = UserResponseAggregator()
@@ -75,8 +73,8 @@ async def main():
vision_aggregator = VisionImageFrameAggregator()
google = GoogleLLMService(
model="gemini-1.5-flash-latest", api_key=os.getenv("GOOGLE_API_KEY")
)
model="gemini-1.5-flash-latest",
api_key=os.getenv("GOOGLE_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
@@ -90,17 +88,15 @@ async def main():
transport.capture_participant_transcription(participant["id"])
image_requester.set_participant_id(participant["id"])
pipeline = Pipeline(
[
transport.input(),
user_response,
image_requester,
vision_aggregator,
google,
tts,
transport.output(),
]
)
pipeline = Pipeline([
transport.input(),
user_response,
image_requester,
vision_aggregator,
google,
tts,
transport.output()
])
task = PipelineTask(pipeline)
@@ -108,6 +104,5 @@ async def main():
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -9,7 +9,6 @@ import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import Frame, TextFrame, UserImageRequestFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
@@ -20,13 +19,13 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -34,6 +33,7 @@ logger.add(sys.stderr, level="DEBUG")
class UserImageRequester(FrameProcessor):
def __init__(self, participant_id: str | None = None):
super().__init__()
self._participant_id = participant_id
@@ -45,9 +45,7 @@ class UserImageRequester(FrameProcessor):
await super().process_frame(frame, direction)
if self._participant_id and isinstance(frame, TextFrame):
await self.push_frame(
UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM
)
await self.push_frame(UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM)
await self.push_frame(frame, direction)
@@ -63,8 +61,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
vad_analyzer=SileroVADAnalyzer()
)
)
user_response = UserResponseAggregator()
@@ -73,7 +71,10 @@ async def main():
vision_aggregator = VisionImageFrameAggregator()
openai = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
openai = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o"
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
@@ -87,17 +88,15 @@ async def main():
transport.capture_participant_transcription(participant["id"])
image_requester.set_participant_id(participant["id"])
pipeline = Pipeline(
[
transport.input(),
user_response,
image_requester,
vision_aggregator,
openai,
tts,
transport.output(),
]
)
pipeline = Pipeline([
transport.input(),
user_response,
image_requester,
vision_aggregator,
openai,
tts,
transport.output()
])
task = PipelineTask(pipeline)
@@ -105,6 +104,5 @@ async def main():
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -9,7 +9,6 @@ import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import Frame, TextFrame, UserImageRequestFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
@@ -20,13 +19,13 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.anthropic import AnthropicLLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -34,6 +33,7 @@ logger.add(sys.stderr, level="DEBUG")
class UserImageRequester(FrameProcessor):
def __init__(self, participant_id: str | None = None):
super().__init__()
self._participant_id = participant_id
@@ -45,9 +45,7 @@ class UserImageRequester(FrameProcessor):
await super().process_frame(frame, direction)
if self._participant_id and isinstance(frame, TextFrame):
await self.push_frame(
UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM
)
await self.push_frame(UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM)
await self.push_frame(frame, direction)
@@ -63,8 +61,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
vad_analyzer=SileroVADAnalyzer()
)
)
user_response = UserResponseAggregator()
@@ -73,14 +71,14 @@ async def main():
vision_aggregator = VisionImageFrameAggregator()
anthropic = AnthropicLLMService(api_key=os.getenv("ANTHROPIC_API_KEY"))
anthropic = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY")
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
params=CartesiaTTSService.InputParams(
sample_rate=16000,
),
sample_rate=16000,
)
@transport.event_handler("on_first_participant_joined")
@@ -90,17 +88,15 @@ async def main():
transport.capture_participant_transcription(participant["id"])
image_requester.set_participant_id(participant["id"])
pipeline = Pipeline(
[
transport.input(),
user_response,
image_requester,
vision_aggregator,
anthropic,
tts,
transport.output(),
]
)
pipeline = Pipeline([
transport.input(),
user_response,
image_requester,
vision_aggregator,
anthropic,
tts,
transport.output()
])
task = PipelineTask(pipeline)
@@ -108,6 +104,5 @@ async def main():
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -21,7 +21,6 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -29,6 +28,7 @@ logger.add(sys.stderr, level="DEBUG")
class TranscriptionLogger(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
@@ -40,9 +40,8 @@ async def main():
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url, None, "Transcription bot", DailyParams(audio_in_enabled=True)
)
transport = DailyTransport(room_url, None, "Transcription bot",
DailyParams(audio_in_enabled=True))
stt = WhisperSTTService()

View File

@@ -19,7 +19,6 @@ from pipecat.transports.local.audio import LocalAudioTransport
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -27,6 +26,7 @@ logger.add(sys.stderr, level="DEBUG")
class TranscriptionLogger(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)

View File

@@ -14,7 +14,7 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.deepgram import DeepgramSTTService, LiveOptions, Language
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from runner import configure
@@ -22,7 +22,6 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -30,6 +29,7 @@ logger.add(sys.stderr, level="DEBUG")
class TranscriptionLogger(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
@@ -41,14 +41,10 @@ async def main():
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url, None, "Transcription bot", DailyParams(audio_in_enabled=True)
)
transport = DailyTransport(room_url, None, "Transcription bot",
DailyParams(audio_in_enabled=True))
stt = DeepgramSTTService(
api_key=os.getenv("DEEPGRAM_API_KEY"),
# live_options=LiveOptions(language=Language.FR),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tl = TranscriptionLogger()

View File

@@ -1,63 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.frames.frames import Frame, TranscriptionFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.gladia import GladiaSTTService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
class TranscriptionLogger(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TranscriptionFrame):
print(f"Transcription: {frame.text}")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url, None, "Transcription bot", DailyParams(audio_in_enabled=True)
)
stt = GladiaSTTService(
api_key=os.getenv("GLADIA_API_KEY"),
# live_options=LiveOptions(language=Language.FR),
)
tl = TranscriptionLogger()
pipeline = Pipeline([transport.input(), stt, tl])
task = PipelineTask(pipeline)
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,62 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.frames.frames import Frame, TranscriptionFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.assemblyai import AssemblyAISTTService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
class TranscriptionLogger(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TranscriptionFrame):
print(f"Transcription: {frame.text}")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url, None, "Transcription bot", DailyParams(audio_in_enabled=True)
)
stt = AssemblyAISTTService(
api_key=os.getenv("ASSEMBLYAI_API_KEY"),
)
tl = TranscriptionLogger()
pipeline = Pipeline([transport.input(), stt, tl])
task = PipelineTask(pipeline)
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -9,13 +9,15 @@ import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.logger import FrameLogger
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMContext, OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from openai.types.chat import ChatCompletionToolParam
@@ -24,7 +26,6 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -32,12 +33,7 @@ logger.add(sys.stderr, level="DEBUG")
async def start_fetch_weather(function_name, llm, context):
# note: we can't push a frame to the LLM here. the bot
# can interrupt itself and/or cause audio overlapping glitches.
# possible question for Aleix and Chad about what the right way
# to trigger speech is, now, with the new queues/async/sync refactors.
# await llm.push_frame(TextFrame("Let me check on that."))
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
await llm.push_frame(TextFrame("Let me check on that."))
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
@@ -56,8 +52,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
vad_analyzer=SileroVADAnalyzer()
)
)
tts = CartesiaTTSService(
@@ -65,10 +61,18 @@ async def main():
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
# Register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.
llm.register_function(None, fetch_weather_from_api, start_callback=start_fetch_weather)
llm.register_function(
None,
fetch_weather_from_api,
start_callback=start_fetch_weather)
fl_in = FrameLogger("Inner")
fl_out = FrameLogger("Outer")
tools = [
ChatCompletionToolParam(
@@ -85,15 +89,17 @@ async def main():
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"enum": [
"celsius",
"fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": ["location", "format"],
"required": [
"location",
"format"],
},
},
)
]
})]
messages = [
{
"role": "system",
@@ -104,37 +110,28 @@ async def main():
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(),
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
pipeline = Pipeline([
fl_in,
transport.input(),
context_aggregator.user(),
llm,
fl_out,
tts,
transport.output(),
context_aggregator.assistant(),
])
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
task = PipelineTask(pipeline)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])
await tts.say("Hi! Ask me about the weather in San Francisco.")
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,136 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMContext
from pipecat.services.together import TogetherLLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from openai.types.chat import ChatCompletionToolParam
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def start_fetch_weather(function_name, llm, context):
# note: we can't push a frame to the LLM here. the bot
# can interrupt itself and/or cause audio overlapping glitches.
# possible question for Aleix and Chad about what the right way
# to trigger speech is, now, with the new queues/async/sync refactors.
# await llm.push_frame(TextFrame("Let me check on that."))
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
await result_callback({"conditions": "nice", "temperature": "75"})
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = TogetherLLMService(
api_key=os.getenv("TOGETHER_API_KEY"),
model="meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo",
)
# Register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.
llm.register_function(None, fetch_weather_from_api, start_callback=start_fetch_weather)
tools = [
ChatCompletionToolParam(
type="function",
function={
"name": "get_current_weather",
"description": "Get the current weather",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": ["location", "format"],
},
},
)
]
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(),
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(pipeline)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
# await tts.say("Hi! Ask me about the weather in San Francisco.")
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,167 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMContext, OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from openai.types.chat import ChatCompletionToolParam
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
video_participant_id = None
async def get_weather(function_name, tool_call_id, arguments, llm, context, result_callback):
location = arguments["location"]
await result_callback(f"The weather in {location} is currently 72 degrees and sunny.")
async def get_image(function_name, tool_call_id, arguments, llm, context, result_callback):
logger.debug(f"!!! IN get_image {video_participant_id}, {arguments}")
question = arguments["question"]
await llm.request_image_frame(user_id=video_participant_id, text_content=question)
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm.register_function("get_weather", get_weather)
llm.register_function("get_image", get_image)
tools = [
ChatCompletionToolParam(
type="function",
function={
"name": "get_weather",
"description": "Get the current weather",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": ["location", "format"],
},
},
),
ChatCompletionToolParam(
type="function",
function={
"name": "get_image",
"description": "Get an image from the video stream.",
"parameters": {
"type": "object",
"properties": {
"question": {
"type": "string",
"description": "The question to ask the AI to generate an image of",
},
},
"required": ["question"],
},
},
),
]
system_prompt = """\
You are a helpful assistant who converses with a user and answers questions. Respond concisely to general questions.
Your response will be turned into speech so use only simple words and punctuation.
You have access to two tools: get_weather and get_image.
You can respond to questions about the weather using the get_weather tool.
You can answer questions about the user's video stream using the get_image tool. Some examples of phrases that \
indicate you should use the get_image tool are:
- What do you see?
- What's in the video?
- Can you describe the video?
- Tell me about what you see.
- Tell me something interesting about what you see.
- What's happening in the video?
"""
messages = [
{"role": "system", "content": system_prompt},
]
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(),
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(pipeline)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
global video_participant_id
video_participant_id = participant["id"]
transport.capture_participant_transcription(participant["id"])
transport.capture_participant_video(video_participant_id, framerate=0)
# Kick off the conversation.
await tts.say("Hi! Ask me about the weather in San Francisco.")
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -9,7 +9,6 @@ import asyncio
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
@@ -20,6 +19,7 @@ from pipecat.processors.filters.function_filter import FunctionFilter
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from openai.types.chat import ChatCompletionToolParam
@@ -28,7 +28,6 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -40,11 +39,7 @@ current_voice = "News Lady"
async def switch_voice(function_name, tool_call_id, args, llm, context, result_callback):
global current_voice
current_voice = args["voice"]
await result_callback(
{
"voice": f"You are now using your {current_voice} voice. Your responses should now be as if you were a {current_voice}."
}
)
await result_callback({"voice": f"You are now using your {current_voice} voice. Your responses should now be as if you were a {current_voice}."})
async def news_lady_filter(frame) -> bool:
@@ -71,8 +66,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
vad_analyzer=SileroVADAnalyzer()
)
)
news_lady = CartesiaTTSService(
@@ -90,7 +85,9 @@ async def main():
voice_id="a0e99841-438c-4a64-b679-ae501e7d6091", # Barbershop Man
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm.register_function("switch_voice", switch_voice)
tools = [
@@ -109,9 +106,7 @@ async def main():
},
"required": ["voice"],
},
},
)
]
})]
messages = [
{
"role": "system",
@@ -122,20 +117,18 @@ async def main():
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User responses
llm, # LLM
ParallelPipeline( # TTS (one of the following vocies)
[FunctionFilter(news_lady_filter), news_lady], # News Lady voice
[FunctionFilter(british_lady_filter), british_lady], # British Lady voice
[FunctionFilter(barbershop_man_filter), barbershop_man], # Barbershop Man voice
),
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
pipeline = Pipeline([
transport.input(), # Transport user input
context_aggregator.user(), # User responses
llm, # LLM
ParallelPipeline( # TTS (one of the following vocies)
[FunctionFilter(news_lady_filter), news_lady], # News Lady voice
[FunctionFilter(british_lady_filter), british_lady], # British Lady voice
[FunctionFilter(barbershop_man_filter), barbershop_man], # Barbershop Man voice
),
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@@ -146,9 +139,7 @@ async def main():
messages.append(
{
"role": "system",
"content": f"Please introduce yourself to the user and let them know the voices you can do. Your initial responses should be as if you were a {current_voice}.",
}
)
"content": f"Please introduce yourself to the user and let them know the voices you can do. Your initial responses should be as if you were a {current_voice}."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

View File

@@ -9,7 +9,6 @@ import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
@@ -21,6 +20,7 @@ from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.whisper import Model, WhisperSTTService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from openai.types.chat import ChatCompletionToolParam
@@ -29,7 +29,6 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -65,8 +64,8 @@ async def main():
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
vad_audio_passthrough=True
)
)
stt = WhisperSTTService(model=Model.LARGE)
@@ -81,7 +80,9 @@ async def main():
voice_id="846d6cb0-2301-48b6-9683-48f5618ea2f6", # Spanish-speaking Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm.register_function("switch_language", switch_language)
tools = [
@@ -100,9 +101,7 @@ async def main():
},
"required": ["language"],
},
},
)
]
})]
messages = [
{
"role": "system",
@@ -113,20 +112,18 @@ async def main():
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
ParallelPipeline( # TTS (bot will speak the chosen language)
[FunctionFilter(english_filter), english_tts], # English
[FunctionFilter(spanish_filter), spanish_tts], # Spanish
),
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
pipeline = Pipeline([
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
ParallelPipeline( # TTS (bot will speak the chosen language)
[FunctionFilter(english_filter), english_tts], # English
[FunctionFilter(spanish_filter), spanish_tts], # Spanish
),
transport.output(), # Transport bot output
context_aggregator.assistant() # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@@ -137,9 +134,7 @@ async def main():
messages.append(
{
"role": "system",
"content": f"Please introduce yourself to the user and let them know the languages you speak. Your initial responses should be in {current_language}.",
}
)
"content": f"Please introduce yourself to the user and let them know the languages you speak. Your initial responses should be in {current_language}."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

View File

@@ -5,28 +5,26 @@
#
import asyncio
import aiohttp
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.services.deepgram import DeepgramTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import (
DailyParams,
DailyTransport,
DailyTransportMessageFrame,
)
from pipecat.transports.services.daily import DailyParams, DailyTransport, DailyTransportMessageFrame
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -45,15 +43,15 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
vad_analyzer=SileroVADAnalyzer()
)
)
tts = DeepgramTTSService(
aiohttp_session=session,
api_key=os.getenv("DEEPGRAM_API_KEY"),
voice="aura-asteria-en",
base_url="http://0.0.0.0:8080/v1/speak",
base_url="http://0.0.0.0:8080/v1/speak"
)
llm = OpenAILLMService(
@@ -62,7 +60,7 @@ async def main():
# model="gpt-4o"
# Or, to use a local vLLM (or similar) api server
model="meta-llama/Meta-Llama-3-8B-Instruct",
base_url="http://0.0.0.0:8000/v1",
base_url="http://0.0.0.0:8000/v1"
)
messages = [
@@ -72,19 +70,17 @@ async def main():
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(),
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(),
]
)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
@@ -97,7 +93,8 @@ async def main():
# When the first participant joins, the bot should introduce itself.
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
# Handle "latency-ping" messages. The client will send app messages that look like
@@ -114,18 +111,14 @@ async def main():
logger.debug(f"Received latency ping app message: {message}")
ts = message["latency-ping"]["ts"]
# Send immediately
transport.output().send_message(
DailyTransportMessageFrame(
message={"latency-pong-msg-handler": {"ts": ts}}, participant_id=sender
)
)
transport.output().send_message(DailyTransportMessageFrame(
message={"latency-pong-msg-handler": {"ts": ts}},
participant_id=sender))
# And push to the pipeline for the Daily transport.output to send
await task.queue_frame(
await tma_in.push_frame(
DailyTransportMessageFrame(
message={"latency-pong-pipeline-delivery": {"ts": ts}},
participant_id=sender,
)
)
participant_id=sender))
except Exception as e:
logger.debug(f"message handling error: {e} - {message}")

View File

@@ -9,23 +9,23 @@ import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.processors.user_idle_processor import UserIdleProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -44,8 +44,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
vad_analyzer=SileroVADAnalyzer()
)
)
tts = CartesiaTTSService(
@@ -53,7 +53,9 @@ async def main():
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
@@ -62,46 +64,38 @@ async def main():
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
async def user_idle_callback(user_idle: UserIdleProcessor):
messages.append(
{
"role": "system",
"content": "Ask the user if they are still there and try to prompt for some input, but be short.",
}
)
await user_idle.push_frame(LLMMessagesFrame(messages))
{"role": "system", "content": "Ask the user if they are still there and try to prompt for some input, but be short."})
await user_idle.queue_frame(LLMMessagesFrame(messages))
user_idle = UserIdleProcessor(callback=user_idle_callback, timeout=5.0)
pipeline = Pipeline(
[
transport.input(), # Transport user input
user_idle, # Idle user check-in
context_aggregator.user(),
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(),
]
)
pipeline = Pipeline([
transport.input(), # Transport user input
user_idle, # Idle user check-in
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
report_only_initial_ttfb=True,
),
)
task = PipelineTask(pipeline, PipelineParams(
allow_interruptions=True,
enable_metrics=True,
report_only_initial_ttfb=True,
))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()

View File

@@ -20,7 +20,6 @@ from runner import configure_with_args
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -30,7 +29,12 @@ logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument("-i", "--input", type=str, required=True, help="Input video file")
parser.add_argument(
"-i",
"--input",
type=str,
required=True,
help="Input video file")
(room_url, _, args) = await configure_with_args(session, parser)
@@ -45,7 +49,7 @@ async def main():
camera_out_width=1280,
camera_out_height=720,
camera_out_is_live=True,
),
)
)
gst = GStreamerPipelineSource(
@@ -55,15 +59,13 @@ async def main():
video_height=720,
audio_sample_rate=16000,
audio_channels=1,
),
)
)
pipeline = Pipeline(
[
gst, # GStreamer file source
transport.output(), # Transport bot output
]
)
pipeline = Pipeline([
gst, # GStreamer file source
transport.output(), # Transport bot output
])
task = PipelineTask(pipeline)

View File

@@ -19,7 +19,6 @@ from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -39,22 +38,20 @@ async def main():
camera_out_width=1280,
camera_out_height=720,
camera_out_is_live=True,
),
)
)
gst = GStreamerPipelineSource(
pipeline='videotestsrc ! capsfilter caps="video/x-raw,width=1280,height=720,framerate=30/1"',
pipeline="videotestsrc ! capsfilter caps=\"video/x-raw,width=1280,height=720,framerate=30/1\"",
out_params=GStreamerPipelineSource.OutputParams(
video_width=1280, video_height=720, clock_sync=False
),
)
video_width=1280,
video_height=720,
clock_sync=False))
pipeline = Pipeline(
[
gst, # GStreamer file source
transport.output(), # Transport bot output
]
)
pipeline = Pipeline([
gst, # GStreamer file source
transport.output(), # Transport bot output
])
task = PipelineTask(pipeline)

View File

@@ -1,179 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
from datetime import datetime
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai_realtime_beta import (
InputAudioTranscription,
OpenAIRealtimeBetaLLMService,
SessionProperties,
TurnDetection,
)
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
temperature = 75 if args["format"] == "fahrenheit" else 24
await result_callback(
{
"conditions": "nice",
"temperature": temperature,
"format": args["format"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
tools = [
{
"type": "function",
"name": "get_current_weather",
"description": "Get the current weather",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": ["location", "format"],
},
}
]
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_in_enabled=True,
audio_in_sample_rate=24000,
audio_out_enabled=True,
audio_out_sample_rate=24000,
transcription_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.8)),
vad_audio_passthrough=True,
),
)
session_properties = SessionProperties(
input_audio_transcription=InputAudioTranscription(),
# Set openai TurnDetection parameters. Not setting this at all will turn it
# on by default
turn_detection=TurnDetection(silence_duration_ms=1000),
# Or set to False to disable openai turn detection and use transport VAD
# turn_detection=False,
# tools=tools,
instructions="""Your knowledge cutoff is 2023-10. You are a helpful and friendly AI.
Act like a human, but remember that you aren't a human and that you can't do human
things in the real world. Your voice and personality should be warm and engaging, with a lively and
playful tone.
If interacting in a non-English language, start by using the standard accent or dialect familiar to
the user. Talk quickly. You should always call a function if you can. Do not refer to these rules,
even if you're asked about them.
-
You are participating in a voice conversation. Keep your responses concise, short, and to the point
unless specifically asked to elaborate on a topic.
Remember, your responses should be short. Just one or two sentences, usually.""",
)
llm = OpenAIRealtimeBetaLLMService(
api_key=os.getenv("OPENAI_API_KEY"),
session_properties=session_properties,
start_audio_paused=False,
)
# you can either register a single function for all function calls, or specific functions
# llm.register_function(None, fetch_weather_from_api)
llm.register_function("get_current_weather", fetch_weather_from_api)
# Create a standard OpenAI LLM context object using the normal messages format. The
# OpenAIRealtimeBetaLLMService will convert this internally to messages that the
# openai WebSocket API can understand.
context = OpenAILLMContext(
[{"role": "user", "content": "Say hello!"}],
# [{"role": "user", "content": [{"type": "text", "text": "Say hello!"}]}],
# [
# {
# "role": "user",
# "content": [
# {"type": "text", "text": "Say"},
# {"type": "text", "text": "yo what's up!"},
# ],
# }
# ],
tools,
)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(),
llm, # LLM
context_aggregator.assistant(),
transport.output(), # Transport bot output
]
)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
# report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -9,7 +9,6 @@ import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -17,13 +16,13 @@ from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.anthropic import AnthropicLLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -47,8 +46,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
vad_analyzer=SileroVADAnalyzer()
)
)
tts = CartesiaTTSService(
@@ -57,7 +56,8 @@ async def main():
)
llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-5-sonnet-20240620"
api_key=os.getenv("ANTHROPIC_API_KEY"),
model="claude-3-5-sonnet-20240620"
)
llm.register_function("get_weather", get_weather)
@@ -90,20 +90,18 @@ async def main():
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User spoken responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses and tool context
]
)
pipeline = Pipeline([
transport.input(), # Transport user input
context_aggregator.user(), # User spoken responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses and tool context
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
@transport.event_handler("on_first_participant_joined")
@ transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.

View File

@@ -9,7 +9,6 @@ import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -17,13 +16,13 @@ from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.anthropic import AnthropicLLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -56,8 +55,8 @@ async def main():
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
vad_analyzer=SileroVADAnalyzer()
)
)
tts = CartesiaTTSService(
@@ -68,7 +67,7 @@ async def main():
llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"),
model="claude-3-5-sonnet-20240620",
enable_prompt_caching_beta=True,
enable_prompt_caching_beta=True
)
llm.register_function("get_weather", get_weather)
llm.register_function("get_image", get_image)
@@ -101,7 +100,7 @@ async def main():
},
"required": ["question"],
},
},
}
]
# todo: test with very short initial user message
@@ -135,28 +134,28 @@ If you need to use a tool, simply use the tool. Do not tell the user the tool yo
"type": "text",
"text": system_prompt,
}
],
]
},
{"role": "user", "content": "Start the conversation by introducing yourself."},
]
{
"role": "user",
"content": "Start the conversation by introducing yourself."
}]
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User speech to text
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses and tool context
]
)
pipeline = Pipeline([
transport.input(), # Transport user input
context_aggregator.user(), # User speech to text
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses and tool context
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
@transport.event_handler("on_first_participant_joined")
@ transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
global video_participant_id
video_participant_id = participant["id"]

View File

@@ -0,0 +1,138 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import os
import sys
import json
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.together import TogetherLLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def get_current_weather(
function_name,
tool_call_id,
arguments,
llm,
context,
result_callback):
logger.debug("IN get_current_weather")
location = arguments["location"]
await result_callback(f"The weather in {location} is currently 72 degrees and sunny.")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = TogetherLLMService(
api_key=os.getenv("TOGETHER_API_KEY"),
model=os.getenv("TOGETHER_MODEL"),
)
llm.register_function("get_current_weather", get_current_weather)
weatherTool = {
"name": "get_current_weather",
"description": "Get the current weather in a given location",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
},
"required": ["location"],
},
}
system_prompt = f"""\
You have access to the following functions:
Use the function '{weatherTool["name"]}' to '{weatherTool["description"]}':
{json.dumps(weatherTool)}
If you choose to call a function ONLY reply in the following format with no prefix or suffix:
<function=example_function_name>{{\"example_name\": \"example_value\"}}</function>
Reminder:
- Function calls MUST follow the specified format, start with <function= and end with </function>
- Required parameters MUST be specified
- Only call one function at a time
- Put the entire function call reply on one line
- If there is no function call available, answer the question like normal with your current knowledge and do not tell the user about function calls
"""
messages = [{"role": "system",
"content": system_prompt},
{"role": "user",
"content": "Wait for the user to say something."}]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline([
transport.input(), # Transport user input
context_aggregator.user(), # User speech to text
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses and tool context
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
@ transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,236 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import glob
import json
import os
import sys
from datetime import datetime
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
)
from pipecat.services.openai import OpenAILLMService
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
BASE_FILENAME = "/tmp/pipecat_conversation_"
tts = None
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
temperature = 75 if args["format"] == "fahrenheit" else 24
await result_callback(
{
"conditions": "nice",
"temperature": temperature,
"format": args["format"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
async def get_saved_conversation_filenames(
function_name, tool_call_id, args, llm, context, result_callback
):
# Construct the full pattern including the BASE_FILENAME
full_pattern = f"{BASE_FILENAME}*.json"
# Use glob to find all matching files
matching_files = glob.glob(full_pattern)
logger.debug(f"matching files: {matching_files}")
await result_callback({"filenames": matching_files})
async def save_conversation(function_name, tool_call_id, args, llm, context, result_callback):
timestamp = datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
filename = f"{BASE_FILENAME}{timestamp}.json"
logger.debug(f"writing conversation to {filename}\n{json.dumps(context.messages, indent=4)}")
try:
with open(filename, "w") as file:
messages = context.get_messages_for_persistent_storage()
# remove the last message, which is the instruction we just gave to save the conversation
messages.pop()
json.dump(messages, file, indent=2)
await result_callback({"success": True})
except Exception as e:
await result_callback({"success": False, "error": str(e)})
async def load_conversation(function_name, tool_call_id, args, llm, context, result_callback):
global tts
filename = args["filename"]
logger.debug(f"loading conversation from {filename}")
try:
with open(filename, "r") as file:
context.set_messages(json.load(file))
logger.debug(
f"loaded conversation from {filename}\n{json.dumps(context.messages, indent=4)}"
)
await tts.say("Ok, I've loaded that conversation.")
except Exception as e:
await result_callback({"success": False, "error": str(e)})
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tools = [
{
"type": "function",
"function": {
"name": "get_current_weather",
"description": "Get the current weather",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": ["location", "format"],
},
},
},
{
"type": "function",
"function": {
"name": "save_conversation",
"description": "Save the current conversatione. Use this function to persist the current conversation to external storage.",
"parameters": {
"type": "object",
"properties": {},
"required": [],
},
},
},
{
"type": "function",
"function": {
"name": "get_saved_conversation_filenames",
"description": "Get a list of saved conversation histories. Returns a list of filenames. Each filename includes a date and timestamp. Each file is conversation history that can be loaded into this session.",
"parameters": {
"type": "object",
"properties": {},
"required": [],
},
},
},
{
"type": "function",
"function": {
"name": "load_conversation",
"description": "Load a conversation history. Use this function to load a conversation history into the current session.",
"parameters": {
"type": "object",
"properties": {
"filename": {
"type": "string",
"description": "The filename of the conversation history to load.",
}
},
"required": ["filename"],
},
},
},
]
async def main():
global tts
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.8)),
),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
# you can either register a single function for all function calls, or specific functions
# llm.register_function(None, fetch_weather_from_api)
llm.register_function("get_current_weather", fetch_weather_from_api)
llm.register_function("save_conversation", save_conversation)
llm.register_function("get_saved_conversation_filenames", get_saved_conversation_filenames)
llm.register_function("load_conversation", load_conversation)
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(),
llm, # LLM
tts,
context_aggregator.assistant(),
transport.output(), # Transport bot output
]
)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
# report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,262 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import glob
import json
import os
import sys
from datetime import datetime
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
)
from pipecat.services.openai_realtime_beta import (
InputAudioTranscription,
OpenAIRealtimeBetaLLMService,
SessionProperties,
TurnDetection,
)
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
BASE_FILENAME = "/tmp/pipecat_conversation_"
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
temperature = 75 if args["format"] == "fahrenheit" else 24
await result_callback(
{
"conditions": "nice",
"temperature": temperature,
"format": args["format"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
async def get_saved_conversation_filenames(
function_name, tool_call_id, args, llm, context, result_callback
):
# Construct the full pattern including the BASE_FILENAME
full_pattern = f"{BASE_FILENAME}*.json"
# Use glob to find all matching files
matching_files = glob.glob(full_pattern)
logger.debug(f"matching files: {matching_files}")
await result_callback({"filenames": matching_files})
# async def get_saved_conversation_filenames(
# function_name, tool_call_id, args, llm, context, result_callback
# ):
# pattern = re.compile(re.escape(BASE_FILENAME) + "\\d{8}_\\d{6}\\.json$")
# matching_files = []
# for filename in os.listdir("."):
# if pattern.match(filename):
# matching_files.append(filename)
# await result_callback({"filenames": matching_files})
async def save_conversation(function_name, tool_call_id, args, llm, context, result_callback):
timestamp = datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
filename = f"{BASE_FILENAME}{timestamp}.json"
logger.debug(f"writing conversation to {filename}\n{json.dumps(context.messages, indent=4)}")
try:
with open(filename, "w") as file:
messages = context.get_messages_for_persistent_storage()
# remove the last message, which is the instruction we just gave to save the conversation
messages.pop()
json.dump(messages, file, indent=2)
await result_callback({"success": True})
except Exception as e:
await result_callback({"success": False, "error": str(e)})
async def load_conversation(function_name, tool_call_id, args, llm, context, result_callback):
async def _reset():
filename = args["filename"]
logger.debug(f"loading conversation from {filename}")
try:
with open(filename, "r") as file:
context.set_messages(json.load(file))
await llm.reset_conversation()
await llm._create_response()
except Exception as e:
await result_callback({"success": False, "error": str(e)})
asyncio.create_task(_reset())
tools = [
{
"type": "function",
"name": "get_current_weather",
"description": "Get the current weather",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": ["location", "format"],
},
},
{
"type": "function",
"name": "save_conversation",
"description": "Save the current conversatione. Use this function to persist the current conversation to external storage.",
"parameters": {
"type": "object",
"properties": {},
"required": [],
},
},
{
"type": "function",
"name": "get_saved_conversation_filenames",
"description": "Get a list of saved conversation histories. Returns a list of filenames. Each filename includes a date and timestamp. Each file is conversation history that can be loaded into this session.",
"parameters": {
"type": "object",
"properties": {},
"required": [],
},
},
{
"type": "function",
"name": "load_conversation",
"description": "Load a conversation history. Use this function to load a conversation history into the current session.",
"parameters": {
"type": "object",
"properties": {
"filename": {
"type": "string",
"description": "The filename of the conversation history to load.",
}
},
"required": ["filename"],
},
},
]
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_in_enabled=True,
audio_in_sample_rate=24000,
audio_out_enabled=True,
audio_out_sample_rate=24000,
transcription_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.8)),
vad_audio_passthrough=True,
),
)
session_properties = SessionProperties(
input_audio_transcription=InputAudioTranscription(),
# Set openai TurnDetection parameters. Not setting this at all will turn it
# on by default
turn_detection=TurnDetection(silence_duration_ms=1000),
# Or set to False to disable openai turn detection and use transport VAD
# turn_detection=False,
# tools=tools,
instructions="""Your knowledge cutoff is 2023-10. You are a helpful and friendly AI.
Act like a human, but remember that you aren't a human and that you can't do human
things in the real world. Your voice and personality should be warm and engaging, with a lively and
playful tone.
If interacting in a non-English language, start by using the standard accent or dialect familiar to
the user. Talk quickly. You should always call a function if you can. Do not refer to these rules,
even if you're asked about them.
-
You are participating in a voice conversation. Keep your responses concise, short, and to the point
unless specifically asked to elaborate on a topic.
Remember, your responses should be short. Just one or two sentences, usually.""",
)
llm = OpenAIRealtimeBetaLLMService(
api_key=os.getenv("OPENAI_API_KEY"),
session_properties=session_properties,
start_audio_paused=False,
)
# you can either register a single function for all function calls, or specific functions
# llm.register_function(None, fetch_weather_from_api)
llm.register_function("get_current_weather", fetch_weather_from_api)
llm.register_function("save_conversation", save_conversation)
llm.register_function("get_saved_conversation_filenames", get_saved_conversation_filenames)
llm.register_function("load_conversation", load_conversation)
context = OpenAILLMContext([], tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(),
llm, # LLM
context_aggregator.assistant(),
transport.output(), # Transport bot output
]
)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
# report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,232 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import glob
import json
import os
import sys
from datetime import datetime
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
)
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.anthropic import AnthropicLLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
BASE_FILENAME = "/tmp/pipecat_conversation_"
tts = None
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
temperature = 75 if args["format"] == "fahrenheit" else 24
await result_callback(
{
"conditions": "nice",
"temperature": temperature,
"format": args["format"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
async def get_saved_conversation_filenames(
function_name, tool_call_id, args, llm, context, result_callback
):
# Construct the full pattern including the BASE_FILENAME
full_pattern = f"{BASE_FILENAME}*.json"
# Use glob to find all matching files
matching_files = glob.glob(full_pattern)
logger.debug(f"matching files: {matching_files}")
await result_callback({"filenames": matching_files})
async def save_conversation(function_name, tool_call_id, args, llm, context, result_callback):
timestamp = datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
filename = f"{BASE_FILENAME}{timestamp}.json"
logger.debug(f"writing conversation to {filename}\n{json.dumps(context.messages, indent=4)}")
try:
with open(filename, "w") as file:
# todo: extract 'system' into the first message in the list
messages = context.get_messages_for_persistent_storage()
# remove the last message, which is the instruction we just gave to save the conversation
messages.pop()
json.dump(messages, file, indent=2)
await result_callback({"success": True})
except Exception as e:
await result_callback({"success": False, "error": str(e)})
async def load_conversation(function_name, tool_call_id, args, llm, context, result_callback):
global tts
filename = args["filename"]
logger.debug(f"loading conversation from {filename}")
try:
with open(filename, "r") as file:
context.set_messages(json.load(file))
logger.debug(
f"loaded conversation from {filename}\n{json.dumps(context.messages, indent=4)}"
)
await tts.say("Ok, I've loaded that conversation.")
except Exception as e:
await result_callback({"success": False, "error": str(e)})
# Test message munging ...
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
{"role": "user", "content": ""},
{"role": "assistant", "content": []},
{"role": "user", "content": "Tell me"},
{"role": "user", "content": "a joke"},
]
tools = [
{
"name": "get_current_weather",
"description": "Get the current weather",
"input_schema": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": ["location", "format"],
},
},
{
"name": "save_conversation",
"description": "Save the current conversation. Use this function to persist the current conversation to external storage.",
"input_schema": {
"type": "object",
"properties": {},
"required": [],
},
},
{
"name": "get_saved_conversation_filenames",
"description": "Get a list of saved conversation histories. Returns a list of filenames. Each filename includes a date and timestamp. Each file is conversation history that can be loaded into this session.",
"input_schema": {
"type": "object",
"properties": {},
"required": [],
},
},
{
"name": "load_conversation",
"description": "Load a conversation history. Use this function to load a conversation history into the current session.",
"input_schema": {
"type": "object",
"properties": {
"filename": {
"type": "string",
"description": "The filename of the conversation history to load.",
}
},
"required": ["filename"],
},
},
]
async def main():
global tts
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.8)),
),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-5-sonnet-20240620"
)
# you can either register a single function for all function calls, or specific functions
# llm.register_function(None, fetch_weather_from_api)
llm.register_function("get_current_weather", fetch_weather_from_api)
llm.register_function("save_conversation", save_conversation)
llm.register_function("get_saved_conversation_filenames", get_saved_conversation_filenames)
llm.register_function("load_conversation", load_conversation)
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(),
llm, # LLM
tts,
context_aggregator.assistant(),
transport.output(), # Transport bot output
]
)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
# report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -17,13 +17,16 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
async def configure_with_args(
aiohttp_session: aiohttp.ClientSession, parser: argparse.ArgumentParser | None = None
):
aiohttp_session: aiohttp.ClientSession,
parser: argparse.ArgumentParser | None = None):
if not parser:
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument(
"-u", "--url", type=str, required=False, help="URL of the Daily room to join"
)
"-u",
"--url",
type=str,
required=False,
help="URL of the Daily room to join")
parser.add_argument(
"-k",
"--apikey",
@@ -39,19 +42,15 @@ async def configure_with_args(
if not url:
raise Exception(
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL."
)
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL.")
if not key:
raise Exception(
"No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
)
raise Exception("No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers.")
daily_rest_helper = DailyRESTHelper(
daily_api_key=key,
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
aiohttp_session=aiohttp_session)
# Create a meeting token for the given room with an expiration 1 hour in
# the future.

View File

@@ -24,7 +24,7 @@ cp env.example .env # and add your credentials
python server.py
```
Then, visit `http://localhost:7860/` in your browser to start a chatbot
Then, visit `http://localhost:7860/start` in your browser to start a chatbot
session.
## Build and test the Docker image
@@ -41,4 +41,4 @@ docker build -t moonbot -f Dockerfile.intel .
docker run --env-file .env -p 7860:7860 --device /dev/dri moonbot
```
You can try to visit `http://localhost:7860/` again.
You can try to visit `http://localhost:7860/start` again.

View File

@@ -11,24 +11,23 @@ import sys
from PIL import Image
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
ImageRawFrame,
OutputImageRawFrame,
SpriteFrame,
Frame,
LLMMessagesFrame,
TTSAudioRawFrame,
AudioRawFrame,
TTSStoppedFrame,
TextFrame,
UserImageRawFrame,
UserImageRequestFrame,
)
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.aggregators.llm_response import LLMUserResponseAggregator
from pipecat.processors.aggregators.sentence import SentenceAggregator
from pipecat.processors.aggregators.vision_image_frame import VisionImageFrameAggregator
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
@@ -36,13 +35,13 @@ from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.moondream import MoondreamService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -60,7 +59,7 @@ for i in range(1, 26):
# Get the filename without the extension to use as the dictionary key
# Open the image and convert it to bytes
with Image.open(full_path) as img:
sprites.append(OutputImageRawFrame(image=img.tobytes(), size=img.size, format=img.format))
sprites.append(ImageRawFrame(image=img.tobytes(), size=img.size, format=img.format))
flipped = sprites[::-1]
sprites.extend(flipped)
@@ -83,7 +82,7 @@ class TalkingAnimation(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TTSAudioRawFrame):
if isinstance(frame, AudioRawFrame):
if not self._is_talking:
await self.push_frame(talking_frame)
self._is_talking = True
@@ -106,9 +105,7 @@ class UserImageRequester(FrameProcessor):
if self.participant_id and isinstance(frame, TextFrame):
if frame.text == user_request_answer:
await self.push_frame(
UserImageRequestFrame(self.participant_id), FrameDirection.UPSTREAM
)
await self.push_frame(UserImageRequestFrame(self.participant_id), FrameDirection.UPSTREAM)
await self.push_frame(TextFrame("Describe the image in a short sentence."))
elif isinstance(frame, UserImageRawFrame):
await self.push_frame(frame)
@@ -152,8 +149,8 @@ async def main():
camera_out_height=576,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
vad_analyzer=SileroVADAnalyzer()
)
)
tts = CartesiaTTSService(
@@ -161,7 +158,9 @@ async def main():
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
ta = TalkingAnimation()
@@ -182,21 +181,19 @@ async def main():
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
ura = LLMUserResponseAggregator(messages)
pipeline = Pipeline(
[
transport.input(),
context_aggregator.user(),
llm,
ParallelPipeline([sa, ir, va, moondream], [tf, imgf]),
tts,
ta,
transport.output(),
context_aggregator.assistant(),
]
)
pipeline = Pipeline([
transport.input(),
ura,
llm,
ParallelPipeline(
[sa, ir, va, moondream],
[tf, imgf]),
tts,
ta,
transport.output()
])
task = PipelineTask(pipeline)
await task.queue_frame(quiet_frame)

View File

@@ -1,4 +1,4 @@
DAILY_SAMPLE_ROOM_URL=https://yourdomain.daily.co/yourroom # (for joining the bot to the same room repeatedly for local dev)
DAILY_API_KEY=7df...
OPENAI_API_KEY=sk-PL...
CARTESIA_API_KEY=your_cartesia_api_key_here
ELEVENLABS_API_KEY=aeb...

View File

@@ -1,4 +1,4 @@
python-dotenv
fastapi[all]
uvicorn
pipecat-ai[daily,cartesia,moondream,openai,silero]
pipecat-ai[daily,moondream,openai,silero]

View File

@@ -14,8 +14,11 @@ from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper
async def configure(aiohttp_session: aiohttp.ClientSession):
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument(
"-u", "--url", type=str, required=False, help="URL of the Daily room to join"
)
"-u",
"--url",
type=str,
required=False,
help="URL of the Daily room to join")
parser.add_argument(
"-k",
"--apikey",
@@ -31,18 +34,15 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
if not url:
raise Exception(
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL."
)
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL.")
if not key:
raise Exception(
"No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
)
raise Exception("No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers.")
daily_rest_helper = DailyRESTHelper(
daily_api_key=key,
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
aiohttp_session=aiohttp_session
)
# Create a meeting token for the given room with an expiration 1 hour in

View File

@@ -38,14 +38,13 @@ async def lifespan(app: FastAPI):
aiohttp_session = aiohttp.ClientSession()
daily_helpers["rest"] = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
daily_api_url=os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'),
aiohttp_session=aiohttp_session
)
yield
await aiohttp_session.close()
cleanup()
app = FastAPI(lifespan=lifespan)
app.add_middleware(
@@ -57,7 +56,7 @@ app.add_middleware(
)
@app.get("/")
@app.get("/start")
async def start_agent(request: Request):
print(f"!!! Creating room")
room = await daily_helpers["rest"].create_room(DailyRoomParams())
@@ -66,34 +65,37 @@ async def start_agent(request: Request):
if not room.url:
raise HTTPException(
status_code=500,
detail="Missing 'room' property in request data. Cannot start agent without a target room!",
)
detail="Missing 'room' property in request data. Cannot start agent without a target room!")
# Check if there is already an existing process running in this room
num_bots_in_room = sum(
1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None
)
1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None)
if num_bots_in_room >= MAX_BOTS_PER_ROOM:
raise HTTPException(status_code=500, detail=f"Max bot limited reach for room: {room.url}")
raise HTTPException(
status_code=500, detail=f"Max bot limited reach for room: {room.url}")
# Get the token for the room
token = await daily_helpers["rest"].get_token(room.url)
if not token:
raise HTTPException(status_code=500, detail=f"Failed to get token for room: {room.url}")
raise HTTPException(
status_code=500, detail=f"Failed to get token for room: {room.url}")
# Spawn a new agent, and join the user session
# Note: this is mostly for demonstration purposes (refer to 'deployment' in README)
try:
proc = subprocess.Popen(
[f"python3 -m bot -u {room.url} -t {token}"],
[
f"python3 -m bot -u {room.url} -t {token}"
],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__)),
cwd=os.path.dirname(os.path.abspath(__file__))
)
bot_procs[proc.pid] = (proc, room.url)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}")
raise HTTPException(
status_code=500, detail=f"Failed to start subprocess: {e}")
return RedirectResponse(room.url)
@@ -105,7 +107,8 @@ def get_status(pid: int):
# If the subprocess doesn't exist, return an error
if not proc:
raise HTTPException(status_code=404, detail=f"Bot with process id: {pid} not found")
raise HTTPException(
status_code=404, detail=f"Bot with process id: {pid} not found")
# Check the status of the subprocess
if proc[0].poll() is None:
@@ -122,10 +125,14 @@ if __name__ == "__main__":
default_host = os.getenv("HOST", "0.0.0.0")
default_port = int(os.getenv("FAST_API_PORT", "7860"))
parser = argparse.ArgumentParser(description="Daily Moondream FastAPI server")
parser.add_argument("--host", type=str, default=default_host, help="Host address")
parser.add_argument("--port", type=int, default=default_port, help="Port number")
parser.add_argument("--reload", action="store_true", help="Reload code on change")
parser = argparse.ArgumentParser(
description="Daily Moondream FastAPI server")
parser.add_argument("--host", type=str,
default=default_host, help="Host address")
parser.add_argument("--port", type=int,
default=default_port, help="Port number")
parser.add_argument("--reload", action="store_true",
help="Reload code on change")
config = parser.parse_args()

View File

@@ -1,39 +1,12 @@
# Patient-intake chatbot
# Simple Chatbot
<img src="image.png" width="420px">
This project implements an AI-powered chatbot designed to streamline the medical intake process for Tri-County Health Services. The chatbot, named Jessica, interacts with patients to collect essential information before their doctor's visit, enhancing efficiency and improving the patient experience.
This app connects you to a chatbot powered by GPT-4, complete with animations generated by Stable Video Diffusion.
## Features
See a video of it in action: https://x.com/kwindla/status/1778628911817183509
Identity Verification: Confirms patient identity by verifying their date of birth.
Prescription Information: Collects details about current medications and dosages.
Allergy Documentation: Records patient allergies.
Medical Conditions: Gathers information about existing medical conditions.
Reason for Visit: Asks patients about the purpose of their current doctor's visit.
## Technical Stack
Language: Python
AI Model: OpenAI's GPT-4
Text-to-Speech: Cartesia TTS Service
Audio Processing: Silero VAD (Voice Activity Detection)
Real-time Communication: Daily.co API
## Key Components
IntakeProcessor: Manages the conversation flow and information gathering process.
DailyTransport: Handles real-time audio communication.
CartesiaTTSService: Converts text responses to speech.
OpenAILLMService: Processes natural language and generates appropriate responses.
Pipeline: Orchestrates the flow of information between different components.
How It Works
The chatbot introduces itself and verifies the patient's identity.
It systematically collects information about prescriptions, allergies, medical conditions, and the reason for the visit.
The conversation is guided by a series of function calls that transition between different stages of the intake process.
All collected information is logged for later use by medical professionals.
And a quick video walkthrough of the code: https://www.loom.com/share/13df1967161f4d24ade054e7f8753416
The first time, things might take extra time to get started since VAD (Voice Activity Detection) model needs to be downloaded.
@@ -54,7 +27,7 @@ cp env.example .env # and add your credentials
python server.py
```
Then, visit `http://localhost:7860/` in your browser to start a chatbot session.
Then, visit `http://localhost:7860/start` in your browser to start a chatbot session.
## Build and test the Docker image

View File

@@ -10,8 +10,7 @@ import os
import sys
import wave
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import OutputAudioRawFrame
from pipecat.frames.frames import AudioRawFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -20,13 +19,13 @@ from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMContext, OpenAILLMContextFrame, OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -50,44 +49,40 @@ for file in sound_files:
filename = os.path.splitext(os.path.basename(full_path))[0]
# Open the sound and convert it to bytes
with wave.open(full_path) as audio_file:
sounds[file] = OutputAudioRawFrame(
audio_file.readframes(-1), audio_file.getframerate(), audio_file.getnchannels()
)
sounds[file] = AudioRawFrame(audio_file.readframes(-1),
audio_file.getframerate(), audio_file.getnchannels())
class IntakeProcessor:
def __init__(self, context: OpenAILLMContext):
print(f"Initializing context from IntakeProcessor")
context.add_message(
context.add_message({"role": "system", "content": "You are Jessica, an agent for a company called Tri-County Health Services. Your job is to collect important information from the user before their doctor visit. You're talking to Chad Bailey. You should address the user by their first name and be polite and professional. You're not a medical professional, so you shouldn't provide any advice. Keep your responses short. Your job is to collect information to give to a doctor. Don't make assumptions about what values to plug into functions. Ask for clarification if a user response is ambiguous. Start by introducing yourself. Then, ask the user to confirm their identity by telling you their birthday, including the year. When they answer with their birthday, call the verify_birthday function."})
context.set_tools([
{
"role": "system",
"content": "You are Jessica, an agent for a company called Tri-County Health Services. Your job is to collect important information from the user before their doctor visit. You're talking to Chad Bailey. You should address the user by their first name and be polite and professional. You're not a medical professional, so you shouldn't provide any advice. Keep your responses short. Your job is to collect information to give to a doctor. Don't make assumptions about what values to plug into functions. Ask for clarification if a user response is ambiguous. Start by introducing yourself. Then, ask the user to confirm their identity by telling you their birthday, including the year. When they answer with their birthday, call the verify_birthday function.",
}
)
context.set_tools(
[
{
"type": "function",
"function": {
"name": "verify_birthday",
"description": "Use this function to verify the user has provided their correct birthday.",
"parameters": {
"type": "object",
"properties": {
"birthday": {
"type": "string",
"description": "The user's birthdate, including the year. The user can provide it in any format, but convert it to YYYY-MM-DD format to call this function.",
}
},
},
"type": "function",
"function": {
"name": "verify_birthday",
"description": "Use this function to verify the user has provided their correct birthday.",
"parameters": {
"type": "object",
"properties": {
"birthday": {
"type": "string",
"description": "The user's birthdate, including the year. The user can provide it in any format, but convert it to YYYY-MM-DD format to call this function.",
}},
},
}
]
)
},
}])
async def verify_birthday(
self, function_name, tool_call_id, args, llm, context, result_callback
):
self,
function_name,
tool_call_id,
args,
llm,
context,
result_callback):
if args["birthday"] == "1983-01-01":
context.set_tools(
[
@@ -114,35 +109,18 @@ class IntakeProcessor:
},
},
},
}
},
}},
},
},
}
]
)
}])
# It's a bit weird to push this to the LLM, but it gets it into the pipeline
# await llm.push_frame(sounds["ding2.wav"], FrameDirection.DOWNSTREAM)
# We don't need the function call in the context, so just return a new
# system message and let the framework re-prompt
await result_callback(
[
{
"role": "system",
"content": "Next, thank the user for confirming their identity, then ask the user to list their current prescriptions. Each prescription needs to have a medication name and a dosage. Do not call the list_prescriptions function with any unknown dosages.",
}
]
)
await result_callback([{"role": "system", "content": "Next, thank the user for confirming their identity, then ask the user to list their current prescriptions. Each prescription needs to have a medication name and a dosage. Do not call the list_prescriptions function with any unknown dosages."}])
else:
# The user provided an incorrect birthday; ask them to try again
await result_callback(
[
{
"role": "system",
"content": "The user provided an incorrect birthday. Ask them for their birthday again. When they answer, call the verify_birthday function.",
}
]
)
await result_callback([{"role": "system", "content": "The user provided an incorrect birthday. Ask them for their birthday again. When they answer, call the verify_birthday function."}])
async def start_prescriptions(self, function_name, llm, context):
print(f"!!! doing start prescriptions")
@@ -165,22 +143,16 @@ class IntakeProcessor:
"name": {
"type": "string",
"description": "What the user is allergic to",
}
},
}},
},
}
},
}},
},
},
}
]
)
}])
context.add_message(
{
"role": "system",
"content": "Next, ask the user if they have any allergies. Once they have listed their allergies or confirmed they don't have any, call the list_allergies function.",
}
)
"content": "Next, ask the user if they have any allergies. Once they have listed their allergies or confirmed they don't have any, call the list_allergies function."})
print(f"!!! about to await llm process frame in start prescrpitions")
await llm.process_frame(OpenAILLMContextFrame(context), FrameDirection.DOWNSTREAM)
print(f"!!! past await process frame in start prescriptions")
@@ -206,22 +178,17 @@ class IntakeProcessor:
"name": {
"type": "string",
"description": "The user's medical condition",
}
},
}},
},
}
},
}},
},
},
},
]
)
])
context.add_message(
{
"role": "system",
"content": "Now ask the user if they have any medical conditions the doctor should know about. Once they've answered the question, call the list_conditions function.",
}
)
"content": "Now ask the user if they have any medical conditions the doctor should know about. Once they've answered the question, call the list_conditions function."})
await llm.process_frame(OpenAILLMContextFrame(context), FrameDirection.DOWNSTREAM)
async def start_conditions(self, function_name, llm, context):
@@ -245,31 +212,24 @@ class IntakeProcessor:
"name": {
"type": "string",
"description": "The user's reason for visiting the doctor",
}
},
}},
},
}
},
}},
},
},
}
]
)
}])
context.add_message(
{
"role": "system",
"content": "Finally, ask the user the reason for their doctor visit today. Once they answer, call the list_visit_reasons function.",
}
)
"content": "Finally, ask the user the reason for their doctor visit today. Once they answer, call the list_visit_reasons function."})
await llm.process_frame(OpenAILLMContextFrame(context), FrameDirection.DOWNSTREAM)
async def start_visit_reasons(self, function_name, llm, context):
print("!!! doing start visit reasons")
# move to finish call
context.set_tools([])
context.add_message(
{"role": "system", "content": "Now, thank the user and end the conversation."}
)
context.add_message({"role": "system",
"content": "Now, thank the user and end the conversation."})
await llm.process_frame(OpenAILLMContextFrame(context), FrameDirection.DOWNSTREAM)
async def save_data(self, function_name, tool_call_id, args, llm, context, result_callback):
@@ -300,7 +260,7 @@ async def main():
# tier="nova",
# model="2-general"
# )
),
)
)
tts = CartesiaTTSService(
@@ -313,7 +273,9 @@ async def main():
# voice_id="846d6cb0-2301-48b6-9683-48f5618ea2f6", # Spanish-speaking Lady
# )
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = []
context = OpenAILLMContext(messages=messages)
@@ -322,31 +284,33 @@ async def main():
intake = IntakeProcessor(context)
llm.register_function("verify_birthday", intake.verify_birthday)
llm.register_function(
"list_prescriptions", intake.save_data, start_callback=intake.start_prescriptions
)
"list_prescriptions",
intake.save_data,
start_callback=intake.start_prescriptions)
llm.register_function(
"list_allergies", intake.save_data, start_callback=intake.start_allergies
)
"list_allergies",
intake.save_data,
start_callback=intake.start_allergies)
llm.register_function(
"list_conditions", intake.save_data, start_callback=intake.start_conditions
)
"list_conditions",
intake.save_data,
start_callback=intake.start_conditions)
llm.register_function(
"list_visit_reasons", intake.save_data, start_callback=intake.start_visit_reasons
)
"list_visit_reasons",
intake.save_data,
start_callback=intake.start_visit_reasons)
fl = FrameLogger("LLM Output")
pipeline = Pipeline(
[
transport.input(), # Transport input
context_aggregator.user(), # User responses
llm, # LLM
fl, # Frame logger
tts, # TTS
transport.output(), # Transport output
context_aggregator.assistant(), # Assistant responses
]
)
pipeline = Pipeline([
transport.input(), # Transport input
context_aggregator.user(), # User responses
llm, # LLM
fl, # Frame logger
tts, # TTS
transport.output(), # Transport output
context_aggregator.assistant(), # Assistant responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=False))

View File

@@ -1,4 +1,4 @@
DAILY_SAMPLE_ROOM_URL=https://yourdomain.daily.co/yourroom # (for joining the bot to the same room repeatedly for local dev)
DAILY_API_KEY=7df...
OPENAI_API_KEY=sk-PL...
CARTESIA_API_KEY=your_cartesia_api_key_here
ELEVENLABS_API_KEY=aeb...

View File

@@ -1,4 +1,4 @@
python-dotenv
fastapi[all]
uvicorn
pipecat-ai[daily,cartesia,openai,silero]
pipecat-ai[daily,openai,silero]

View File

@@ -14,8 +14,11 @@ from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper
async def configure(aiohttp_session: aiohttp.ClientSession):
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument(
"-u", "--url", type=str, required=False, help="URL of the Daily room to join"
)
"-u",
"--url",
type=str,
required=False,
help="URL of the Daily room to join")
parser.add_argument(
"-k",
"--apikey",
@@ -31,19 +34,15 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
if not url:
raise Exception(
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL."
)
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL.")
if not key:
raise Exception(
"No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
)
raise Exception("No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers.")
daily_rest_helper = DailyRESTHelper(
daily_api_key=key,
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
aiohttp_session=aiohttp_session)
# Create a meeting token for the given room with an expiration 1 hour in
# the future.

View File

@@ -38,14 +38,13 @@ async def lifespan(app: FastAPI):
aiohttp_session = aiohttp.ClientSession()
daily_helpers["rest"] = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
daily_api_url=os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'),
aiohttp_session=aiohttp_session
)
yield
await aiohttp_session.close()
cleanup()
app = FastAPI(lifespan=lifespan)
app.add_middleware(
@@ -57,7 +56,7 @@ app.add_middleware(
)
@app.get("/")
@app.get("/start")
async def start_agent(request: Request):
print(f"!!! Creating room")
room = await daily_helpers["rest"].create_room(DailyRoomParams())
@@ -66,34 +65,37 @@ async def start_agent(request: Request):
if not room.url:
raise HTTPException(
status_code=500,
detail="Missing 'room' property in request data. Cannot start agent without a target room!",
)
detail="Missing 'room' property in request data. Cannot start agent without a target room!")
# Check if there is already an existing process running in this room
num_bots_in_room = sum(
1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None
)
1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None)
if num_bots_in_room >= MAX_BOTS_PER_ROOM:
raise HTTPException(status_code=500, detail=f"Max bot limited reach for room: {room.url}")
raise HTTPException(
status_code=500, detail=f"Max bot limited reach for room: {room.url}")
# Get the token for the room
token = await daily_helpers["rest"].get_token(room.url)
if not token:
raise HTTPException(status_code=500, detail=f"Failed to get token for room: {room.url}")
raise HTTPException(
status_code=500, detail=f"Failed to get token for room: {room.url}")
# Spawn a new agent, and join the user session
# Note: this is mostly for demonstration purposes (refer to 'deployment' in README)
try:
proc = subprocess.Popen(
[f"python3 -m bot -u {room.url} -t {token}"],
[
f"python3 -m bot -u {room.url} -t {token}"
],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__)),
cwd=os.path.dirname(os.path.abspath(__file__))
)
bot_procs[proc.pid] = (proc, room.url)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}")
raise HTTPException(
status_code=500, detail=f"Failed to start subprocess: {e}")
return RedirectResponse(room.url)
@@ -105,7 +107,8 @@ def get_status(pid: int):
# If the subprocess doesn't exist, return an error
if not proc:
raise HTTPException(status_code=404, detail=f"Bot with process id: {pid} not found")
raise HTTPException(
status_code=404, detail=f"Bot with process id: {pid} not found")
# Check the status of the subprocess
if proc[0].poll() is None:
@@ -122,13 +125,17 @@ if __name__ == "__main__":
default_host = os.getenv("HOST", "0.0.0.0")
default_port = int(os.getenv("FAST_API_PORT", "7860"))
parser = argparse.ArgumentParser(description="Daily patient-intake FastAPI server")
parser.add_argument("--host", type=str, default=default_host, help="Host address")
parser.add_argument("--port", type=int, default=default_port, help="Port number")
parser.add_argument("--reload", action="store_true", help="Reload code on change")
parser = argparse.ArgumentParser(
description="Daily Storyteller FastAPI server")
parser.add_argument("--host", type=str,
default=default_host, help="Host address")
parser.add_argument("--port", type=int,
default=default_port, help="Port number")
parser.add_argument("--reload", action="store_true",
help="Reload code on change")
config = parser.parse_args()
print(f"to join a test room, visit http://localhost:{config.port}/")
print(f"to join a test room, visit http://localhost:{config.port}/start")
uvicorn.run(
"server:app",
host=config.host,

Some files were not shown because too many files have changed in this diff Show More