Compare commits

..

3 Commits

Author SHA1 Message Date
James Hush
b29ac3c7a8 Remove logs 2025-01-23 16:31:41 +08:00
James Hush
5222488fb5 Have a default transfer 2025-01-23 16:24:40 +08:00
James Hush
c2fef9584b Add call transfer to bot_daily 2025-01-23 15:54:53 +08:00
299 changed files with 4201 additions and 21688 deletions

View File

@@ -1,54 +0,0 @@
name: coverage
on:
workflow_dispatch:
push:
branches:
- main
pull_request:
branches:
- "**"
paths-ignore:
- "docs/**"
jobs:
coverage:
name: "Coverage"
runs-on: ubuntu-latest
steps:
- name: Checkout repo
uses: actions/checkout@v4
- name: Set up Python
id: setup_python
uses: actions/setup-python@v4
with:
python-version: "3.10"
- name: Cache virtual environment
uses: actions/cache@v3
with:
# We are hashing dev-requirements.txt and test-requirements.txt which
# contain all dependencies needed to run the tests.
key: venv-${{ runner.os }}-${{ steps.setup_python.outputs.python-version}}-${{ hashFiles('dev-requirements.txt') }}-${{ hashFiles('test-requirements.txt') }}
path: .venv
- name: Install system packages
id: install_system_packages
run: |
sudo apt-get install -y portaudio19-dev
- name: Setup virtual environment
run: |
python -m venv .venv
- name: Install basic Python dependencies
run: |
source .venv/bin/activate
python -m pip install --upgrade pip
pip install -r dev-requirements.txt -r test-requirements.txt
- name: Run tests with coverage
run: |
source .venv/bin/activate
coverage run
coverage xml
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v5
with:
token: ${{ secrets.CODECOV_TOKEN }}
slug: pipecat-ai/pipecat

View File

@@ -5,344 +5,15 @@ All notable changes to **Pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [0.0.57] - 2025-02-14
## [Unreleased]
### Added
- Added new `AudioContextWordTTSService`. This is a TTS base class for TTS
services that handling multiple separate audio requests.
- Added new frames `EmulateUserStartedSpeakingFrame` and
`EmulateUserStoppedSpeakingFrame` which can be used to emulated VAD behavior
without VAD being present or not being triggered.
- Added a new `audio_in_stream_on_start` field to `TransportParams`.
- Added a new method `start_audio_in_streaming` in the `BaseInputTransport`.
- This method should be used to start receiving the input audio in case the
field `audio_in_stream_on_start` is set to `false`.
- Added support for the `RTVIProcessor` to handle buffered audio in `base64`
format, converting it into InputAudioRawFrame for transport.
- Added support for the `RTVIProcessor` to trigger `start_audio_in_streaming`
only after the `client-ready` message.
- Added new `MUTE_UNTIL_FIRST_BOT_COMPLETE` strategy to `STTMuteStrategy`. This
strategy starts muted and remains muted until the first bot speech completes,
ensuring the bot's first response cannot be interrupted. This complements the
existing `FIRST_SPEECH` strategy which only mutes during the first detected
bot speech.
- Added support for Google Cloud Speech-to-Text V2 through `GoogleSTTService`.
- Added `RimeTTSService`, a new `WordTTSService`. Updated the foundational
example `07q-interruptible-rime.py` to use `RimeTTSService`.
- Added support for Groq's Whisper API through the new `GroqSTTService` and
OpenAI's Whisper API through the new `OpenAISTTService`. Introduced a new
base class `BaseWhisperSTTService` to handle common Whisper API
functionality.
- Added `PerplexityLLMService` for Perplexity NIM API integration, with an
OpenAI-compatible interface. Also, added foundational example
`14n-function-calling-perplexity.py`.
- Added `DailyTransport.update_remote_participants()`. This allows you to update
remote participant's settings, like their permissions or which of their
devices are enabled. Requires that the local participant have participant
admin permission.
### Changed
- We don't consider a colon `:` and end of sentence any more.
- Updated `DailyTransport` to respect the `audio_in_stream_on_start` field,
ensuring it only starts receiving the audio input if it is enabled.
- Updated `FastAPIWebsocketOutputTransport` to send `TransportMessageFrame` and
`TransportMessageUrgentFrame` to the serializer.
- Updated `WebsocketServerOutputTransport` to send `TransportMessageFrame` and
`TransportMessageUrgentFrame` to the serializer.
- Enhanced `STTMuteConfig` to validate strategy combinations, preventing
`MUTE_UNTIL_FIRST_BOT_COMPLETE` and `FIRST_SPEECH` from being used together
as they handle first bot speech differently.
- Updated foundational example `07n-interruptible-google.py` to use all Google
services.
- `RimeHttpTTSService` now uses the `mistv2` model by default.
- Improved error handling in `AzureTTSService` to properly detect and log
synthesis cancellation errors.
- Enhanced `WhisperSTTService` with full language support and improved model
documentation.
- Updated foundation example `14f-function-calling-groq.py` to use
`GroqSTTService` for transcription.
- Updated `GroqLLMService` to use `llama-3.3-70b-versatile` as the default
model.
- `RTVIObserver` doesn't handle `LLMSearchResponseFrame` frames anymore. For
now, to handle those frames you need to create a `GoogleRTVIObserver`
instead.
### Deprecated
- `STTMuteFilter` constructor's `stt_service` parameter is now deprecated and
will be removed in a future version. The filter now manages mute state
internally instead of querying the STT service.
- `RTVI.observer()` is now deprecated, instantiate an `RTVIObserver` directly
instead.
- All RTVI frame processors (e.g. `RTVISpeakingProcessor`,
`RTVIBotLLMProcessor`) are now deprecated, instantiate an `RTVIObserver`
instead.
### Fixed
- Fixed a `FalImageGenService` issue that was causing the event loop to be
blocked while loading the downloadded image.
- Fixed a `CartesiaTTSService` service issue that would cause audio overlapping
in some cases.
- Fixed a websocket-based service issue (e.g. `CartesiaTTSService`) that was
preventing a reconnection after the server disconnected cleanly, which was
causing an inifite loop instead.
- Fixed a `BaseOutputTransport` issue that was causing upstream frames to no be
pushed upstream.
- Fixed multiple issue where user transcriptions where not being handled
properly. It was possible for short utterances to not trigger VAD which would
cause user transcriptions to be ignored. It was also possible for one or more
transcriptions to be generated after VAD in which case they would also be
ignored.
- Fixed an issue that was causing `BotStoppedSpeakingFrame` to be generated too
late. This could then cause issues unblocking `STTMuteFilter` later than
desired.
- Fixed an issue that was causing `AudioBufferProcessor` to not record
synchronized audio.
- Fixed an `RTVI` issue that was causing `bot-tts-text` messages to be sent
before being processed by the output transport.
- Fixed an issue[#1192] in 11labs where we are trying to reconnect/disconnect
the websocket connection even when the connection is already closed.
- Fixed an issue where `has_regular_messages` condition was always true in
`GoogleLLMContext` due to `Part` having `function_call` & `function_response`
with `None` values.
### Other
- Added new `instant-voice` example. This example showcases how to enable
instant voice communication as soon as a user connects.
- Added new `local-input-select-stt` example. This examples allows you to play
with local audio inputs by slecting them through a nice text interface.
## [0.0.56] - 2025-02-06
### Changed
- Use `gemini-2.0-flash-001` as the default model for `GoogleLLMSerivce`.
- Improved foundational examples 22b, 22c, and 22d to support function calling.
With these base examples, `FunctionCallInProgressFrame` and
`FunctionCallResultFrame` will no longer be blocked by the gates.
### Fixed
- Fixed a `TkLocalTransport` and `LocalAudioTransport` issues that was causing
errors on cleanup.
- Fixed an issue that was causing `tests.utils` import to fail because of
logging setup.
- Fixed a `SentryMetrics` issue that was preventing any metrics to be sent to
Sentry and also was preventing from metrics frames to be pushed to the
pipeline.
- Fixed an issue in `BaseOutputTransport` where incoming audio would not be
resampled to the desired output sample rate.
- Fixed an issue with the `TwilioFrameSerializer` and `TelnyxFrameSerializer`
where `twilio_sample_rate` and `telnyx_sample_rate` were incorrectly
initialized to `audio_in_sample_rate`. Those values currently default to 8000
and should be set manually from the serializer constructor if a different
value is needed.
### Other
- Added a new `sentry-metrics` example.
## [0.0.55] - 2025-02-05
### Added
- Added a new `start_metadata` field to `PipelineParams`. The provided metadata
will be set to the initial `StartFrame` being pushed from the `PipelineTask`.
- Added new fields to `PipelineParams` to control audio input and output sample
rates for the whole pipeline. This allows controlling sample rates from a
single place instead of having to specify sample rates in each
service. Setting a sample rate to a service is still possible and will
override the value from `PipelineParams`.
- Introduce audio resamplers (`BaseAudioResampler`). This is just a base class
to implement audio resamplers. Currently, two implementations are provided
`SOXRAudioResampler` and `ResampyResampler`. A new
`create_default_resampler()` has been added (replacing the now deprecated
`resample_audio()`).
- It is now possible to specify the asyncio event loop that a `PipelineTask` and
all the processors should run on by passing it as a new argument to the
`PipelineRunner`. This could allow running pipelines in multiple threads each
one with its own event loop.
- Added a new `utils.TaskManager`. Instead of a global task manager we now have
a task manager per `PipelineTask`. In the previous version the task manager
was global, so running multiple simultaneous `PipelineTask`s could result in
dangling task warnings which were not actually true. In order, for all the
processors to know about the task manager, we pass it through the
`StartFrame`. This means that processors should create tasks when they receive
a `StartFrame` but not before (because they don't have a task manager yet).
- Added `TelnyxFrameSerializer` to support Telnyx calls. A full running example
has also been added to `examples/telnyx-chatbot`.
- Allow pushing silence audio frames before `TTSStoppedFrame`. This might be
useful for testing purposes, for example, passing bot audio to an STT service
which usually needs additional audio data to detect the utterance stopped.
- `TwilioSerializer` now supports transport message frames. With this we can
create Twilio emulators.
- Added a new transport: `WebsocketClientTransport`.
- Added a `metadata` field to `Frame` which makes it possible to pass custom
data to all frames.
- Added `test/utils.py` inside of pipecat package.
### Changed
- `GatedOpenAILLMContextAggregator` now require keyword arguments. Also, a new
`start_open` argument has been added to set the initial state of the gate.
- Added `organization` and `project` level authentication to
`OpenAILLMService`.
- Improved the language checking logic in `ElevenLabsTTSService` and
`ElevenLabsHttpTTSService` to properly handle language codes based on model
compatibility, with appropriate warnings when language codes cannot be
applied.
- Updated `GoogleLLMContext` to support pushing `LLMMessagesUpdateFrame`s that
contain a combination of function calls, function call responses, system
messages, or just messages.
- `InputDTMFFrame` is now based on `DTMFFrame`. There's also a new
`OutputDTMFFrame` frame.
### Deprecated
- `resample_audio()` is now deprecated, use `create_default_resampler()`
instead.
### Removed
- `AudioBufferProcessor.reset_audio_buffers()` has been removed, use
`AudioBufferProcessor.start_recording()` and
`AudioBufferProcessor.stop_recording()` instead.
### Fixed
- Fixed a `AudioBufferProcessor` that would cause crackling in some recordings.
- Fixed an issue in `AudioBufferProcessor` where user callback would not be
called on task cancellation.
- Fixed an issue in `AudioBufferProcessor` that would cause wrong silence
padding in some cases.
- Fixed an issue where `ElevenLabsTTSService` messages would return a 1009
websocket error by increasing the max message size limit to 16MB.
- Fixed a `DailyTransport` issue that would cause events to be triggered before
join finished.
- Fixed a `PipelineTask` issue that was preventing processors to be cleaned up
after cancelling the task.
- Fixed an issue where queuing a `CancelFrame` to a pipeline task would not
cause the task to finish. However, using `PipelineTask.cancel()` is still the
recommended way to cancel a task.
### Other
- Improved Unit Test `run_test()` to use `PipelineTask` and
`PipelineRunner`. There's now also some control around `StartFrame` and
`EndFrame`. The `EndTaskFrame` has been removed since it doesn't seem
necessary with this new approach.
- Updated `twilio-chatbot` with a few new features: use 8000 sample rate and
avoid resampling, a new client useful for stress testing and testing locally
without the need to make phone calls. Also, added audio recording on both the
client and the server to make sure the audio sounds good.
- Updated examples to use `task.cancel()` to immediately exit the example when a
participant leaves or disconnects, instead of pushing an `EndFrame`. Pushing
an `EndFrame` causes the bot to run through everything that is internally
queued (which could take some seconds). Note that using `task.cancel()` might
not always be the best option and pushing an `EndFrame` could still be
desirable to make sure all the pipeline is flushed.
## [0.0.54] - 2025-01-27
### Added
- In order to create tasks in Pipecat frame processors it is now recommended to
use `FrameProcessor.create_task()` (which uses the new
`utils.asyncio.create_task()`). It takes care of uncaught exceptions, task
cancellation handling and task management. To cancel or wait for a task there
is `FrameProcessor.cancel_task()` and `FrameProcessor.wait_for_task()`. All of
Pipecat processors have been updated accordingly. Also, when a pipeline runner
finishes, a warning about dangling tasks might appear, which indicates if any
of the created tasks was never cancelled or awaited for (using these new
functions).
- It is now possible to specify the period of the `PipelineTask` heartbeat
frames with `heartbeats_period_secs`.
- Added `DailyMeetingTokenProperties` and `DailyMeetingTokenParams` Pydantic models
for meeting token creation in `get_token` method of `DailyRESTHelper`.
- Added `enable_recording` and `geo` parameters to `DailyRoomProperties`.
- Added `RecordingsBucketConfig` to `DailyRoomProperties` to upload recordings
to a custom AWS bucket.
### Changed
- Enhanced `UserIdleProcessor` with retry functionality and control over idle
monitoring via new callback signature `(processor, retry_count) -> bool`.
Updated the `17-detect-user-idle.py` to show how to use the `retry_count`.
- Add defensive error handling for `OpenAIRealtimeBetaLLMService`'s audio
truncation. Audio truncation errors during interruptions now log a warning
and allow the session to continue instead of throwing an exception.
- Modified `TranscriptProcessor` to use TTS text frames for more accurate assistant
transcripts. Assistant messages are now aggregated based on bot speaking boundaries
rather than LLM context, providing better handling of interruptions and partial
@@ -355,21 +26,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fixed an `GeminiMultimodalLiveLLMService` issue that was preventing the user
to push initial LLM assistant messages (using `LLMMessagesAppendFrame`).
- Added missing `FrameProcessor.cleanup()` calls to `Pipeline`,
`ParallelPipeline` and `UserIdleProcessor`.
- Fixed a type error when using `voice_settings` in `ElevenLabsHttpTTSService`.
- Fixed an issue where `OpenAIRealtimeBetaLLMService` function calling resulted
in an error.
- Fixed an issue in `AudioBufferProcessor` where the last audio buffer was not
being processed, in cases where the `_user_audio_buffer` was smaller than the
buffer size.
### Performance
- Replaced audio resampling library `resampy` with `soxr`. Resampling a 2:21s
@@ -1756,9 +1417,6 @@ async def on_connected(processor):
### Changed
- `FrameSerializer.serialize()` and `FrameSerializer.deserialize()` are now
`async`.
- `Filter` has been renamed to `FrameFilter` and it's now under
`processors/filters`.

View File

@@ -2,7 +2,7 @@
 <img alt="pipecat" width="300px" height="auto" src="https://raw.githubusercontent.com/pipecat-ai/pipecat/main/pipecat.png">
</div></h1>
[![PyPI](https://img.shields.io/pypi/v/pipecat-ai)](https://pypi.org/project/pipecat-ai) ![Tests](https://github.com/pipecat-ai/pipecat/actions/workflows/tests.yaml/badge.svg) [![codecov](https://codecov.io/gh/pipecat-ai/pipecat/graph/badge.svg?token=LNVUIVO4Y9)](https://codecov.io/gh/pipecat-ai/pipecat) [![Docs](https://img.shields.io/badge/Documentation-blue)](https://docs.pipecat.ai) [![Discord](https://img.shields.io/discord/1239284677165056021)](https://discord.gg/pipecat)
[![PyPI](https://img.shields.io/pypi/v/pipecat-ai)](https://pypi.org/project/pipecat-ai) ![Tests](https://github.com/pipecat-ai/pipecat/actions/workflows/tests.yaml/badge.svg) [![Docs](https://img.shields.io/badge/Documentation-blue)](https://docs.pipecat.ai) [![Discord](https://img.shields.io/discord/1239284677165056021)](https://discord.gg/pipecat) <a href="https://app.commanddash.io/agent/github_pipecat-ai_pipecat"><img src="https://img.shields.io/badge/AI-Code%20Agent-EB9FDA"></a>
Pipecat is an open source Python framework for building voice and multimodal conversational agents. It handles the complex orchestration of AI services, network transport, audio processing, and multimodal interactions, letting you focus on creating engaging experiences.
@@ -53,19 +53,19 @@ To keep things lightweight, only the core framework is included by default. If y
pip install "pipecat-ai[option,...]"
```
### Available services
Available options include:
| Category | Services | Install Command Example |
| ------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | --------------------------------------- |
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) | `pip install "pipecat-ai[deepgram]"` |
| LLMs | [Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [OpenRouter](https://docs.pipecat.ai/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/server/services/llm/perplexity), [Together AI](https://docs.pipecat.ai/server/services/llm/together) | `pip install "pipecat-ai[openai]"` |
| Text-to-Speech | [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [PlayHT](https://docs.pipecat.ai/server/services/tts/playht), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) | `pip install "pipecat-ai[cartesia]"` |
| Speech-to-Speech | [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [OpenAI Realtime](https://docs.pipecat.ai/server/services/s2s/openai) | `pip install "pipecat-ai[google]"` |
| Transport | [Daily (WebRTC)](https://docs.pipecat.ai/server/services/transport/daily), [FastAPI Websocket](https://docs.pipecat.ai/server/services/transport/fastapi-websocket), [WebSocket Server](https://docs.pipecat.ai/server/services/transport/websocket-server), Local | `pip install "pipecat-ai[daily]"` |
| Video | [Tavus](https://docs.pipecat.ai/server/services/video/tavus), [Simli](https://docs.pipecat.ai/server/services/video/simli) | `pip install "pipecat-ai[tavus,simli]"` |
| Vision & Image | [Moondream](https://docs.pipecat.ai/server/services/vision/moondream), [fal](https://docs.pipecat.ai/server/services/image-generation/fal) | `pip install "pipecat-ai[moondream]"` |
| Audio Processing | [Silero VAD](https://docs.pipecat.ai/server/utilities/audio/silero-vad-analyzer), [Krisp](https://docs.pipecat.ai/server/utilities/audio/krisp-filter), [Koala](https://docs.pipecat.ai/server/utilities/audio/koala-filter), [Noisereduce](https://docs.pipecat.ai/server/utilities/audio/noisereduce-filter) | `pip install "pipecat-ai[silero]"` |
| Analytics & Metrics | [Canonical AI](https://docs.pipecat.ai/server/services/analytics/canonical), [Sentry](https://docs.pipecat.ai/server/services/analytics/sentry) | `pip install "pipecat-ai[canonical]"` |
| Category | Services | Install Command Example |
| ------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | --------------------------------------- |
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) | `pip install "pipecat-ai[deepgram]"` |
| LLMs | [Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [OpenRouter](https://docs.pipecat.ai/server/services/llm/openrouter), [Together AI](https://docs.pipecat.ai/server/services/llm/together) | `pip install "pipecat-ai[openai]"` |
| Text-to-Speech | [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [PlayHT](https://docs.pipecat.ai/server/services/tts/playht), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) | `pip install "pipecat-ai[cartesia]"` |
| Speech-to-Speech | [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [OpenAI Realtime](https://docs.pipecat.ai/server/services/s2s/openai) | `pip install "pipecat-ai[openai]"` |
| Transport | [Daily (WebRTC)](https://docs.pipecat.ai/server/services/transport/daily), [FastAPI Websocket](https://docs.pipecat.ai/server/services/transport/fastapi-websocket), [WebSocket Server](https://docs.pipecat.ai/server/services/transport/websocket-server), Local | `pip install "pipecat-ai[daily]"` |
| Video | [Tavus](https://docs.pipecat.ai/server/services/video/tavus), [Simli](https://docs.pipecat.ai/server/services/video/simli) | `pip install "pipecat-ai[tavus,simli]"` |
| Vision & Image | [Moondream](https://docs.pipecat.ai/server/services/vision/moondream), [fal](https://docs.pipecat.ai/server/services/image-generation/fal) | `pip install "pipecat-ai[moondream]"` |
| Audio Processing | [Silero VAD](https://docs.pipecat.ai/server/utilities/audio/silero-vad-analyzer), [Krisp](https://docs.pipecat.ai/server/utilities/audio/krisp-filter), [Koala](https://docs.pipecat.ai/server/utilities/audio/koala-filter), [Noisereduce](https://docs.pipecat.ai/server/utilities/audio/noisereduce-filter) | `pip install "pipecat-ai[silero]"` |
| Analytics & Metrics | [Canonical AI](https://docs.pipecat.ai/server/services/analytics/canonical), [Sentry](https://docs.pipecat.ai/server/services/analytics/sentry) | `pip install "pipecat-ai[canonical]"` |
📚 [View full services documentation →](https://docs.pipecat.ai/server/services/supported-services)
@@ -81,7 +81,7 @@ Here is a very basic Pipecat bot that greets a user when they join a real-time s
```python
import asyncio
from pipecat.frames.frames import TextFrame
from pipecat.frames.frames import EndFrame, TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.runner import PipelineRunner
@@ -122,7 +122,7 @@ async def main():
# Register an event handler to exit the application when the user leaves.
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
await task.queue_frame(EndFrame())
# Run the pipeline task
await runner.run(task)
@@ -149,40 +149,36 @@ Sign up [here](https://dashboard.daily.co/u/signup) and [create a room](https://
## Hacking on the framework itself
_Note: You may need to set up a virtual environment before following these instructions. From the root of the repo:_
_Note that you may need to set up a virtual environment before following the instructions below. For instance, you might need to run the following from the root of the repo:_
```shell
python3 -m venv venv
source venv/bin/activate
```
Install the development dependencies:
From the root of this repo, run the following:
```shell
pip install -r dev-requirements.txt
```
Install the git pre-commit hooks (these help ensure your code follows project rules):
This will install the necessary development dependencies. Also, make sure you install the git pre-commit hooks:
```shell
pre-commit install
```
Install the `pipecat-ai` package locally in editable mode:
The hooks will just save you time when you submit a PR by making sure your code follows the project rules.
To use the package locally (e.g. to run sample files), run:
```shell
pip install -e .
pip install --editable ".[option,...]"
```
The `-e` or `--editable` option allows you to modify the code without reinstalling.
The `--editable` option makes sure you don't have to run `pip install` again and you can just edit the project files locally.
To include optional dependencies, add them to the install command. For example:
```shell
pip install -e ".[daily,deepgram,cartesia,openai,silero]" # Updated for the services you're using
```
If you want to use this package from another directory:
If you want to use this package from another directory, you can run:
```shell
pip install "path_to_this_repo[option,...]"

View File

@@ -1,11 +0,0 @@
coverage:
range: 50..90 # coverage lower than 50 is red, higher than 90 green, between color code
status:
project:
default:
target: auto # auto % coverage target
threshold: 5% # allow for 5% reduction of coverage without failing
# do not run coverage on patch nor changes
patch: false

View File

@@ -1,12 +1,11 @@
build~=1.2.2
coverage~=7.6.12
grpcio-tools~=1.67.1
grpcio-tools~=1.69.0
pip-tools~=7.4.1
pre-commit~=4.0.1
pyright~=1.1.393
pyright~=1.1.392
pytest~=8.3.4
pytest-asyncio~=0.25.2
ruff~=0.9.5
setuptools~=70.0.0
ruff~=0.9.1
setuptools~=75.8.0
setuptools_scm~=8.1.0
python-dotenv~=1.0.1

View File

@@ -39,7 +39,7 @@ Next, follow the steps in the README for each demo.
| [Translation Chatbot](translation-chatbot) | Listens for user speech, then translates that speech to Spanish and speaks the translation back. Demonstrates multi-participant use-cases. | Deepgram, Azure, OpenAI, Daily, Daily Prebuilt UI |
| [Moondream Chatbot](moondream-chatbot) | Demonstrates how to add vision capabilities to GPT4. **Note: works best with a GPU** | Deepgram, ElevenLabs, OpenAI, Moondream, Daily, Daily Prebuilt UI |
| [Patient intake](patient-intake) | A chatbot that can call functions in response to user input. | Deepgram, ElevenLabs, OpenAI, Daily, Daily Prebuilt UI |
| [Phone Chatbot](phone-chatbot) | A chatbot that connects to PSTN/SIP phone calls, powered by Daily or Twilio. | Deepgram, ElevenLabs, OpenAI, Daily, Twilio |
| [Dialin Chatbot](dialin-chatbot) | A chatbot that connects to an incoming phone call from Daily or Twilio. | Deepgram, ElevenLabs, OpenAI, Daily, Twilio |
| [Twilio Chatbot](twilio-chatbot) | A chatbot that connects to an incoming phone call from Twilio. | Deepgram, ElevenLabs, OpenAI, Daily, Twilio |
| [studypal](studypal) | A chatbot to have a conversation about any article on the web | |
| [WebSocket Chatbot Server](websocket-server) | A real-time websocket server that handles audio streaming and bot interactions with speech-to-text and text-to-speech capabilities. | Cartesia, Deepgram, OpenAI, Websockets |

View File

@@ -1,45 +0,0 @@
# Bot ready signaling
A simple Pipecat example demonstrating how to handle signaling between the client and the bot,
ensuring that the bot starts sending audio only when the client is available,
thereby avoiding the risk of cutting off the beginning of the audio.
## Quick Start
### First, start the bot server:
1. Navigate to the server directory:
```bash
cd server
```
2. Create and activate a virtual environment:
```bash
python3 -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
```
3. Install requirements:
```bash
pip install -r requirements.txt
```
4. Copy env.example to .env and configure:
- Add your API keys
5. Start the server:
```bash
python server.py
```
### Next, connect using the client app:
For client-side setup, refer to the [JavaScript Guide](client/javascript/README.md).
## Important Note
Ensure the bot server is running before using any client implementations.
## Requirements
- Python 3.10+
- Node.js 16+ (for JavaScript)
- Daily API key
- Cartesia API key
- Modern web browser with WebRTC support

View File

@@ -1,27 +0,0 @@
# JavaScript Implementation
Basic implementation using the [Pipecat JavaScript SDK](https://docs.pipecat.ai/client/js/introduction).
## Setup
1. Run the bot server. See the [server README](../../README).
2. Navigate to the `client/javascript` directory:
```bash
cd client/javascript
```
3. Install dependencies:
```bash
npm install
```
4. Run the client app:
```
npm run dev
```
5. Visit http://localhost:5173 in your browser.

View File

@@ -1,34 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>AI Chatbot</title>
</head>
<body>
<div class="container">
<div class="status-bar">
<div class="status">
Status: <span id="connection-status">Disconnected</span>
</div>
<div class="controls">
<button id="connect-btn">Connect</button>
<button id="disconnect-btn" disabled>Disconnect</button>
</div>
</div>
<audio id="bot-audio" autoplay></audio>
<div class="debug-panel">
<h3>Debug Info</h3>
<div id="debug-log"></div>
</div>
</div>
<script type="module" src="/src/app.js"></script>
<link rel="stylesheet" href="/src/style.css">
</body>
</html>

File diff suppressed because it is too large Load Diff

View File

@@ -1,20 +0,0 @@
{
"name": "client",
"version": "1.0.0",
"main": "index.js",
"scripts": {
"dev": "vite",
"build": "vite build",
"preview": "vite preview"
},
"keywords": [],
"author": "",
"license": "ISC",
"description": "",
"devDependencies": {
"vite": "^6.0.9"
},
"dependencies": {
"@daily-co/daily-js": "0.74.0"
}
}

View File

@@ -1,216 +0,0 @@
/**
* Copyright (c) 20242025, Daily
*
* SPDX-License-Identifier: BSD 2-Clause License
*/
import Daily from "@daily-co/daily-js";
/**
* ChatbotClient handles the connection and media management for a real-time
* voice interaction with an AI bot.
*/
class ChatbotClient {
constructor() {
// Initialize client state
this.dailyCallObject = null;
this.setupDOMElements();
this.setupEventListeners();
}
/**
* Set up references to DOM elements and create necessary media elements
*/
setupDOMElements() {
// Get references to UI control elements
this.connectBtn = document.getElementById('connect-btn');
this.disconnectBtn = document.getElementById('disconnect-btn');
this.statusSpan = document.getElementById('connection-status');
this.debugLog = document.getElementById('debug-log');
// Create an audio element for bot's voice output
this.botAudio = document.createElement('audio');
this.botAudio.autoplay = true;
this.botAudio.playsInline = true;
document.body.appendChild(this.botAudio);
}
/**
* Set up event listeners for connect/disconnect buttons
*/
setupEventListeners() {
this.connectBtn.addEventListener('click', () => this.connect());
this.disconnectBtn.addEventListener('click', () => this.disconnect());
}
/**
* Add a timestamped message to the debug log
*/
log(message) {
const entry = document.createElement('div');
entry.textContent = `${new Date().toISOString()} - ${message}`;
// Add styling based on message type
if (message.startsWith('User: ')) {
entry.style.color = '#2196F3'; // blue for user
} else if (message.startsWith('Bot: ')) {
entry.style.color = '#4CAF50'; // green for bot
}
this.debugLog.appendChild(entry);
this.debugLog.scrollTop = this.debugLog.scrollHeight;
console.log(message);
}
/**
* Update the connection status display
*/
updateStatus(status) {
this.statusSpan.textContent = status;
this.log(`Status: ${status}`);
}
handleEventToConsole (evt) {
this.log(`Received event: ${evt.action}`);
};
/**
* Set up listeners for track events (start/stop)
* This handles new tracks being added during the session
*/
setupTrackListeners() {
if (!this.dailyCallObject) return;
this.dailyCallObject.on("joined-meeting", () => {
this.updateStatus('Connected');
this.connectBtn.disabled = true;
this.disconnectBtn.disabled = false;
this.log('Client connected');
});
this.dailyCallObject.on("track-started", (evt) => {
if (evt.track.kind === "audio" && evt.participant.local === false) {
this.log("Audio track started.")
this.setupAudioTrack(evt.track);
}
});
this.dailyCallObject.on("track-stopped", this.handleEventToConsole.bind(this));
this.dailyCallObject.on("participant-joined", this.handleEventToConsole.bind(this));
this.dailyCallObject.on("participant-updated", this.handleEventToConsole.bind(this));
this.dailyCallObject.on("participant-left", () => {
// When the bot leaves, we are also disconnecting from the call
this.disconnect()
});
this.dailyCallObject.on("left-meeting", () => {
this.updateStatus('Disconnected');
this.connectBtn.disabled = false;
this.disconnectBtn.disabled = true;
this.log('Client disconnected');
});
this.dailyCallObject.on("error", this.handleEventToConsole.bind(this));
}
/**
* Set up an audio track for playback
* Handles both initial setup and track updates
*/
setupAudioTrack(track) {
this.log(`Setting up audio track, track state: ${track.readyState}, muted: ${track.muted}`);
// Check if we're already playing this track
if (this.botAudio.srcObject) {
const oldTrack = this.botAudio.srcObject.getAudioTracks()[0];
if (oldTrack?.id === track.id) return;
}
// Create a new MediaStream with the track and set it as the audio source
this.botAudio.srcObject = new MediaStream([track]);
this.botAudio.onplaying = async (event) => {
this.log("onplaying")
this.log("Will send the audio message to play the audio at the next tick")
this.dailyCallObject.sendAppMessage("playable")
}
}
async fetchRoomInfo() {
let connectUrl = '/connect'
let res = await fetch(connectUrl, {
method: "POST",
mode: "cors",
headers: new Headers({
"Content-Type": "application/json"
}),
})
if (res.ok) {
return res.json();
}
}
/**
* Initialize and connect to the bot
* This sets up the RTVI client, initializes devices, and establishes the connection
*/
async connect() {
try {
// Initialize the client
this.dailyCallObject = Daily.createCallObject({
subscribeToTracksAutomatically: true,
});
// Set up listeners for media track events
this.setupTrackListeners();
this.log('Creating the bot...');
let roomInfo = await this.fetchRoomInfo()
// Connect to the bot
this.log('Connecting to bot...');
// Only for making debugger easier
window.callObject = this.dailyCallObject;
await this.dailyCallObject.join({
url: roomInfo.room_url,
});
this.log('Connection complete');
} catch (error) {
// Handle any errors during connection
this.log(`Error connecting: ${error.message}`);
this.log(`Error stack: ${error.stack}`);
this.updateStatus('Error');
// Clean up if there's an error
if (this.dailyCallObject) {
try {
await this.dailyCallObject.leave();
} catch (disconnectError) {
this.log(`Error during disconnect: ${disconnectError.message}`);
}
}
}
}
/**
* Disconnect from the bot and clean up media resources
*/
async disconnect() {
if (this.dailyCallObject) {
try {
// Disconnect the RTVI client
await this.dailyCallObject.leave();
await this.dailyCallObject.destroy();
this.dailyCallObject = null;
// Clean up audio
if (this.botAudio.srcObject) {
this.botAudio.srcObject.getTracks().forEach((track) => track.stop());
this.botAudio.srcObject = null;
}
} catch (error) {
this.log(`Error disconnecting: ${error.message}`);
}
}
}
}
// Initialize the client when the page loads
window.addEventListener('DOMContentLoaded', () => {
new ChatbotClient();
});

View File

@@ -1,98 +0,0 @@
body {
margin: 0;
padding: 20px;
font-family: Arial, sans-serif;
background-color: #f0f0f0;
}
.container {
max-width: 1200px;
margin: 0 auto;
}
.status-bar {
display: flex;
justify-content: space-between;
align-items: center;
padding: 10px;
background-color: #fff;
border-radius: 8px;
margin-bottom: 20px;
}
.controls button {
padding: 8px 16px;
margin-left: 10px;
border: none;
border-radius: 4px;
cursor: pointer;
}
#connect-btn {
background-color: #4caf50;
color: white;
}
#disconnect-btn {
background-color: #f44336;
color: white;
}
button:disabled {
opacity: 0.5;
cursor: not-allowed;
}
.main-content {
background-color: #fff;
border-radius: 8px;
padding: 20px;
margin-bottom: 20px;
}
.bot-container {
display: flex;
flex-direction: column;
align-items: center;
}
#bot-video-container {
width: 640px;
height: 360px;
background-color: #e0e0e0;
border-radius: 8px;
margin: 20px auto;
overflow: hidden;
display: flex;
align-items: center;
justify-content: center;
}
#bot-video-container video {
width: 100%;
height: 100%;
object-fit: cover;
}
.debug-panel {
background-color: #fff;
border-radius: 8px;
padding: 20px;
}
.debug-panel h3 {
margin: 0 0 10px 0;
font-size: 16px;
font-weight: bold;
}
#debug-log {
height: 200px;
overflow-y: auto;
background-color: #f8f8f8;
padding: 10px;
border-radius: 4px;
font-family: monospace;
font-size: 12px;
line-height: 1.4;
}

View File

@@ -1,13 +0,0 @@
import { defineConfig } from 'vite';
export default defineConfig({
server: {
proxy: {
// Proxy /api requests to the backend server
'/connect': {
target: 'http://0.0.0.0:7860', // Replace with your backend URL
changeOrigin: true,
},
},
},
});

View File

@@ -1,50 +0,0 @@
# Bot ready signaling Server
A FastAPI server that manages bot instances and provide endpoint for Pipecat client connections.
## Endpoints
- `POST /connect` - Pipecat client connection endpoint
## Environment Variables
Copy `env.example` to `.env` and configure:
```ini
# Required API Keys
DAILY_API_KEY= # Your Daily API key
CARTESIA_API_KEY= # Your Cartesia API key
# Optional Configuration
DAILY_API_URL= # Optional: Daily API URL (defaults to https://api.daily.co/v1)
DAILY_SAMPLE_ROOM_URL= # Optional: Fixed room URL for development
HOST= # Optional: Host address (defaults to 0.0.0.0)
FAST_API_PORT= # Optional: Port number (defaults to 7860)
```
## Running the Server
Set up and activate your virtual environment:
```bash
python3 -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
```
Install dependencies:
```bash
pip install -r requirements.txt
```
If you want to use the local version of `pipecat` in this repo rather than the last published version, also run:
```bash
pip install --editable "../../../[daily,cartesia,openai]"
```
Run the server:
```bash
python server.py
```

View File

@@ -1,3 +0,0 @@
DAILY_SAMPLE_ROOM_URL=https://yourdomain.daily.co/yourroom # (for joining the bot to the same room repeatedly for local dev)
DAILY_API_KEY=
CARTESIA_API_KEY=

View File

@@ -1,4 +0,0 @@
python-dotenv
fastapi[all]
uvicorn
pipecat-ai[daily,cartesia,openai]

View File

@@ -1,64 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import os
from typing import Optional
import aiohttp
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper
async def configure(aiohttp_session: aiohttp.ClientSession):
(url, token, _) = await configure_with_args(aiohttp_session)
return (url, token)
async def configure_with_args(
aiohttp_session: aiohttp.ClientSession, parser: Optional[argparse.ArgumentParser] = None
):
if not parser:
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument(
"-u", "--url", type=str, required=False, help="URL of the Daily room to join"
)
parser.add_argument(
"-k",
"--apikey",
type=str,
required=False,
help="Daily API Key (needed to create an owner token for the room)",
)
args, unknown = parser.parse_known_args()
url = args.url or os.getenv("DAILY_SAMPLE_ROOM_URL")
key = args.apikey or os.getenv("DAILY_API_KEY")
if not url:
raise Exception(
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL."
)
if not key:
raise Exception(
"No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
)
daily_rest_helper = DailyRESTHelper(
daily_api_key=key,
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
# Create a meeting token for the given room with an expiration 1 hour in
# the future.
expiry_time: float = 60 * 60
token = await daily_rest_helper.get_token(url, expiry_time)
return (url, token, args)

View File

@@ -1,147 +0,0 @@
#
# Copyright (c) 2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import os
import subprocess
from contextlib import asynccontextmanager
from typing import Any, Dict
import aiohttp
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomParams
# Load environment variables from .env file
load_dotenv(override=True)
# Dictionary to track bot processes: {pid: (process, room_url)}
bot_procs = {}
# Store Daily API helpers
daily_helpers = {}
def cleanup():
"""Cleanup function to terminate all bot processes.
Called during server shutdown.
"""
for entry in bot_procs.values():
proc = entry[0]
proc.terminate()
proc.wait()
@asynccontextmanager
async def lifespan(app: FastAPI):
"""FastAPI lifespan manager that handles startup and shutdown tasks.
- Creates aiohttp session
- Initializes Daily API helper
- Cleans up resources on shutdown
"""
aiohttp_session = aiohttp.ClientSession()
daily_helpers["rest"] = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
yield
await aiohttp_session.close()
cleanup()
# Initialize FastAPI app with lifespan manager
app = FastAPI(lifespan=lifespan)
# Configure CORS to allow requests from any origin
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
async def create_room_and_token() -> tuple[str, str]:
"""Helper function to create a Daily room and generate an access token.
Returns:
tuple[str, str]: A tuple containing (room_url, token)
Raises:
HTTPException: If room creation or token generation fails
"""
room = await daily_helpers["rest"].create_room(DailyRoomParams())
if not room.url:
raise HTTPException(status_code=500, detail="Failed to create room")
token = await daily_helpers["rest"].get_token(room.url)
if not token:
raise HTTPException(status_code=500, detail=f"Failed to get token for room: {room.url}")
return room.url, token
@app.post("/connect")
async def bot_connect(request: Request) -> Dict[Any, Any]:
"""Connect endpoint that creates a room and returns connection credentials.
This endpoint is called by client to establish a connection.
Returns:
Dict[Any, Any]: Authentication bundle containing room_url and token
Raises:
HTTPException: If room creation, token generation, or bot startup fails
"""
print("Creating room for RTVI connection")
room_url, token = await create_room_and_token()
print(f"Room URL: {room_url}")
# Start the bot process
try:
bot_file = "signalling_bot"
proc = subprocess.Popen(
[f"python3 -m {bot_file} -u {room_url} -t {token}"],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__)),
)
bot_procs[proc.pid] = (proc, room_url)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}")
# Return the authentication bundle in format expected by DailyTransport
return {"room_url": room_url, "token": token}
if __name__ == "__main__":
import uvicorn
# Parse command line arguments for server configuration
default_host = os.getenv("HOST", "0.0.0.0")
default_port = int(os.getenv("FAST_API_PORT", "7860"))
parser = argparse.ArgumentParser(description="Daily Travel Companion FastAPI server")
parser.add_argument("--host", type=str, default=default_host, help="Host address")
parser.add_argument("--port", type=int, default=default_port, help="Port number")
parser.add_argument("--reload", action="store_true", help="Reload code on change")
config = parser.parse_args()
# Start the FastAPI server
uvicorn.run(
"server:app",
host=config.host,
port=config.port,
reload=config.reload,
)

View File

@@ -1,95 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
from dataclasses import dataclass
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.frames.frames import AudioRawFrame, EndFrame, OutputAudioRawFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
@dataclass
class SilenceFrame(OutputAudioRawFrame):
def __init__(
self,
*,
sample_rate: int,
duration: float,
):
# Initialize the parent class with the silent frame's data
super().__init__(
audio=self.create_silent_audio_frame(sample_rate, 1, duration).audio,
sample_rate=sample_rate,
num_channels=1,
)
@staticmethod
def create_silent_audio_frame(
sample_rate: int, num_channels: int, duration: float
) -> AudioRawFrame:
"""Create an AudioRawFrame containing silence."""
frame_size = num_channels * 2 # 2 bytes per sample for 16-bit audio
total_frames = int(sample_rate * duration)
total_bytes = total_frames * frame_size
silent_audio = bytes(total_bytes) # Create a byte array filled with zeros
return AudioRawFrame(audio=silent_audio, sample_rate=sample_rate, num_channels=num_channels)
async def main():
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url, None, "Say One Thing", DailyParams(audio_out_enabled=True)
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
runner = PipelineRunner()
task = PipelineTask(Pipeline([tts, transport.output()]))
# Register an event handler so we can play the audio when we receive a specific message
@transport.event_handler("on_app_message")
async def on_app_message(transport, message, sender):
logger.debug(f"Received app message: {message} - {sender}")
if "playable" not in message:
return
await task.queue_frames(
[
SilenceFrame(
sample_rate=task.params.audio_out_sample_rate,
duration=0.5,
),
TTSSpeakFrame(f"Hello there, how are you doing today ?"),
EndFrame(),
]
)
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -65,6 +65,7 @@ async def main():
# English
#
voice_id="cgSgspJ2msm6clMCkdW9",
aiohttp_session=session,
#
# Spanish
#
@@ -123,20 +124,17 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await audio_buffer_processor.start_recording()
await transport.capture_participant_transcription(participant["id"])
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
print(f"Participant left: {participant}")
await task.cancel()
await task.queue_frame(EndFrame())
@transport.event_handler("on_call_state_updated")
async def on_call_state_updated(transport, state):
if state == "left":
# Here we don't want to cancel, we just want to finish sending
# whatever is queued, so we use an EndFrame().
await task.queue_frame(EndFrame())
runner = PipelineRunner()

View File

@@ -53,3 +53,4 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
token = await daily_rest_helper.get_token(url, expiry_time)
return (url, token)
return (url, token)

View File

@@ -18,6 +18,7 @@ from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -82,6 +83,7 @@ async def main():
# English
#
voice_id="cgSgspJ2msm6clMCkdW9",
aiohttp_session=session,
#
# Spanish
#
@@ -108,9 +110,8 @@ async def main():
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
# NOTE: Watch out! This will save all the conversation in memory. You
# can pass `buffer_size` to get periodic callbacks.
audiobuffer = AudioBufferProcessor()
# Save audio every 10 seconds.
audiobuffer = AudioBufferProcessor(buffer_size=480000)
pipeline = Pipeline(
[
@@ -132,14 +133,13 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await audiobuffer.start_recording()
await transport.capture_participant_transcription(participant["id"])
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
print(f"Participant left: {participant}")
await task.cancel()
await task.queue_frame(EndFrame())
runner = PipelineRunner()

View File

@@ -79,13 +79,11 @@ async def main(room_url: str, token: str):
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
await task.queue_frame(EndFrame())
@transport.event_handler("on_call_state_updated")
async def on_call_state_updated(transport, state):
if state == "left":
# Here we don't want to cancel, we just want to finish sending
# whatever is queued, so we use an EndFrame().
await task.queue_frame(EndFrame())
runner = PipelineRunner()

View File

@@ -5,15 +5,6 @@ import sys
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
@@ -21,6 +12,16 @@ logger.add(sys.stderr, level="DEBUG")
async def main(room_url: str, token: str):
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
transport = DailyTransport(
room_url,
token,
@@ -78,7 +79,7 @@ async def main(room_url: str, token: str):
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
await task.queue_frame(EndFrame())
runner = PipelineRunner()

View File

@@ -0,0 +1,94 @@
<div align="center">
 <img alt="pipecat" width="300px" height="auto" src="image.png">
</div>
# Dialin example
Example project that demonstrates how to add phone number dialin to your Pipecat bots. We include examples for both Daily (`bot_daily.py`) and Twilio (`bot_twilio.py`), depending on who you want to use as a phone vendor.
- 🔁 Transport: Daily WebRTC
- 💬 Speech-to-Text: Deepgram via Daily transport
- 🤖 LLM: GPT4-o / OpenAI
- 🔉 Text-to-Speech: ElevenLabs
#### Should I use Daily or Twilio as a vendor?
If you're starting from scratch, using Daily to provision phone numbers alongside Daily as a transport offers some convenience (such as automatic call forwarding.)
If you already have Twilio numbers and workflows that you want to connect to your Pipecat bots, there is some additional configuration required (you'll need to create a `on_dialin_ready` and use the Twilio client to trigger the forward.)
You can read more about this, as well as see respective walkthroughs in our docs.
## Setup
```shell
# Install the requirements
pip install -r requirements.txt
# Setup your env
mv env.example .env
```
## Using Daily numbers
Run `bot_runner.py` to handle incoming HTTP requests:
`python bot_runner.py --host localhost`
Then target the following URL:
```bash
curl -X POST 'http://localhost:7860/daily_start_bot' \
-H 'Content-Type: application/json' \
-d '{
"callId": "callId-from-call",
"callDomain": "callDomain-from-call"
}'
```
Use [this guide](https://docs.pipecat.ai/guides/telephony/daily-webrtc) to connect a phone number purchased from Daily to the bot.
For more configuration options, please consult Daily's API documentation.
## Using Twilio numbers
As above, but target the following URL:
`POST /twilio_start_bot`
For more configuration options, please consult Twilio's API documentation.
## Deployment example
A Dockerfile is included in this demo for convenience. Here is an example of how to build and deploy your bot to [fly.io](https://fly.io).
*Please note: This demo spawns agents as subprocesses for convenience / demonstration purposes. You would likely not want to do this in production as it would limit concurrency to available system resources. For more information on how to deploy your bots using VMs, refer to the Pipecat documentation.*
### Build the docker image
`docker build -t tag:project .`
### Launch the fly project
`mv fly.example.toml fly.toml`
`fly launch` (using the included fly.toml)
### Setup your secrets on Fly
Set the necessary secrets (found in `env.example`)
`fly secrets set DAILY_API_KEY=... OPENAI_API_KEY=... ELEVENLABS_API_KEY=... ELEVENLABS_VOICE_ID=...`
If you're using Twilio as a number vendor:
`fly secrets set TWILIO_ACCOUNT_SID=... TWILIO_AUTH_TOKEN=...`
### Deploy!
`fly deploy`
## Need to do something more advanced?
This demo covers the basics of bot telephony. If you want to know more about working with PSTN / SIP, please ping us on [Discord](https://discord.gg/pipecat).

View File

@@ -0,0 +1,195 @@
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import asyncio
import os
import sys
from dotenv import load_dotenv
from loguru import logger
from openai.types.chat import ChatCompletionToolParam
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame, TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.ai_services import LLMService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyDialinSettings, DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
daily_api_key = os.getenv("DAILY_API_KEY", "")
daily_api_url = os.getenv("DAILY_API_URL", "https://api.daily.co/v1")
async def main(room_url: str, token: str, callId: str, callDomain: str):
# diallin_settings are only needed if Daily's SIP URI is used
# If you are handling this via Twilio, Telnyx, set this to None
# and handle call-forwarding when on_dialin_ready fires.
diallin_settings = DailyDialinSettings(call_id=callId, call_domain=callDomain)
transport = DailyTransport(
room_url,
token,
"Chatbot",
DailyParams(
api_url=daily_api_url,
api_key=daily_api_key,
dialin_settings=diallin_settings,
audio_in_enabled=True,
audio_out_enabled=True,
camera_out_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
transcription_enabled=True,
),
)
tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
content = f"""
You are a delivery service customer support specialist supporting customers with their orders.
Begin with: "Hello, this is Hailey from customer support. What can I help you with today?"
"""
messages = [
{
"role": "system",
"content": content,
},
]
tools = [
ChatCompletionToolParam(
type="function",
function={
"name": "transfer_call",
"description": "Transfer the call to a person. This function is used to connect the call to a real person. Examples of real people are: managers, supervisors, or other customer support specialists. Any person is okay as long as they are not a bot.",
"parameters": {
"type": "object",
"properties": {
"call_id": {
"type": "string",
"description": "This is always {callId}.",
},
"summary": {
"type": "string",
"description": """
Provide a concise summary in 3-5 sentences. Highlight any important details or unusual aspects of the conversation.
""",
},
},
},
},
)
]
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
async def default_transfer_call(
function_name, tool_call_id, args, llm: LLMService, context, result_callback
):
logger.debug(f"default_transfer_call: {function_name} {tool_call_id} {args}")
await result_callback(
{
"transfer_call": False,
"reason": "To transfer call calls, please dial in to the room using a phone or a SIP client.",
}
)
llm.register_function(
function_name="transfer_call",
callback=default_transfer_call,
)
pipeline = Pipeline(
[
transport.input(),
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
@transport.event_handler("on_dialin_ready")
async def on_dialin_ready(_, sip_endpoint):
logger.info(f"on_dialin_ready: {sip_endpoint}")
@transport.event_handler("on_dialin_connected")
async def on_dialin_connected(transport, event):
logger.info(f"on_dialin_connected: {event}")
sip_session_id = event["sessionId"]
async def transfer_call(
function_name, tool_call_id, args, llm: LLMService, context, result_callback
):
logger.debug(f"transfer_call: {function_name} {tool_call_id} {args}")
# sip_url = "sip:your_user_name@sip.linphone.org"
sip_url = (
f"sip:your_username@dailyco.sip.twilio.com?x-daily_id={room_url.split('/')[-1]}"
)
try:
await transport.sip_refer(
settings={
"sessionId": sip_session_id,
"toEndPoint": sip_url,
}
)
except Exception as e:
logger.error(f"An error occurred during SIP refer: {e}")
await result_callback({"transfer_call": False})
await result_callback({"transfer_call": True})
llm.register_function(
function_name="transfer_call",
callback=transfer_call,
)
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Pipecat Simple ChatBot")
parser.add_argument("-u", type=str, help="Room URL")
parser.add_argument("-t", type=str, help="Token")
parser.add_argument("-i", type=str, help="Call ID")
parser.add_argument("-d", type=str, help="Call Domain")
config = parser.parse_args()
asyncio.run(main(config.u, config.t, config.i, config.d))

View File

@@ -73,29 +73,24 @@ action using the Twilio Client library.
"""
async def _create_daily_room(
room_url, callId, callDomain=None, dialoutNumber=None, vendor="daily", detect_voicemail=False
):
async def _create_daily_room(room_url, callId, callDomain=None, vendor="daily"):
if not room_url:
# Create base properties with SIP settings
properties = DailyRoomProperties(
sip=DailyRoomSipParams(
display_name="dialin-user", video=False, sip_mode="dial-in", num_endpoints=1
params = DailyRoomParams(
properties=DailyRoomProperties(
# Note: these are the default values, except for the display name
sip=DailyRoomSipParams(
display_name="dialin-user", video=False, sip_mode="dial-in", num_endpoints=1
)
)
)
# Only enable dialout if dialoutNumber is provided
if dialoutNumber:
properties.enable_dialout = True
params = DailyRoomParams(properties=properties)
print(f"Creating new room...")
room: DailyRoomObject = await daily_helpers["rest"].create_room(params=params)
else:
# Check passed room URL exist (we assume that it already has a sip set up!)
try:
print(f"Joining existing room: {room_url}")
room: DailyRoomObject = await daily_helpers["rest"].get_room_from_url(room_url)
except Exception:
raise HTTPException(status_code=500, detail=f"Room not found: {room_url}")
@@ -111,9 +106,7 @@ async def _create_daily_room(
# Spawn a new agent, and join the user session
# Note: this is mostly for demonstration purposes (refer to 'deployment' in docs)
if vendor == "daily":
bot_proc = f"python3 -m bot_daily -u {room.url} -t {token} -i {callId} -d {callDomain}{' -v' if detect_voicemail else ''}"
if dialoutNumber:
bot_proc += f" -o {dialoutNumber}"
bot_proc = f"python3 -m bot_daily -u {room.url} -t {token} -i {callId} -d {callDomain}"
else:
bot_proc = f"python3 -m bot_twilio -u {room.url} -t {token} -i {callId} -s {room.config.sip_endpoint}"
@@ -184,18 +177,13 @@ async def daily_start_bot(request: Request) -> JSONResponse:
if "test" in data:
# Pass through any webhook checks
return JSONResponse({"test": True})
detect_voicemail = data.get("detectVoicemail", False)
callId = data.get("callId", None)
callDomain = data.get("callDomain", None)
dialoutNumber = data.get("dialoutNumber", None)
except Exception:
raise HTTPException(
status_code=500, detail="Missing properties 'callId', 'callDomain', or 'dialoutNumber'"
)
raise HTTPException(status_code=500, detail="Missing properties 'callId' or 'callDomain'")
room: DailyRoomObject = await _create_daily_room(
room_url, callId, callDomain, dialoutNumber, "daily", detect_voicemail
)
print(f"CallId: {callId}, CallDomain: {callDomain}")
room: DailyRoomObject = await _create_daily_room(room_url, callId, callDomain, "daily")
# Grab a token for the user to join with
return JSONResponse({"room_url": room.url, "sipUri": room.config.sip_endpoint})

View File

@@ -8,6 +8,7 @@ from loguru import logger
from twilio.rest import Client
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -86,7 +87,7 @@ async def main(room_url: str, token: str, callId: str, sipUri: str):
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
await task.queue_frame(EndFrame())
@transport.event_handler("on_dialin_ready")
async def on_dialin_ready(transport, cdata):

View File

Before

Width:  |  Height:  |  Size: 19 KiB

After

Width:  |  Height:  |  Size: 19 KiB

View File

@@ -16,7 +16,8 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.transports.local.audio import LocalAudioTransport, LocalAudioTransportParams
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.local.audio import LocalAudioTransport
load_dotenv(override=True)
@@ -25,7 +26,7 @@ logger.add(sys.stderr, level="DEBUG")
async def main():
transport = LocalAudioTransport(LocalAudioTransportParams(audio_out_enabled=True))
transport = LocalAudioTransport(TransportParams(audio_out_enabled=True))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
@@ -40,7 +41,7 @@ async def main():
await asyncio.sleep(1)
await task.queue_frames([TTSSpeakFrame("Hello there, how is it going!"), EndFrame()])
runner = PipelineRunner(handle_sigint=False if sys.platform == "win32" else True)
runner = PipelineRunner()
await asyncio.gather(runner.run(task), say_something())

View File

@@ -13,7 +13,7 @@ from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.frames.frames import TextFrame
from pipecat.frames.frames import EndFrame, TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
@@ -53,7 +53,7 @@ async def main():
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
await task.queue_frame(EndFrame())
await runner.run(task)

View File

@@ -1,64 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.frames.frames import EndFrame, TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.services.google import GoogleImageGenService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url,
None,
"Show a still frame image",
DailyParams(camera_out_enabled=True, camera_out_width=1024, camera_out_height=1024),
)
imagegen = GoogleImageGenService(
api_key=os.getenv("GOOGLE_API_KEY"),
)
runner = PipelineRunner()
task = PipelineTask(
Pipeline([imagegen, transport.output()]), PipelineParams(enable_metrics=True)
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await task.queue_frame(TextFrame("a cat in the style of picasso"))
await task.queue_frame(TextFrame("a dog in the style of picasso"))
await task.queue_frame(TextFrame("a fish in the style of picasso"))
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -51,6 +51,7 @@ async def main():
)
elevenlabs_tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)

View File

@@ -14,7 +14,7 @@ from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import Frame, MetricsFrame
from pipecat.frames.frames import EndFrame, Frame, MetricsFrame
from pipecat.metrics.metrics import (
LLMUsageMetricsData,
ProcessingMetricsData,
@@ -38,8 +38,6 @@ logger.add(sys.stderr, level="DEBUG")
class MetricsLogger(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, MetricsFrame):
for d in frame.data:
if isinstance(d, TTFBMetricsData):
@@ -117,7 +115,7 @@ async def main():
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
await task.queue_frame(EndFrame())
runner = PipelineRunner()

View File

@@ -15,19 +15,13 @@ from PIL import Image
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
Frame,
OutputImageRawFrame,
TextFrame,
)
from pipecat.frames.frames import EndFrame, Frame, OutputImageRawFrame, SystemFrame, TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.cartesia import CartesiaHttpTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -51,7 +45,7 @@ class ImageSyncAggregator(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, BotStartedSpeakingFrame):
if not isinstance(frame, SystemFrame) and direction == FrameDirection.DOWNSTREAM:
await self.push_frame(
OutputImageRawFrame(
image=self._speaking_image_bytes,
@@ -59,8 +53,7 @@ class ImageSyncAggregator(FrameProcessor):
format=self._speaking_image_format,
)
)
elif isinstance(frame, BotStoppedSpeakingFrame):
await self.push_frame(frame)
await self.push_frame(
OutputImageRawFrame(
image=self._waiting_image_bytes,
@@ -68,8 +61,8 @@ class ImageSyncAggregator(FrameProcessor):
format=self._waiting_image_format,
)
)
await self.push_frame(frame)
else:
await self.push_frame(frame)
async def main():
@@ -91,7 +84,7 @@ async def main():
),
)
tts = CartesiaTTSService(
tts = CartesiaHttpTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
@@ -116,24 +109,16 @@ async def main():
pipeline = Pipeline(
[
transport.input(),
image_sync_aggregator,
context_aggregator.user(),
llm,
tts,
image_sync_aggregator,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
task = PipelineTask(pipeline)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
@@ -143,7 +128,7 @@ async def main():
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
await task.queue_frame(EndFrame())
runner = PipelineRunner()

View File

@@ -13,6 +13,7 @@ from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -93,7 +94,7 @@ async def main():
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
await task.queue_frame(EndFrame())
runner = PipelineRunner()

View File

@@ -14,6 +14,7 @@ from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -91,7 +92,7 @@ async def main():
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
await task.queue_frame(EndFrame())
runner = PipelineRunner()

View File

@@ -14,6 +14,7 @@ from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -95,7 +96,7 @@ async def main():
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
await task.queue_frame(EndFrame())
runner = PipelineRunner()

View File

@@ -19,7 +19,7 @@ from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.frames.frames import EndFrame, LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -124,7 +124,7 @@ async def main():
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
await task.queue_frame(EndFrame())
runner = PipelineRunner()

View File

@@ -16,6 +16,7 @@ from runner import configure
from pipecat.frames.frames import (
BotInterruptionFrame,
EndFrame,
StopInterruptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
@@ -105,7 +106,7 @@ async def main():
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
await task.queue_frame(EndFrame())
runner = PipelineRunner()

View File

@@ -9,12 +9,12 @@ import os
import sys
import aiohttp
from deepgram import LiveOptions
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -45,23 +45,7 @@ async def main():
),
)
# stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
stt = DeepgramSTTService(
api_key=os.getenv("DEEPGRAM_API_KEY"),
# url=deepgram_url,
live_options=LiveOptions(
encoding="linear16",
language="en-US",
model="nova-3",
channels=1,
interim_results=True,
# smart_format=smart_format,
# endpointing=endpointing,
vad_events=True,
diarize=True,
filler_words=True,
),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-helios-en")
@@ -107,7 +91,7 @@ async def main():
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
await task.queue_frame(EndFrame())
runner = PipelineRunner()

View File

@@ -14,6 +14,7 @@ from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -91,7 +92,7 @@ async def main():
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
await task.queue_frame(EndFrame())
runner = PipelineRunner()

View File

@@ -14,12 +14,14 @@ from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai import OpenAILLMService
from pipecat.services.playht import PlayHTHttpTTSService
from pipecat.transcriptions.language import Language
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
@@ -92,7 +94,7 @@ async def main():
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
await task.queue_frame(EndFrame())
runner = PipelineRunner()

View File

@@ -14,6 +14,7 @@ from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -94,7 +95,7 @@ async def main():
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
await task.queue_frame(EndFrame())
runner = PipelineRunner()

View File

@@ -14,6 +14,7 @@ from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -100,7 +101,7 @@ async def main():
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
await task.queue_frame(EndFrame())
runner = PipelineRunner()

View File

@@ -14,11 +14,12 @@ from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai import OpenAILLMService, OpenAISTTService, OpenAITTSService
from pipecat.services.openai import OpenAILLMService, OpenAITTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
@@ -38,21 +39,12 @@ async def main():
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=24000,
transcription_enabled=False,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
)
# You can use the OpenAI compatible API like Groq.
# stt = OpenAISTTService(
# base_url="https://api.groq.com/openai/v1",
# api_key="gsk_***",
# model="whisper-large-v3",
# )
stt = OpenAISTTService(api_key=os.getenv("OPENAI_API_KEY"), model="whisper-1")
tts = OpenAITTSService(api_key=os.getenv("OPENAI_API_KEY"), voice="alloy")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
@@ -70,7 +62,6 @@ async def main():
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
@@ -98,7 +89,7 @@ async def main():
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
await task.queue_frame(EndFrame())
runner = PipelineRunner()

View File

@@ -15,6 +15,7 @@ from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -98,7 +99,7 @@ async def main():
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
await task.queue_frame(EndFrame())
runner = PipelineRunner()

View File

@@ -14,6 +14,7 @@ from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -92,7 +93,7 @@ async def main():
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
await task.queue_frame(EndFrame())
runner = PipelineRunner()

View File

@@ -14,6 +14,7 @@ from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -98,7 +99,7 @@ async def main():
# Register an event handler to exit the application when the user leaves.
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
await task.queue_frame(EndFrame())
runner = PipelineRunner()

View File

@@ -14,6 +14,7 @@ from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -38,6 +39,7 @@ async def main():
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=24000,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
@@ -88,7 +90,7 @@ async def main():
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
await task.queue_frame(EndFrame())
runner = PipelineRunner()

View File

@@ -14,6 +14,7 @@ from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -104,7 +105,7 @@ async def main():
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
await task.queue_frame(EndFrame())
runner = PipelineRunner()

View File

@@ -14,6 +14,7 @@ from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -98,7 +99,7 @@ async def main():
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
await task.queue_frame(EndFrame())
runner = PipelineRunner()

View File

@@ -14,11 +14,14 @@ from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.google import GoogleLLMService, GoogleSTTService, GoogleTTSService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.google import GoogleTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transcriptions.language import Language
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -38,22 +41,21 @@ async def main():
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=24000,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
)
stt = GoogleSTTService(
params=GoogleSTTService.InputParams(languages=Language.EN_US),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = GoogleTTSService(
voice_id="en-US-Journey-F",
params=GoogleTTSService.InputParams(language=Language.EN_US),
)
llm = GoogleLLMService(api_key=os.getenv("GOOGLE_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
@@ -96,7 +98,7 @@ async def main():
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
await task.queue_frame(EndFrame())
runner = PipelineRunner()

View File

@@ -14,6 +14,7 @@ from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -97,7 +98,7 @@ async def main():
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
await task.queue_frame(EndFrame())
runner = PipelineRunner()

View File

@@ -14,6 +14,7 @@ from loguru import logger
from runner import configure
from pipecat.audio.filters.krisp_filter import KrispFilter
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -92,7 +93,7 @@ async def main():
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
await task.queue_frame(EndFrame())
runner = PipelineRunner()

View File

@@ -14,12 +14,13 @@ from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai import OpenAILLMService
from pipecat.services.rime import RimeTTSService
from pipecat.services.rime import RimeHttpTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
@@ -44,9 +45,10 @@ async def main():
),
)
tts = RimeTTSService(
tts = RimeHttpTTSService(
api_key=os.getenv("RIME_API_KEY", ""),
voice_id="rex",
params=RimeHttpTTSService.InputParams(reduce_latency=True),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
@@ -91,7 +93,7 @@ async def main():
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
await task.queue_frame(EndFrame())
runner = PipelineRunner()

View File

@@ -14,6 +14,7 @@ from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -84,7 +85,7 @@ async def main():
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
await task.queue_frame(EndFrame())
runner = PipelineRunner()

View File

@@ -17,6 +17,7 @@ from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
EndFrame,
Frame,
InputAudioRawFrame,
LLMFullResponseEndFrame,
@@ -216,7 +217,11 @@ async def main():
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = GoogleLLMService(api_key=os.getenv("GOOGLE_API_KEY"), model="gemini-2.0-flash-001")
llm = GoogleLLMService(
model="gemini-1.5-flash-latest",
# model="gemini-exp-1114",
api_key=os.getenv("GOOGLE_API_KEY"),
)
messages = [
{
@@ -266,7 +271,7 @@ async def main():
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
await task.queue_frame(EndFrame())
runner = PipelineRunner()

View File

@@ -14,6 +14,7 @@ from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -91,7 +92,7 @@ async def main():
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
await task.queue_frame(EndFrame())
runner = PipelineRunner()

View File

@@ -48,6 +48,7 @@ async def main():
region=os.getenv("AZURE_SPEECH_REGION"),
)
tts2 = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id="jBpfuIE2acCO8z3wKNLl",
)

View File

@@ -21,7 +21,7 @@ from pipecat.frames.frames import (
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -61,6 +61,7 @@ async def main():
"Test",
DailyParams(
audio_in_enabled=True,
audio_in_sample_rate=24000,
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_is_live=True,
@@ -77,9 +78,7 @@ async def main():
runner = PipelineRunner()
task = PipelineTask(
pipeline, PipelineParams(audio_in_sample_rate=24000, audio_out_sample_rate=24000)
)
task = PipelineTask(pipeline)
await runner.run(task)

View File

@@ -22,7 +22,7 @@ from pipecat.frames.frames import (
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.local.tk import TkLocalTransport
@@ -62,7 +62,7 @@ async def main():
tk_root.title("Local Mirror")
daily_transport = DailyTransport(
room_url, token, "Test", DailyParams(audio_in_enabled=True)
room_url, token, "Test", DailyParams(audio_in_enabled=True, audio_in_sample_rate=24000)
)
tk_transport = TkLocalTransport(
@@ -82,9 +82,7 @@ async def main():
pipeline = Pipeline([daily_transport.input(), MirrorProcessor(), tk_transport.output()])
task = PipelineTask(
pipeline, PipelineParams(audio_in_sample_rate=24000, audio_out_sample_rate=24000)
)
task = PipelineTask(pipeline)
async def run_tk():
while not task.has_finished():

View File

@@ -7,7 +7,6 @@
import asyncio
import os
import sys
from typing import Optional
import aiohttp
from dotenv import load_dotenv
@@ -33,7 +32,7 @@ logger.add(sys.stderr, level="DEBUG")
class UserImageRequester(FrameProcessor):
def __init__(self, participant_id: Optional[str] = None):
def __init__(self, participant_id: str | None = None):
super().__init__()
self._participant_id = participant_id

View File

@@ -7,7 +7,6 @@
import asyncio
import os
import sys
from typing import Optional
import aiohttp
from dotenv import load_dotenv
@@ -33,7 +32,7 @@ logger.add(sys.stderr, level="DEBUG")
class UserImageRequester(FrameProcessor):
def __init__(self, participant_id: Optional[str] = None):
def __init__(self, participant_id: str | None = None):
super().__init__()
self._participant_id = participant_id
@@ -73,7 +72,9 @@ async def main():
vision_aggregator = VisionImageFrameAggregator()
google = GoogleLLMService(model="gemini-2.0-flash-001", api_key=os.getenv("GOOGLE_API_KEY"))
google = GoogleLLMService(
model="gemini-1.5-flash-latest", api_key=os.getenv("GOOGLE_API_KEY")
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),

View File

@@ -7,7 +7,6 @@
import asyncio
import os
import sys
from typing import Optional
import aiohttp
from dotenv import load_dotenv
@@ -33,7 +32,7 @@ logger.add(sys.stderr, level="DEBUG")
class UserImageRequester(FrameProcessor):
def __init__(self, participant_id: Optional[str] = None):
def __init__(self, participant_id: str | None = None):
super().__init__()
self._participant_id = participant_id

View File

@@ -7,7 +7,6 @@
import asyncio
import os
import sys
from typing import Optional
import aiohttp
from dotenv import load_dotenv
@@ -33,7 +32,7 @@ logger.add(sys.stderr, level="DEBUG")
class UserImageRequester(FrameProcessor):
def __init__(self, participant_id: Optional[str] = None):
def __init__(self, participant_id: str | None = None):
super().__init__()
self._participant_id = participant_id

View File

@@ -16,7 +16,8 @@ from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.whisper import WhisperSTTService
from pipecat.transports.local.audio import LocalAudioTransport, LocalAudioTransportParams
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.local.audio import LocalAudioTransport
load_dotenv(override=True)
@@ -33,7 +34,7 @@ class TranscriptionLogger(FrameProcessor):
async def main():
transport = LocalAudioTransport(LocalAudioTransportParams(audio_in_enabled=True))
transport = LocalAudioTransport(TransportParams(audio_in_enabled=True))
stt = WhisperSTTService()
@@ -43,7 +44,7 @@ async def main():
task = PipelineTask(pipeline)
runner = PipelineRunner(handle_sigint=False if sys.platform == "win32" else True)
runner = PipelineRunner()
await runner.run(task)

View File

@@ -15,7 +15,6 @@ from openai.types.chat import ChatCompletionToolParam
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -30,8 +29,11 @@ logger.add(sys.stderr, level="DEBUG")
async def start_fetch_weather(function_name, llm, context):
"""Push a frame to the LLM; this is handy when the LLM response might take a while."""
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
# note: we can't push a frame to the LLM here. the bot
# can interrupt itself and/or cause audio overlapping glitches.
# possible question for Aleix and Chad about what the right way
# to trigger speech is, now, with the new queues/async/sync refactors.
# await llm.push_frame(TextFrame("Let me check on that."))
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")

View File

@@ -15,7 +15,6 @@ from openai.types.chat import ChatCompletionToolParam
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
@@ -31,8 +30,11 @@ logger.add(sys.stderr, level="DEBUG")
async def start_fetch_weather(function_name, llm, context):
"""Push a frame to the LLM; this is handy when the LLM response might take a while."""
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
# note: we can't push a frame to the LLM here. the bot
# can interrupt itself and/or cause audio overlapping glitches.
# possible question for Aleix and Chad about what the right way
# to trigger speech is, now, with the new queues/async/sync refactors.
# await llm.push_frame(TextFrame("Let me check on that."))
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")

View File

@@ -62,7 +62,11 @@ async def main():
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = GoogleLLMService(api_key=os.getenv("GOOGLE_API_KEY"), model="gemini-2.0-flash-001")
llm = GoogleLLMService(
model="gemini-1.5-flash-latest",
# model="gemini-exp-1114",
api_key=os.getenv("GOOGLE_API_KEY"),
)
llm.register_function("get_weather", get_weather)
llm.register_function("get_image", get_image)

View File

@@ -15,12 +15,11 @@ from openai.types.chat import ChatCompletionToolParam
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.groq import GroqLLMService, GroqSTTService
from pipecat.services.groq import GroqLLMService
from pipecat.services.openai import OpenAILLMContext
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -31,8 +30,11 @@ logger.add(sys.stderr, level="DEBUG")
async def start_fetch_weather(function_name, llm, context):
"""Push a frame to the LLM; this is handy when the LLM response might take a while."""
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
# note: we can't push a frame to the LLM here. the bot
# can interrupt itself and/or cause audio overlapping glitches.
# possible question for Aleix and Chad about what the right way
# to trigger speech is, now, with the new queues/async/sync refactors.
# await llm.push_frame(TextFrame("Let me check on that."))
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
@@ -50,20 +52,20 @@ async def main():
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
)
stt = GroqSTTService(api_key=os.getenv("GROQ_API_KEY"), model="distil-whisper-large-v3-en")
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = GroqLLMService(api_key=os.getenv("GROQ_API_KEY"), model="llama-3.3-70b-versatile")
llm = GroqLLMService(
api_key=os.getenv("GROQ_API_KEY"), model="llama3-groq-70b-8192-tool-use-preview"
)
# Register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.
llm.register_function(None, fetch_weather_from_api, start_callback=start_fetch_weather)
@@ -105,7 +107,6 @@ async def main():
pipeline = Pipeline(
[
transport.input(),
stt,
context_aggregator.user(),
llm,
tts,

View File

@@ -15,7 +15,6 @@ from openai.types.chat import ChatCompletionToolParam
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -31,8 +30,11 @@ logger.add(sys.stderr, level="DEBUG")
async def start_fetch_weather(function_name, llm, context):
"""Push a frame to the LLM; this is handy when the LLM response might take a while."""
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
# note: we can't push a frame to the LLM here. the bot
# can interrupt itself and/or cause audio overlapping glitches.
# possible question for Aleix and Chad about what the right way
# to trigger speech is, now, with the new queues/async/sync refactors.
# await llm.push_frame(TextFrame("Let me check on that."))
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")

View File

@@ -15,7 +15,6 @@ from openai.types.chat import ChatCompletionToolParam
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -31,8 +30,11 @@ logger.add(sys.stderr, level="DEBUG")
async def start_fetch_weather(function_name, llm, context):
"""Push a frame to the LLM; this is handy when the LLM response might take a while."""
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
# note: we can't push a frame to the LLM here. the bot
# can interrupt itself and/or cause audio overlapping glitches.
# possible question for Aleix and Chad about what the right way
# to trigger speech is, now, with the new queues/async/sync refactors.
# await llm.push_frame(TextFrame("Let me check on that."))
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")

View File

@@ -15,7 +15,6 @@ from openai.types.chat import ChatCompletionToolParam
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -31,8 +30,11 @@ logger.add(sys.stderr, level="DEBUG")
async def start_fetch_weather(function_name, llm, context):
"""Push a frame to the LLM; this is handy when the LLM response might take a while."""
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
# note: we can't push a frame to the LLM here. the bot
# can interrupt itself and/or cause audio overlapping glitches.
# possible question for Aleix and Chad about what the right way
# to trigger speech is, now, with the new queues/async/sync refactors.
# await llm.push_frame(TextFrame("Let me check on that."))
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")

View File

@@ -15,7 +15,6 @@ from openai.types.chat import ChatCompletionToolParam
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -31,8 +30,11 @@ logger.add(sys.stderr, level="DEBUG")
async def start_fetch_weather(function_name, llm, context):
"""Push a frame to the LLM; this is handy when the LLM response might take a while."""
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
# note: we can't push a frame to the LLM here. the bot
# can interrupt itself and/or cause audio overlapping glitches.
# possible question for Aleix and Chad about what the right way
# to trigger speech is, now, with the new queues/async/sync refactors.
# await llm.push_frame(TextFrame("Let me check on that."))
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")

View File

@@ -15,7 +15,6 @@ from openai.types.chat import ChatCompletionToolParam
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -31,8 +30,11 @@ logger.add(sys.stderr, level="DEBUG")
async def start_fetch_weather(function_name, llm, context):
"""Push a frame to the LLM; this is handy when the LLM response might take a while."""
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
# note: we can't push a frame to the LLM here. the bot
# can interrupt itself and/or cause audio overlapping glitches.
# possible question for Aleix and Chad about what the right way
# to trigger speech is, now, with the new queues/async/sync refactors.
# await llm.push_frame(TextFrame("Let me check on that."))
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
@@ -93,7 +95,7 @@ async def main():
messages = [
{
"role": "system",
"content": """You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way.
"content": """You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way.
You have one functions available:

View File

@@ -15,7 +15,6 @@ from openai.types.chat import ChatCompletionToolParam
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -31,8 +30,11 @@ logger.add(sys.stderr, level="DEBUG")
async def start_fetch_weather(function_name, llm, context):
"""Push a frame to the LLM; this is handy when the LLM response might take a while."""
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
# note: we can't push a frame to the LLM here. the bot
# can interrupt itself and/or cause audio overlapping glitches.
# possible question for Aleix and Chad about what the right way
# to trigger speech is, now, with the new queues/async/sync refactors.
# await llm.push_frame(TextFrame("Let me check on that."))
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
@@ -93,7 +95,7 @@ async def main():
messages = [
{
"role": "system",
"content": """You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way.
"content": """You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way.
You have one functions available:

View File

@@ -15,7 +15,6 @@ from openai.types.chat import ChatCompletionToolParam
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -31,8 +30,11 @@ logger.add(sys.stderr, level="DEBUG")
async def start_fetch_weather(function_name, llm, context):
"""Push a frame to the LLM; this is handy when the LLM response might take a while."""
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
# note: we can't push a frame to the LLM here. the bot
# can interrupt itself and/or cause audio overlapping glitches.
# possible question for Aleix and Chad about what the right way
# to trigger speech is, now, with the new queues/async/sync refactors.
# await llm.push_frame(TextFrame("Let me check on that."))
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")

View File

@@ -1,106 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""This example demonstrates using the Perplexity API as a drop-in replacement for OpenAI.
Note that while this file is in the function-calling examples, Perplexity's API does not
currently support function calling. The example shows basic chat completion functionality
using Perplexity's API while maintaining compatibility with the OpenAI interface.
"""
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from openai.types.chat import ChatCompletionToolParam
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMContext, OpenAILLMService
from pipecat.services.perplexity import PerplexityLLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = PerplexityLLMService(api_key=os.getenv("PERPLEXITY_API_KEY"), model="sonar")
messages = [
{
"role": "user",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(),
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -14,7 +14,7 @@ from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame, LLMMessagesFrame, TTSSpeakFrame
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -63,36 +63,16 @@ async def main():
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
async def handle_user_idle(user_idle: UserIdleProcessor, retry_count: int) -> bool:
if retry_count == 1:
# First attempt: Add a gentle prompt to the conversation
messages.append(
{
"role": "system",
"content": "The user has been quiet. Politely and briefly ask if they're still there.",
}
)
await user_idle.push_frame(LLMMessagesFrame(messages))
return True
elif retry_count == 2:
# Second attempt: More direct prompt
messages.append(
{
"role": "system",
"content": "The user is still inactive. Ask if they'd like to continue our conversation.",
}
)
await user_idle.push_frame(LLMMessagesFrame(messages))
return True
else:
# Third attempt: End the conversation
await user_idle.push_frame(
TTSSpeakFrame("It seems like you're busy right now. Have a nice day!")
)
await task.queue_frame(EndFrame())
return False
async def user_idle_callback(user_idle: UserIdleProcessor):
messages.append(
{
"role": "system",
"content": "Ask the user if they are still there and try to prompt for some input, but be short.",
}
)
await user_idle.push_frame(LLMMessagesFrame(messages))
user_idle = UserIdleProcessor(callback=handle_user_idle, timeout=5.0)
user_idle = UserIdleProcessor(callback=user_idle_callback, timeout=5.0)
pipeline = Pipeline(
[

View File

@@ -51,6 +51,8 @@ async def main():
out_params=GStreamerPipelineSource.OutputParams(
video_width=1280,
video_height=720,
audio_sample_rate=24000,
audio_channels=1,
),
)

View File

@@ -80,7 +80,9 @@ async def main():
"Respond bot",
DailyParams(
audio_in_enabled=True,
audio_in_sample_rate=24000,
audio_out_enabled=True,
audio_out_sample_rate=24000,
transcription_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.8)),

View File

@@ -177,7 +177,9 @@ async def main():
"Respond bot",
DailyParams(
audio_in_enabled=True,
audio_in_sample_rate=24000,
audio_out_enabled=True,
audio_out_sample_rate=24000,
transcription_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.8)),

View File

@@ -237,7 +237,7 @@ async def main():
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = GoogleLLMService(model="gemini-2.0-flash-001", api_key=os.getenv("GOOGLE_API_KEY"))
llm = GoogleLLMService(model="gemini-1.5-flash-latest", api_key=os.getenv("GOOGLE_API_KEY"))
# you can either register a single function for all function calls, or specific functions
# llm.register_function(None, fetch_weather_from_api)

View File

@@ -14,6 +14,7 @@ from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -88,10 +89,6 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
# We just use 16000 because that's what Tavus is expecting and
# we avoid resampling.
audio_in_sample_rate=16000,
audio_out_sample_rate=16000,
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
@@ -123,7 +120,7 @@ async def main():
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
await task.queue_frame(EndFrame())
runner = PipelineRunner()

View File

@@ -104,11 +104,8 @@ async def main():
)
# This processor keeps the last context and will let it through once the
# notifier is woken up. We start with the gate open because we send an
# initial context frame to start the conversation.
gated_context_aggregator = GatedOpenAILLMContextAggregator(
notifier=notifier, start_open=True
)
# notifier is woken up.
gated_context_aggregator = GatedOpenAILLMContextAggregator(notifier)
# Notify if the user hasn't said anything.
async def user_idle_notifier(frame):

View File

@@ -12,7 +12,6 @@ import time
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from openai.types.chat import ChatCompletionToolParam
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
@@ -20,8 +19,6 @@ from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
FunctionCallInProgressFrame,
FunctionCallResultFrame,
LLMMessagesFrame,
StartFrame,
StartInterruptionFrame,
@@ -29,7 +26,6 @@ from pipecat.frames.frames import (
SystemFrame,
TextFrame,
TranscriptionFrame,
TTSSpeakFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
@@ -121,21 +117,18 @@ class CompletenessCheck(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TextFrame) and frame.text == "YES":
logger.debug("Completeness check YES")
await self.push_frame(UserStoppedSpeakingFrame())
await self._notifier.notify()
elif isinstance(frame, TextFrame) and frame.text == "NO":
logger.debug("Completeness check NO")
else:
await self.push_frame(frame, direction)
class OutputGate(FrameProcessor):
def __init__(self, *, notifier: BaseNotifier, start_open: bool = False, **kwargs):
def __init__(self, notifier: BaseNotifier, **kwargs):
super().__init__(**kwargs)
self._gate_open = start_open
self._gate_open = False
self._frames_buffer = []
self._notifier = notifier
@@ -160,11 +153,6 @@ class OutputGate(FrameProcessor):
await self.push_frame(frame, direction)
return
# Don't block function call frames
if isinstance(frame, (FunctionCallInProgressFrame, FunctionCallResultFrame)):
await self.push_frame(frame, direction)
return
# Ignore frames that are not following the direction of this gate.
if direction != FrameDirection.DOWNSTREAM:
await self.push_frame(frame, direction)
@@ -178,10 +166,11 @@ class OutputGate(FrameProcessor):
async def _start(self):
self._frames_buffer = []
self._gate_task = self.create_task(self._gate_task_handler())
self._gate_task = self.get_event_loop().create_task(self._gate_task_handler())
async def _stop(self):
await self.cancel_task(self._gate_task)
self._gate_task.cancel()
await self._gate_task
async def _gate_task_handler(self):
while True:
@@ -195,16 +184,6 @@ class OutputGate(FrameProcessor):
break
async def start_fetch_weather(function_name, llm, context):
"""Push a frame to the LLM; this is handy when the LLM response might take a while."""
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
await result_callback({"conditions": "nice", "temperature": "75"})
async def main():
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
@@ -235,34 +214,6 @@ async def main():
# This is the regular LLM.
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
# Register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.
llm.register_function(None, fetch_weather_from_api, start_callback=start_fetch_weather)
tools = [
ChatCompletionToolParam(
type="function",
function={
"name": "get_current_weather",
"description": "Get the current weather",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": ["location", "format"],
},
},
)
]
messages = [
{
@@ -271,7 +222,7 @@ async def main():
},
]
context = OpenAILLMContext(messages, tools)
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
# We have instructed the LLM to return 'YES' if it thinks the user
@@ -299,9 +250,7 @@ async def main():
# sentence, this will wake up the notifier if that happens.
user_idle = UserIdleProcessor(callback=user_idle_notifier, timeout=5.0)
# We start with the gate open because we send an initial context frame
# to start the conversation.
bot_output_gate = OutputGate(notifier=notifier, start_open=True)
bot_output_gate = OutputGate(notifier=notifier)
async def block_user_stopped_speaking(frame):
return not isinstance(frame, UserStoppedSpeakingFrame)
@@ -312,8 +261,6 @@ async def main():
or isinstance(frame, LLMMessagesFrame)
or isinstance(frame, StartInterruptionFrame)
or isinstance(frame, StopInterruptionFrame)
or isinstance(frame, FunctionCallInProgressFrame)
or isinstance(frame, FunctionCallResultFrame)
)
pipeline = Pipeline(

View File

@@ -12,7 +12,6 @@ import time
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from openai.types.chat import ChatCompletionToolParam
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
@@ -20,8 +19,6 @@ from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
FunctionCallInProgressFrame,
FunctionCallResultFrame,
LLMMessagesFrame,
StartFrame,
StartInterruptionFrame,
@@ -29,7 +26,6 @@ from pipecat.frames.frames import (
SystemFrame,
TextFrame,
TranscriptionFrame,
TTSSpeakFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
@@ -105,12 +101,12 @@ HIGH PRIORITY SIGNALS:
Examples:
# Complete Wh-question
[{"role": "assistant", "content": "I can help you learn."},
[{"role": "assistant", "content": "I can help you learn."},
{"role": "user", "content": "What's the fastest way to learn Spanish"}]
Output: YES
# Complete Yes/No question despite STT error
[{"role": "assistant", "content": "I know about planets."},
[{"role": "assistant", "content": "I know about planets."},
{"role": "user", "content": "Is is Jupiter the biggest planet"}]
Output: YES
@@ -122,12 +118,12 @@ Output: YES
Examples:
# Direct instruction
[{"role": "assistant", "content": "I can explain many topics."},
[{"role": "assistant", "content": "I can explain many topics."},
{"role": "user", "content": "Tell me about black holes"}]
Output: YES
# Action demand
[{"role": "assistant", "content": "I can help with math."},
[{"role": "assistant", "content": "I can help with math."},
{"role": "user", "content": "Solve this equation x plus 5 equals 12"}]
Output: YES
@@ -138,12 +134,12 @@ Output: YES
Examples:
# Specific answer
[{"role": "assistant", "content": "What's your favorite color?"},
[{"role": "assistant", "content": "What's your favorite color?"},
{"role": "user", "content": "I really like blue"}]
Output: YES
# Option selection
[{"role": "assistant", "content": "Would you prefer morning or evening?"},
[{"role": "assistant", "content": "Would you prefer morning or evening?"},
{"role": "user", "content": "Morning"}]
Output: YES
@@ -157,17 +153,17 @@ MEDIUM PRIORITY SIGNALS:
Examples:
# Self-correction reaching completion
[{"role": "assistant", "content": "What would you like to know?"},
[{"role": "assistant", "content": "What would you like to know?"},
{"role": "user", "content": "Tell me about... no wait, explain how rainbows form"}]
Output: YES
# Topic change with complete thought
[{"role": "assistant", "content": "The weather is nice today."},
[{"role": "assistant", "content": "The weather is nice today."},
{"role": "user", "content": "Actually can you tell me who invented the telephone"}]
Output: YES
# Mid-sentence completion
[{"role": "assistant", "content": "Hello I'm ready."},
[{"role": "assistant", "content": "Hello I'm ready."},
{"role": "user", "content": "What's the capital of? France"}]
Output: YES
@@ -179,12 +175,12 @@ Output: YES
Examples:
# Acknowledgment
[{"role": "assistant", "content": "Should we talk about history?"},
[{"role": "assistant", "content": "Should we talk about history?"},
{"role": "user", "content": "Sure"}]
Output: YES
# Disagreement with completion
[{"role": "assistant", "content": "Is that what you meant?"},
[{"role": "assistant", "content": "Is that what you meant?"},
{"role": "user", "content": "No not really"}]
Output: YES
@@ -198,12 +194,12 @@ LOW PRIORITY SIGNALS:
Examples:
# Word repetition but complete
[{"role": "assistant", "content": "I can help with that."},
[{"role": "assistant", "content": "I can help with that."},
{"role": "user", "content": "What what is the time right now"}]
Output: YES
# Missing punctuation but complete
[{"role": "assistant", "content": "I can explain that."},
[{"role": "assistant", "content": "I can explain that."},
{"role": "user", "content": "Please tell me how computers work"}]
Output: YES
@@ -215,12 +211,12 @@ Output: YES
Examples:
# Filler words but complete
[{"role": "assistant", "content": "What would you like to know?"},
[{"role": "assistant", "content": "What would you like to know?"},
{"role": "user", "content": "Um uh how do airplanes fly"}]
Output: YES
# Thinking pause but incomplete
[{"role": "assistant", "content": "I can explain anything."},
[{"role": "assistant", "content": "I can explain anything."},
{"role": "user", "content": "Well um I want to know about the"}]
Output: NO
@@ -245,17 +241,17 @@ DECISION RULES:
Examples:
# Incomplete despite corrections
[{"role": "assistant", "content": "What would you like to know about?"},
[{"role": "assistant", "content": "What would you like to know about?"},
{"role": "user", "content": "Can you tell me about"}]
Output: NO
# Complete despite multiple artifacts
[{"role": "assistant", "content": "I can help you learn."},
[{"role": "assistant", "content": "I can help you learn."},
{"role": "user", "content": "How do you I mean what's the best way to learn programming"}]
Output: YES
# Trailing off incomplete
[{"role": "assistant", "content": "I can explain anything."},
[{"role": "assistant", "content": "I can explain anything."},
{"role": "user", "content": "I was wondering if you could tell me why"}]
Output: NO
"""
@@ -332,14 +328,12 @@ class CompletenessCheck(FrameProcessor):
await self._notifier.notify()
elif isinstance(frame, TextFrame) and frame.text == "NO":
logger.debug("!!! Completeness check NO")
else:
await self.push_frame(frame, direction)
class OutputGate(FrameProcessor):
def __init__(self, *, notifier: BaseNotifier, start_open: bool = False, **kwargs):
def __init__(self, notifier: BaseNotifier, **kwargs):
super().__init__(**kwargs)
self._gate_open = start_open
self._gate_open = False
self._frames_buffer = []
self._notifier = notifier
@@ -364,11 +358,6 @@ class OutputGate(FrameProcessor):
await self.push_frame(frame, direction)
return
# Don't block function call frames
if isinstance(frame, (FunctionCallInProgressFrame, FunctionCallResultFrame)):
await self.push_frame(frame, direction)
return
# Ignore frames that are not following the direction of this gate.
if direction != FrameDirection.DOWNSTREAM:
await self.push_frame(frame, direction)
@@ -382,10 +371,11 @@ class OutputGate(FrameProcessor):
async def _start(self):
self._frames_buffer = []
self._gate_task = self.create_task(self._gate_task_handler())
self._gate_task = self.get_event_loop().create_task(self._gate_task_handler())
async def _stop(self):
await self.cancel_task(self._gate_task)
self._gate_task.cancel()
await self._gate_task
async def _gate_task_handler(self):
while True:
@@ -399,16 +389,6 @@ class OutputGate(FrameProcessor):
break
async def start_fetch_weather(function_name, llm, context):
"""Push a frame to the LLM; this is handy when the LLM response might take a while."""
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
await result_callback({"conditions": "nice", "temperature": "75"})
async def main():
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
@@ -445,34 +425,6 @@ async def main():
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o",
)
# Register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.
llm.register_function(None, fetch_weather_from_api, start_callback=start_fetch_weather)
tools = [
ChatCompletionToolParam(
type="function",
function={
"name": "get_current_weather",
"description": "Get the current weather",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": ["location", "format"],
},
},
)
]
messages = [
{
@@ -481,7 +433,7 @@ async def main():
},
]
context = OpenAILLMContext(messages, tools)
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
# We have instructed the LLM to return 'YES' if it thinks the user
@@ -508,9 +460,7 @@ async def main():
# sentence, this will wake up the notifier if that happens.
user_idle = UserIdleProcessor(callback=user_idle_notifier, timeout=5.0)
# We start with the gate open because we send an initial context frame
# to start the conversation.
bot_output_gate = OutputGate(notifier=notifier, start_open=True)
bot_output_gate = OutputGate(notifier=notifier)
async def block_user_stopped_speaking(frame):
return not isinstance(frame, UserStoppedSpeakingFrame)
@@ -521,8 +471,6 @@ async def main():
or isinstance(frame, LLMMessagesFrame)
or isinstance(frame, StartInterruptionFrame)
or isinstance(frame, StopInterruptionFrame)
or isinstance(frame, FunctionCallInProgressFrame)
or isinstance(frame, FunctionCallResultFrame)
)
pipeline = Pipeline(

View File

@@ -20,8 +20,6 @@ from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
FunctionCallInProgressFrame,
FunctionCallResultFrame,
InputAudioRawFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
@@ -46,7 +44,9 @@ from pipecat.processors.aggregators.openai_llm_context import (
)
from pipecat.processors.filters.function_filter import FunctionFilter
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.user_idle_processor import UserIdleProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.google import GoogleLLMContext, GoogleLLMService
from pipecat.sync.base_notifier import BaseNotifier
from pipecat.sync.event_notifier import EventNotifier
@@ -57,9 +57,13 @@ load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
TRANSCRIBER_MODEL = "gemini-2.0-flash-001"
CLASSIFIER_MODEL = "gemini-2.0-flash-001"
CONVERSATION_MODEL = "gemini-2.0-flash-001"
# TRANSCRIBER_MODEL = "gemini-1.5-flash-latest"
# CLASSIFIER_MODEL = "gemini-1.5-flash-latest"
# CONVERSATION_MODEL = "gemini-1.5-flash-latest"
TRANSCRIBER_MODEL = "gemini-2.0-flash-exp"
CLASSIFIER_MODEL = "gemini-2.0-flash-exp"
CONVERSATION_MODEL = "gemini-2.0-flash-exp"
transcriber_system_instruction = """You are an audio transcriber. You are receiving audio from a user. Your job is to
transcribe the input audio to text exactly as it was said by the user.
@@ -436,11 +440,11 @@ class CompletenessCheck(FrameProcessor):
if isinstance(frame, UserStartedSpeakingFrame):
if self._idle_task:
await self.cancel_task(self._idle_task)
self._idle_task.cancel()
elif isinstance(frame, TextFrame) and frame.text.startswith("YES"):
logger.debug("Completeness check YES")
if self._idle_task:
await self.cancel_task(self._idle_task)
self._idle_task.cancel()
await self.push_frame(UserStoppedSpeakingFrame())
await self._audio_accumulator.reset()
await self._notifier.notify()
@@ -453,9 +457,7 @@ class CompletenessCheck(FrameProcessor):
else:
# logger.debug("!!! CompletenessCheck idle wait START")
self._wakeup_time = time.time() + self.wait_time
self._idle_task = self.create_task(self._idle_task_handler())
else:
await self.push_frame(frame, direction)
self._idle_task = self.get_event_loop().create_task(self._idle_task_handler())
async def _idle_task_handler(self):
try:
@@ -497,7 +499,7 @@ class UserAggregatorBuffer(LLMResponseAggregator):
if isinstance(frame, UserStartedSpeakingFrame):
self._transcription = ""
async def push_aggregation(self):
async def _push_aggregation(self):
if self._aggregation:
self._transcription = self._aggregation
self._aggregation = ""
@@ -577,11 +579,6 @@ class OutputGate(FrameProcessor):
await self.push_frame(frame, direction)
return
# Don't block function call frames
if isinstance(frame, (FunctionCallInProgressFrame, FunctionCallResultFrame)):
await self.push_frame(frame, direction)
return
# Ignore frames that are not following the direction of this gate.
if direction != FrameDirection.DOWNSTREAM:
await self.push_frame(frame, direction)
@@ -602,10 +599,11 @@ class OutputGate(FrameProcessor):
async def _start(self):
self._frames_buffer = []
self._gate_task = self.create_task(self._gate_task_handler())
self._gate_task = self.get_event_loop().create_task(self._gate_task_handler())
async def _stop(self):
await self.cancel_task(self._gate_task)
self._gate_task.cancel()
await self._gate_task
async def _gate_task_handler(self):
while True:
@@ -642,6 +640,7 @@ async def main():
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
audio_in_sample_rate=16000,
),
)
@@ -679,6 +678,12 @@ async def main():
context = OpenAILLMContext()
context_aggregator = conversation_llm.create_context_aggregator(context)
# We have instructed the LLM to return 'True' if it thinks the user
# completed a sentence. So, if it's 'True' we will return true in this
# predicate which will wake up the notifier.
async def wake_check_filter(frame):
return frame.text == "True"
# This is a notifier that we use to synchronize the two LLMs.
notifier = EventNotifier()
@@ -695,6 +700,14 @@ async def main():
async def block_user_stopped_speaking(frame):
return not isinstance(frame, UserStoppedSpeakingFrame)
async def pass_only_llm_trigger_frames(frame):
return (
isinstance(frame, OpenAILLMContextFrame)
or isinstance(frame, LLMMessagesFrame)
or isinstance(frame, StartInterruptionFrame)
or isinstance(frame, StopInterruptionFrame)
)
conversation_audio_context_assembler = ConversationAudioContextAssembler(context=context)
user_aggregator_buffer = UserAggregatorBuffer()

View File

@@ -61,11 +61,9 @@ async def main():
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
# Configure the mute processor with both strategies
stt_mute_processor = STTMuteFilter(
stt_service=stt,
config=STTMuteConfig(
strategies={
STTMuteStrategy.MUTE_UNTIL_FIRST_BOT_COMPLETE,
STTMuteStrategy.FUNCTION_CALL,
}
strategies={STTMuteStrategy.FIRST_SPEECH, STTMuteStrategy.FUNCTION_CALL}
),
)

View File

@@ -143,14 +143,7 @@ class InputTranscriptionContextFilter(FrameProcessor):
return
try:
# Make sure we're working with a GoogleLLMContext
context = GoogleLLMContext.upgrade_to_google(frame.context)
message = context.messages[-1]
if not isinstance(message, glm.Content):
logger.error(f"Expected glm.Content, got {type(message)}")
return
message = frame.context.messages[-1]
last_part = message.parts[-1]
if not (
message.role == "user"
@@ -219,7 +212,7 @@ class InputTranscriptionFrameEmitter(FrameProcessor):
elif isinstance(frame, LLMFullResponseEndFrame):
await self.push_frame(LLMDemoTranscriptionFrame(text=self._aggregation.strip()))
self._aggregation = ""
else:
elif isinstance(frame, MetricsFrame):
await self.push_frame(frame, direction)
@@ -299,7 +292,7 @@ async def main():
conversation_llm = GoogleLLMService(
name="Conversation",
model="gemini-2.0-flash-001",
model="gemini-1.5-flash-latest",
# model="gemini-exp-1121",
api_key=os.getenv("GOOGLE_API_KEY"),
# we can give the GoogleLLMService a system instruction to use directly
@@ -310,7 +303,7 @@ async def main():
input_transcription_llm = GoogleLLMService(
name="Transcription",
model="gemini-2.0-flash-001",
model="gemini-1.5-flash-latest",
# model="gemini-exp-1121",
api_key=os.getenv("GOOGLE_API_KEY"),
system_instruction=transcriber_system_message,

View File

@@ -15,7 +15,6 @@ from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMMessagesAppendFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -37,6 +36,8 @@ async def main():
token,
"Respond bot",
DailyParams(
audio_in_sample_rate=16000,
audio_out_sample_rate=24000,
audio_out_enabled=True,
vad_enabled=True,
vad_audio_passthrough=True,
@@ -70,21 +71,6 @@ async def main():
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await task.queue_frames(
[
LLMMessagesAppendFrame(
messages=[
{
"role": "assistant",
"content": "Greet the user.",
}
]
)
]
)
runner = PipelineRunner()
await runner.run(task)

View File

@@ -37,6 +37,8 @@ async def main():
token,
"Respond bot",
DailyParams(
audio_in_sample_rate=16000,
audio_out_sample_rate=24000,
audio_out_enabled=True,
vad_enabled=True,
vad_audio_passthrough=True,

Some files were not shown because too many files have changed in this diff Show More