Compare commits
118 Commits
hush/aggre
...
v0.0.89
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4b2379cba8 | ||
|
|
92087bdfa8 | ||
|
|
617919ac09 | ||
|
|
0669daec3d | ||
|
|
7c15a8c800 | ||
|
|
d9aef5f916 | ||
|
|
91ae3f8a9b | ||
|
|
36da623352 | ||
|
|
31b9087ea6 | ||
|
|
1851fed22e | ||
|
|
eddce460da | ||
|
|
da4f30cb6d | ||
|
|
250cf2d8f1 | ||
|
|
7bbdb4f991 | ||
|
|
051c4782fb | ||
|
|
b1ccec74b2 | ||
|
|
92bf0d9eda | ||
|
|
f985550441 | ||
|
|
de8ee96927 | ||
|
|
2576d0f340 | ||
|
|
f38f4711ac | ||
|
|
c2f3ddd329 | ||
|
|
73ffe96228 | ||
|
|
bd13a80da7 | ||
|
|
312959f97e | ||
|
|
fe168e3c68 | ||
|
|
28929a47f7 | ||
|
|
03f5defbc3 | ||
|
|
b216648315 | ||
|
|
084b133a01 | ||
|
|
e589876176 | ||
|
|
a826313bf9 | ||
|
|
49f44aa7c8 | ||
|
|
64ceef9cf0 | ||
|
|
cd6567c1f1 | ||
|
|
ac67ca1555 | ||
|
|
8d38994756 | ||
|
|
60604a9449 | ||
|
|
4abe4a6253 | ||
|
|
4c054af17b | ||
|
|
dcba940d42 | ||
|
|
ad2adb0c58 | ||
|
|
76923010b5 | ||
|
|
1b511557b2 | ||
|
|
fdadb12933 | ||
|
|
f1bbb7ba22 | ||
|
|
c1492c5275 | ||
|
|
4ffdabcfde | ||
|
|
b489de2fc3 | ||
|
|
d9656cbb1a | ||
|
|
05fb223985 | ||
|
|
62a5f07ad2 | ||
|
|
b669e3a481 | ||
|
|
99f1041a47 | ||
|
|
37b1345bfa | ||
|
|
8994ac17eb | ||
|
|
63bc825008 | ||
|
|
e7ffde1c4c | ||
|
|
1c88565725 | ||
|
|
07a6c2fb0e | ||
|
|
e99f3bf75a | ||
|
|
f09d780413 | ||
|
|
e370d23374 | ||
|
|
b68ec14146 | ||
|
|
c567fd71b1 | ||
|
|
2ca1b2d6f8 | ||
|
|
04041a9a9a | ||
|
|
6c498dc70f | ||
|
|
32b07c1720 | ||
|
|
ad507ce23d | ||
|
|
be562cedfc | ||
|
|
089e703e1f | ||
|
|
4dc1e15a99 | ||
|
|
c7dc2e886f | ||
|
|
11bc4ea854 | ||
|
|
029d76033d | ||
|
|
924d7dea9a | ||
|
|
244e94f3ce | ||
|
|
af1f51d49e | ||
|
|
9ba3c168b8 | ||
|
|
e6ee8f7a16 | ||
|
|
2ea2bd99e0 | ||
|
|
0c2ced7c52 | ||
|
|
fb160646b8 | ||
|
|
89fed57af2 | ||
|
|
feae3b6d2d | ||
|
|
92d3be8975 | ||
|
|
0f53e1db2c | ||
|
|
d398e8cc10 | ||
|
|
e5f263d380 | ||
|
|
3a4c303c54 | ||
|
|
54a1ef47d0 | ||
|
|
149ffa4f3c | ||
|
|
e5465034d9 | ||
|
|
568c7c782d | ||
|
|
9851334221 | ||
|
|
e79c4fc99d | ||
|
|
55c321f4ff | ||
|
|
a14a53a005 | ||
|
|
a71f937e8f | ||
|
|
032032df65 | ||
|
|
d0178edad0 | ||
|
|
795c5e55d9 | ||
|
|
8f8d8ae0d8 | ||
|
|
741f192d04 | ||
|
|
a5595b82ea | ||
|
|
4d1915eb41 | ||
|
|
b3a84fc772 | ||
|
|
403d22e62c | ||
|
|
ee00ee5c57 | ||
|
|
f53fd880dc | ||
|
|
de3461e4cc | ||
|
|
7bafc3a1bb | ||
|
|
22ef61fe8d | ||
|
|
7078fb53bd | ||
|
|
33447ad6f2 | ||
|
|
6faa50ae5b | ||
|
|
889dc19a27 |
22
.github/workflows/publish.yaml
vendored
22
.github/workflows/publish.yaml
vendored
@@ -5,25 +5,25 @@ on:
|
||||
inputs:
|
||||
gitref:
|
||||
type: string
|
||||
description: "what git tag to build (e.g. v0.0.74)"
|
||||
description: 'what git tag to build (e.g. v0.0.74)'
|
||||
required: true
|
||||
|
||||
jobs:
|
||||
build:
|
||||
name: "Build and upload wheels"
|
||||
name: 'Build and upload wheels'
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout repo
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
ref: ${{ github.event.inputs.gitref }}
|
||||
|
||||
|
||||
- name: Install uv
|
||||
uses: astral-sh/setup-uv@v3
|
||||
with:
|
||||
version: "latest"
|
||||
version: 'latest'
|
||||
- name: Set up Python
|
||||
run: uv python install 3.10
|
||||
run: uv python install 3.12
|
||||
- name: Install development dependencies
|
||||
run: uv sync --group dev
|
||||
- name: Build project
|
||||
@@ -35,9 +35,9 @@ jobs:
|
||||
path: ./dist
|
||||
|
||||
publish-to-pypi:
|
||||
name: "Publish to PyPI"
|
||||
name: 'Publish to PyPI'
|
||||
runs-on: ubuntu-latest
|
||||
needs: [ build ]
|
||||
needs: [build]
|
||||
environment:
|
||||
name: pypi
|
||||
url: https://pypi.org/p/pipecat-ai
|
||||
@@ -56,12 +56,12 @@ jobs:
|
||||
print-hash: true
|
||||
|
||||
publish-to-test-pypi:
|
||||
name: "Publish to Test PyPI"
|
||||
name: 'Publish to Test PyPI'
|
||||
runs-on: ubuntu-latest
|
||||
needs: [ build ]
|
||||
needs: [build]
|
||||
environment:
|
||||
name: testpypi
|
||||
url: https://pypi.org/p/pipecat-ai
|
||||
url: https://test.pypi.org/p/pipecat-ai
|
||||
permissions:
|
||||
id-token: write
|
||||
steps:
|
||||
@@ -70,7 +70,7 @@ jobs:
|
||||
with:
|
||||
name: wheels
|
||||
path: ./dist
|
||||
- name: Publish to PyPI
|
||||
- name: Publish to Test PyPI
|
||||
uses: pypa/gh-action-pypi-publish@release/v1
|
||||
with:
|
||||
verbose: true
|
||||
|
||||
12
.github/workflows/publish_test.yaml
vendored
12
.github/workflows/publish_test.yaml
vendored
@@ -4,7 +4,7 @@ on: workflow_dispatch
|
||||
|
||||
jobs:
|
||||
build:
|
||||
name: "Build and upload wheels"
|
||||
name: 'Build and upload wheels'
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout repo
|
||||
@@ -15,9 +15,9 @@ jobs:
|
||||
- name: Install uv
|
||||
uses: astral-sh/setup-uv@v3
|
||||
with:
|
||||
version: "latest"
|
||||
version: 'latest'
|
||||
- name: Set up Python
|
||||
run: uv python install 3.10
|
||||
run: uv python install 3.12
|
||||
- name: Install development dependencies
|
||||
run: uv sync --group dev
|
||||
- name: Build project
|
||||
@@ -29,12 +29,12 @@ jobs:
|
||||
path: ./dist
|
||||
|
||||
publish-to-test-pypi:
|
||||
name: "Publish to Test PyPI"
|
||||
name: 'Publish to Test PyPI'
|
||||
runs-on: ubuntu-latest
|
||||
needs: [build]
|
||||
environment:
|
||||
name: testpypi
|
||||
url: https://pypi.org/p/pipecat-ai
|
||||
url: https://test.pypi.org/p/pipecat-ai
|
||||
permissions:
|
||||
id-token: write
|
||||
steps:
|
||||
@@ -43,7 +43,7 @@ jobs:
|
||||
with:
|
||||
name: wheels
|
||||
path: ./dist
|
||||
- name: Publish to PyPI
|
||||
- name: Publish to Test PyPI
|
||||
uses: pypa/gh-action-pypi-publish@release/v1
|
||||
with:
|
||||
verbose: true
|
||||
|
||||
96
CHANGELOG.md
96
CHANGELOG.md
@@ -5,6 +5,102 @@ All notable changes to **Pipecat** will be documented in this file.
|
||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## [0.0.89] - 2025-10-07
|
||||
|
||||
### Fixed
|
||||
|
||||
- Reverted a change introduced in 0.0.88 that was causing pipelines to be frozen
|
||||
when using interruption strategies and processors that block interruption
|
||||
frames (e.g. `STTMuteFilter`).
|
||||
|
||||
## [0.0.88] - 2025-10-07
|
||||
|
||||
### Added
|
||||
|
||||
- Added support for Nano Banana models to `GoogleLLMService`. For example, you
|
||||
can now use the `gemini-2.5-flash-image` model to generate images.
|
||||
|
||||
- Added `HumeTTSService` for text-to-speech synthesis using Hume AI's expressive
|
||||
voice models. Provides high-quality, emotionally expressive speech synthesis
|
||||
with support for various voice models. Includes example in
|
||||
`examples/foundational/07ad-interruptible-hume.py`. Use with `uv pip install
|
||||
pipecat-ai[hume]`.
|
||||
|
||||
### Changed
|
||||
|
||||
- Updated default `GoogleLLMService` model to `gemini-2.5-flash`.
|
||||
|
||||
### Deprecated
|
||||
|
||||
- PlayHT is shutting down their API on December 31st, 2025. As a result,
|
||||
`PlayHTTTSService` and `PlayHTHttpTTSService` are deprecated and will be
|
||||
removed in a future version.
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed an issue with `AWSNovaSonicLLMService` where the client wouldn't
|
||||
connect due to a breaking change in the AWS dependency chain.
|
||||
|
||||
- `PermissionError` is now caught if NLTK's `punkt_tab` can't be downloaded.
|
||||
|
||||
- Fixed an issue that would cause wrong user/assistant context ordering when
|
||||
using interruption strategies.
|
||||
|
||||
- Fixed RTVI incoming message handling, broken in 0.0.87.
|
||||
|
||||
## [0.0.87] - 2025-10-02
|
||||
|
||||
### Added
|
||||
|
||||
- Added `WebsocketSTTService` base class for websocket-based STT services.
|
||||
Combines STT functionality with websocket connectivity, providing automatic
|
||||
error handling and reconnection capabilities with exponential backoff.
|
||||
|
||||
- Added `DeepgramFluxSTTService` for real-time speech recognition using
|
||||
Deepgram's Flux WebSocket API. Flux understands conversational flow and
|
||||
automatically handles turn-taking.
|
||||
|
||||
- Added RTVI messages for user/bot audio levels and system logs.
|
||||
|
||||
- Include OpenAI-based LLM services cached tokens to `MetricsFrame`.
|
||||
|
||||
### Changed
|
||||
|
||||
- Updated the default model for `AnthropicLLMService` to
|
||||
`claude-sonnet-4-5-20250929`.
|
||||
|
||||
### Deprecated
|
||||
|
||||
- `DailyTransportMessageFrame` and `DailyTransportMessageUrgentFrame` are
|
||||
deprecated, use `DailyOutputTransportMessageFrame` and
|
||||
`DailyOutputTransportMessageUrgentFrame` respectively instead.
|
||||
|
||||
- `LiveKitTransportMessageFrame` and `LiveKitTransportMessageUrgentFrame` are
|
||||
deprecated, use `LiveKitOutputTransportMessageFrame` and
|
||||
`LiveKitOutputTransportMessageUrgentFrame` respectively instead.
|
||||
|
||||
- `TransportMessageFrame` and `TransportMessageUrgentFrame` are deprecated, use
|
||||
`OutputTransportMessageFrame` and `OutputTransportMessageUrgentFrame`
|
||||
respectively instead.
|
||||
|
||||
- `InputTransportMessageUrgentFrame` is deprecated, use
|
||||
`InputTransportMessageFrame` instead.
|
||||
|
||||
- `DailyUpdateRemoteParticipantsFrame` is deprecated and will be removed in a
|
||||
future version. Instead, create your own custom frame and handle it in the
|
||||
`@transport.output().event_handler("on_after_push_frame")` event handler or a
|
||||
custom processor.
|
||||
|
||||
## Fixed
|
||||
|
||||
- Fixed an issue in `AWSBedrockLLMService` where timeout exceptions weren't
|
||||
being detected.
|
||||
|
||||
- Fixed a `PipelineTask` issue that could prevent the application to exit if
|
||||
`task.cancel()` was called when the task was already finished.
|
||||
|
||||
- Fixed an issue where local SmartTurn was not being ran in a separate thread.
|
||||
|
||||
## [0.0.86] - 2025-09-24
|
||||
|
||||
### Added
|
||||
|
||||
336
COMMUNITY_INTEGRATIONS.md
Normal file
336
COMMUNITY_INTEGRATIONS.md
Normal file
@@ -0,0 +1,336 @@
|
||||
# Community Integrations Guide
|
||||
|
||||
Pipecat welcomes community-maintained integrations! As our ecosystem grows, we've established a process for any developer to create and maintain their own service integrations while ensuring discoverability for the Pipecat community.
|
||||
|
||||
## Overview
|
||||
|
||||
**What we support:** Community-maintained integrations that live in separate repositories and are maintained by their authors.
|
||||
|
||||
**What we don't do:** The Pipecat team does not code review, test, or maintain community integrations. We provide guidance and list approved integrations for discoverability.
|
||||
|
||||
**Why this approach:** This allows the community to move quickly while keeping the Pipecat core team focused on maintaining the framework itself.
|
||||
|
||||
## Submitting your Integration
|
||||
|
||||
To be listed as an official community integration, follow these steps:
|
||||
|
||||
### Step 1: Build Your Integration
|
||||
|
||||
Create your integration following the patterns and examples shown in the "Integration Patterns and Examples" section below.
|
||||
|
||||
### Step 2: Set Up Your Repository
|
||||
|
||||
Your repository must contain these components:
|
||||
|
||||
- **Source code** - Complete implementation following Pipecat patterns
|
||||
- **Foundational example** - Single file example showing basic usage (see [Pipecat examples](https://github.com/pipecat-ai/pipecat/tree/main/examples/foundational))
|
||||
- **README.md** - Must include:
|
||||
|
||||
- Introduction and explanation of your integration
|
||||
- Installation instructions
|
||||
- Usage instructions with Pipecat Pipeline
|
||||
- How to run your example
|
||||
- Pipecat version compatibility (e.g., "Tested with Pipecat v0.0.86")
|
||||
- Company attribution: If you work for the company providing the service, please mention this in your README. This helps build confidence that the integration will be actively maintained.
|
||||
|
||||
- **LICENSE** - Permissive license (BSD-2 like Pipecat, or equivalent open source terms)
|
||||
- **Code documentation** - Source code with docstrings (we recommend following [Pipecat's docstring conventions](https://github.com/pipecat-ai/pipecat/blob/main/CONTRIBUTING.md#docstring-conventions))
|
||||
- **Changelog** - Maintain a changelog for version updates
|
||||
|
||||
### Step 3: Join Discord
|
||||
|
||||
Join our Discord: https://discord.gg/pipecat
|
||||
|
||||
### Step 4: Submit for Listing
|
||||
|
||||
Submit a pull request to add your integration to our [Community Integrations documentation page](https://docs.pipecat.ai/server/services/community-integrations).
|
||||
|
||||
**To submit:**
|
||||
|
||||
1. Fork the [Pipecat docs repository](https://github.com/pipecat-ai/docs)
|
||||
2. Edit the file `server/services/community-integrations.mdx`
|
||||
3. Add your integration to the appropriate service category table with:
|
||||
- Service name
|
||||
- Link to your repository
|
||||
- Maintainer GitHub username(s)
|
||||
4. Include a link to your demo video (approx 30-60 seconds) in your PR description showing:
|
||||
- Core functionality of your integration
|
||||
- Handling of an interruption (if applicable to service type)
|
||||
5. Submit your pull request
|
||||
|
||||
Once your PR is submitted, post in the `#community-integrations` Discord channel to let us know.
|
||||
|
||||
## Integration Patterns and Examples
|
||||
|
||||
### STT (Speech-to-Text) Services
|
||||
|
||||
#### Websocket-based Services
|
||||
|
||||
**Base class:** `STTService`
|
||||
|
||||
**Examples:**
|
||||
|
||||
- [DeepgramSTTService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/deepgram/stt.py)
|
||||
- [SpeechmaticsSTTService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/speechmatics/stt.py)
|
||||
|
||||
#### File-based Services
|
||||
|
||||
**Base class:** `SegmentedSTTService`
|
||||
|
||||
**Examples:**
|
||||
|
||||
- [RivaSTTService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/riva/stt.py)
|
||||
- [FalSTTService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/fal/stt.py)
|
||||
|
||||
#### Key requirements:
|
||||
|
||||
- STT services should push `InterimTranscriptionFrames` and `TranscriptionFrames`
|
||||
- If confidence values are available, filter for values >50% confidence
|
||||
|
||||
### LLM (Large Language Model) Services
|
||||
|
||||
#### OpenAI-Compatible Services
|
||||
|
||||
**Base class:** `OpenAILLMService`
|
||||
|
||||
**Examples:**
|
||||
|
||||
- [AzureLLMService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/azure/llm.py)
|
||||
- [GrokLLMService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/grok/llm.py) - Shows overriding the base class where needed
|
||||
|
||||
#### Non-OpenAI Compatible Services
|
||||
|
||||
**Requires:** Full implementation
|
||||
|
||||
**Examples:**
|
||||
|
||||
- [AnthropicLLMService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/anthropic/llm.py)
|
||||
- [GoogleLLMService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/google/llm.py)
|
||||
|
||||
#### Key requirements:
|
||||
|
||||
- **Frame sequence:** Output must follow this frame sequence pattern:
|
||||
|
||||
- `LLMFullResponseStartFrame` - Signals the start of an LLM response
|
||||
- `LLMTextFrame` - Contains LLM content, typically streamed as tokens
|
||||
- `LLMFullResponseEndFrame` - Signals the end of an LLM response
|
||||
|
||||
- **Context aggregation:** Implement context aggregation to collect user and assistant content:
|
||||
- Aggregators come in pairs with a `user()` instance and `assistant()` instance
|
||||
- Context must adhere to the `LLMContext` universal format
|
||||
- Aggregators should handle adding messages, function calls, and images to the context
|
||||
|
||||
### TTS (Text-to-Speech) Services
|
||||
|
||||
#### AudioContextWordTTSService
|
||||
|
||||
**Use for:** Websocket-based services supporting word/timestamp alignment
|
||||
|
||||
**Example:**
|
||||
|
||||
- [CartesiaTTSService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/cartesia/tts.py)
|
||||
|
||||
#### InterruptibleTTSService
|
||||
|
||||
**Use for:** Websocket-based services without word/timestamp alignment, requiring disconnection on interruption
|
||||
|
||||
**Example:**
|
||||
|
||||
- [SarvamTTSService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/sarvam/tts.py)
|
||||
|
||||
#### WordTTSService
|
||||
|
||||
**Use for:** HTTP-based services supporting word/timestamp alignment
|
||||
|
||||
**Example:**
|
||||
|
||||
- [ElevenLabsHttpTTSService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/elevenlabs/tts.py)
|
||||
|
||||
#### TTSService
|
||||
|
||||
**Use for:** HTTP-based services without word/timestamp alignment
|
||||
|
||||
**Example:**
|
||||
|
||||
- [GoogleHttpTTSService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/google/tts.py)
|
||||
|
||||
#### Key requirements:
|
||||
|
||||
- For websocket services, use asyncio WebSocket implementation (required for v13+ support)
|
||||
- Handle idle service timeouts with keepalives
|
||||
- TTSServices push both audio (`TTSRawAudioFrame`) and text (`TTSTextFrame`) frames
|
||||
|
||||
### Telephony Serializers
|
||||
|
||||
Pipecat supports telephony provider integration using websocket connections to exchange MediaStreams. These services use a FrameSerializer to serialize and deserialize inputs from the FastAPIWebsocketTransport.
|
||||
|
||||
**Examples:**
|
||||
|
||||
- [Twilio](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/serializers/twilio.py)
|
||||
- [Telnyx](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/serializers/telnyx.py)
|
||||
|
||||
#### Key requirements:
|
||||
|
||||
- Include hang-up functionality using the provider's native API, ideally using `aiohttp`
|
||||
- Support DTMF (dual-tone multi-frequency) events if the provider supports them:
|
||||
- Deserialize DTMF events from the provider's protocol to `InputDTMFFrame`
|
||||
- Use `KeypadEntry` enum for valid keypad entries (0-9, \*, #, A-D)
|
||||
- Handle invalid DTMF digits gracefully by returning `None`
|
||||
|
||||
### Image Generation Services
|
||||
|
||||
**Base class:** `ImageGenService`
|
||||
|
||||
**Examples:**
|
||||
|
||||
- [FalImageGenService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/fal/image.py)
|
||||
- [GoogleImageGenService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/google/image.py)
|
||||
|
||||
#### Key requirements:
|
||||
|
||||
- Must implement `run_image_gen` method returning an `AsyncGenerator`
|
||||
|
||||
### Vision Services
|
||||
|
||||
Vision services process images and provide analysis such as descriptions, object detection, or visual question answering.
|
||||
|
||||
**Base class:** `VisionService`
|
||||
|
||||
**Example:**
|
||||
|
||||
- [MoondreamVisionService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/moondream/vision.py)
|
||||
|
||||
#### Key requirements:
|
||||
|
||||
- Must implement `run_vision` method that takes an `LLMContext` and returns an `AsyncGenerator[Frame, None]`
|
||||
- The method processes the latest image in the context and yields frames with analysis results
|
||||
- Typically yields `TextFrame` objects containing descriptions or answers
|
||||
|
||||
## Implementation Guidelines
|
||||
|
||||
### Naming Conventions
|
||||
|
||||
- **STT:** `VendorSTTService`
|
||||
- **LLM:** `VendorLLMService`
|
||||
- **TTS:**
|
||||
- Websocket: `VendorTTSService`
|
||||
- HTTP: `VendorHttpTTSService`
|
||||
- **Image:** `VendorImageGenService`
|
||||
- **Vision:** `VendorVisionService`
|
||||
- **Telephony:** `VendorFrameSerializer`
|
||||
|
||||
### Metrics Support
|
||||
|
||||
Enable metrics in your service:
|
||||
|
||||
```python
|
||||
def can_generate_metrics(self) -> bool:
|
||||
"""Check if this service can generate processing metrics.
|
||||
|
||||
Returns:
|
||||
True, as this service supports metrics.
|
||||
"""
|
||||
return True
|
||||
```
|
||||
|
||||
### Dynamic Settings Updates
|
||||
|
||||
STT, LLM, and TTS services support `ServiceUpdateSettingsFrame` for dynamic configuration changes. The base STTService has an `_update_settings()` method that handles settings, and the private `_settings` `Dict` is used to store settings and provide access to the subclass.
|
||||
|
||||
```python
|
||||
async def set_language(self, language: Language):
|
||||
"""Set the recognition language and reconnect.
|
||||
|
||||
Args:
|
||||
language: The language to use for speech recognition.
|
||||
"""
|
||||
logger.info(f"Switching STT language to: [{language}]")
|
||||
self._settings["language"] = language
|
||||
await self._disconnect()
|
||||
await self._connect()
|
||||
```
|
||||
|
||||
Note that, in this example, Deepgram requires the websocket connection be disconnected and reconnected to reinitialize the service with the new value. Consider if your service requires reconnection.
|
||||
|
||||
### Sample Rate Handling
|
||||
|
||||
Sample rates are set via PipelineParams and passed to each frame processor at initialization. The pattern is to _not_ set the sample rate value in the constructor of a given service. Instead, use the `start()` method to initialize sample rates from the frame:
|
||||
|
||||
```python
|
||||
async def start(self, frame: StartFrame):
|
||||
"""Start the service."""
|
||||
await super().start(frame)
|
||||
self._settings["output_format"]["sample_rate"] = self.sample_rate
|
||||
await self._connect()
|
||||
```
|
||||
|
||||
Note that `self.sample_rate` is a `@property` set in the TTSService base class, which provides access to the private sample rate value obtained from the StartFrame.
|
||||
|
||||
### Tracing Decorators
|
||||
|
||||
Use Pipecat's tracing decorators:
|
||||
|
||||
- **STT:** `@traced_stt` - decorate a function that handles `transcript`, `is_final`, `language` as args
|
||||
- **LLM:** `@traced_llm` - decorate the `_process_context()` method
|
||||
- **TTS:** `@traced_tts` - decorate the `run_tts()` method
|
||||
|
||||
## Best Practices
|
||||
|
||||
### Packaging and Distribution
|
||||
|
||||
- Use [uv](https://docs.astral.sh/uv/) for packaging (encouraged)
|
||||
- Consider releasing to PyPI for easier installation
|
||||
- Follow semantic versioning principles
|
||||
- Maintain a changelog
|
||||
|
||||
### HTTP Communication
|
||||
|
||||
For REST-based communication, use aiohttp. Pipecat includes this as a required dependency, so using it prevents adding an additional dependency to your integration.
|
||||
|
||||
### Error Handling
|
||||
|
||||
- Wrap API calls in appropriate try/catch blocks
|
||||
- Handle rate limits and network failures gracefully
|
||||
- Provide meaningful error messages
|
||||
- When errors occur, raise exceptions AND push `ErrorFrame`s to notify the pipeline:
|
||||
|
||||
```python
|
||||
from pipecat.frames.frames import ErrorFrame
|
||||
|
||||
try:
|
||||
# Your API call
|
||||
result = await self._make_api_call()
|
||||
except Exception as e:
|
||||
# Push error frame to pipeline
|
||||
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
|
||||
# Raise or handle as appropriate
|
||||
raise
|
||||
```
|
||||
|
||||
### Testing
|
||||
|
||||
- Your foundational example serves as a valuable integration-level test
|
||||
- Unit tests are nice to have. As the Pipecat teams provides better guidance, we will encourage unit testing more
|
||||
|
||||
## Disclaimer
|
||||
|
||||
Community integrations are community-maintained and not officially supported by the Pipecat team. Users should evaluate these integrations independently. The Pipecat team reserves the right to remove listings that become unmaintained or problematic.
|
||||
|
||||
## Staying Up to Date
|
||||
|
||||
Pipecat evolves rapidly to support the latest AI technologies and patterns. While we strive to minimize breaking changes, they do occur as the framework matures.
|
||||
|
||||
**We strongly recommend:**
|
||||
|
||||
- Join our Discord at https://discord.gg/pipecat and monitor the `#announcements` channel for release notifications
|
||||
- Follow our changelog: https://github.com/pipecat-ai/pipecat/blob/main/CHANGELOG.md
|
||||
- Test your integration against new Pipecat releases promptly
|
||||
- Update your README with the last tested Pipecat version
|
||||
|
||||
This helps ensure your integration remains compatible and your users have clear expectations about version support.
|
||||
|
||||
## Questions?
|
||||
|
||||
Join our Discord community at https://discord.gg/pipecat and post in the `#community-integrations` channel for guidance and support.
|
||||
|
||||
For additional questions, you can also reach out to us at pipecat-ai@daily.co.
|
||||
@@ -1,5 +1,9 @@
|
||||
## Contributing to Pipecat
|
||||
|
||||
**Want to add a new service integration?**
|
||||
We encourage community-maintained integrations! Please see our [Community Integration Guide](COMMUNITY_INTEGRATIONS.md) for the process and requirements.
|
||||
|
||||
**Want to contribute to Pipecat core?**
|
||||
We welcome contributions of all kinds! Your help is appreciated. Follow these steps to get involved:
|
||||
|
||||
1. **Fork this repository**: Start by forking the Pipecat Documentation repository to your GitHub account.
|
||||
|
||||
108
README.md
108
README.md
@@ -19,10 +19,6 @@
|
||||
- **Business Agents** – customer intake, support bots, guided flows
|
||||
- **Complex Dialog Systems** – design logic with structured conversations
|
||||
|
||||
🧭 Looking to build structured conversations? Check out [Pipecat Flows](https://github.com/pipecat-ai/pipecat-flows) for managing complex conversational states and transitions.
|
||||
|
||||
🔍 Looking for help debugging your pipeline and processors? Check out [Whisker](https://github.com/pipecat-ai/whisker), a real-time Pipecat debugger.
|
||||
|
||||
## 🧠 Why Pipecat?
|
||||
|
||||
- **Voice-first**: Integrates speech recognition, text-to-speech, and conversation handling
|
||||
@@ -30,40 +26,30 @@
|
||||
- **Composable Pipelines**: Build complex behavior from modular components
|
||||
- **Real-Time**: Ultra-low latency interaction with different transports (e.g. WebSockets or WebRTC)
|
||||
|
||||
## 📱 Client SDKs
|
||||
## 🌐 Pipecat Ecosystem
|
||||
|
||||
You can connect to Pipecat from any platform using our official SDKs:
|
||||
### 📱 Client SDKs
|
||||
|
||||
<table>
|
||||
<tr>
|
||||
<td>
|
||||
<img src="https://cdn.jsdelivr.net/gh/devicons/devicon/icons/javascript/javascript-original.svg" width="40" height="40" alt="JavaScript"/>
|
||||
<a href="https://docs.pipecat.ai/client/js/introduction">JavaScript</a>
|
||||
</td>
|
||||
<td>
|
||||
<img src="https://cdn.jsdelivr.net/gh/devicons/devicon/icons/react/react-original.svg" width="40" height="40" alt="React"/>
|
||||
<a href="https://docs.pipecat.ai/client/react/introduction">React</a>
|
||||
</td>
|
||||
<td>
|
||||
<img src="https://cdn.jsdelivr.net/gh/devicons/devicon/icons/react/react-original.svg" width="40" height="40" alt="React Native"/>
|
||||
<a href="https://docs.pipecat.ai/client/react-native/introduction">React Native</a>
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>
|
||||
<img src="https://cdn.jsdelivr.net/gh/devicons/devicon/icons/swift/swift-original.svg" width="40" height="40" alt="Swift"/>
|
||||
<a href="https://docs.pipecat.ai/client/ios/introduction">Swift</a>
|
||||
</td>
|
||||
<td>
|
||||
<img src="https://cdn.jsdelivr.net/gh/devicons/devicon/icons/kotlin/kotlin-original.svg" width="40" height="40" alt="Kotlin"/>
|
||||
<a href="https://docs.pipecat.ai/client/android/introduction">Kotlin</a>
|
||||
</td>
|
||||
<td>
|
||||
<img src="https://cdn.jsdelivr.net/gh/devicons/devicon/icons/cplusplus/cplusplus-original.svg" width="40" height="40" alt="JavaScript"/>
|
||||
<a href="https://docs.pipecat.ai/client/c++/introduction">C++</a>
|
||||
</td>
|
||||
</tr>
|
||||
</table>
|
||||
Building client applications? You can connect to Pipecat from any platform using our official SDKs:
|
||||
|
||||
<a href="https://docs.pipecat.ai/client/js/introduction">JavaScript</a> | <a href="https://docs.pipecat.ai/client/react/introduction">React</a> | <a href="https://docs.pipecat.ai/client/react-native/introduction">React Native</a> |
|
||||
<a href="https://docs.pipecat.ai/client/ios/introduction">Swift</a> | <a href="https://docs.pipecat.ai/client/android/introduction">Kotlin</a> | <a href="https://docs.pipecat.ai/client/c++/introduction">C++</a> | <a href="https://github.com/pipecat-ai/pipecat-esp32">ESP32</a>
|
||||
|
||||
### 🧭 Structured conversations
|
||||
|
||||
Looking to build structured conversations? Check out [Pipecat Flows](https://github.com/pipecat-ai/pipecat-flows) for managing complex conversational states and transitions.
|
||||
|
||||
### 🪄 Beautiful UIs
|
||||
|
||||
Want to build beautiful and engaging experiences? Checkout the [Voice UI Kit](https://github.com/pipecat-ai/voice-ui-kit), a collection of components, hooks and templates for building voice AI applications quickly.
|
||||
|
||||
### 🔍 Debugging
|
||||
|
||||
Looking for help debugging your pipeline and processors? Check out [Whisker](https://github.com/pipecat-ai/whisker), a real-time Pipecat debugger.
|
||||
|
||||
### 🖥️ Terminal
|
||||
|
||||
Love terminal applications? Check out [Tail](https://github.com/pipecat-ai/tail), a terminal dashboard for Pipecat.
|
||||
|
||||
## 🎬 See it in action
|
||||
|
||||
@@ -81,7 +67,7 @@ You can connect to Pipecat from any platform using our official SDKs:
|
||||
| ------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|
||||
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/server/services/stt/aws), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Cartesia](https://docs.pipecat.ai/server/services/stt/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/stt/elevenlabs), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [NVIDIA Riva](https://docs.pipecat.ai/server/services/stt/riva), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [SambaNova (Whisper)](https://docs.pipecat.ai/server/services/stt/sambanova), [Soniox](https://docs.pipecat.ai/server/services/stt/soniox), [Speechmatics](https://docs.pipecat.ai/server/services/stt/speechmatics), [Ultravox](https://docs.pipecat.ai/server/services/stt/ultravox), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) |
|
||||
| LLMs | [Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [AWS](https://docs.pipecat.ai/server/services/llm/aws), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [Mistral](https://docs.pipecat.ai/server/services/llm/mistral), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [OpenRouter](https://docs.pipecat.ai/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/server/services/llm/perplexity), [Qwen](https://docs.pipecat.ai/server/services/llm/qwen), [SambaNova](https://docs.pipecat.ai/server/services/llm/sambanova) [Together AI](https://docs.pipecat.ai/server/services/llm/together) |
|
||||
| Text-to-Speech | [Async](https://docs.pipecat.ai/server/services/tts/asyncai), [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [Groq](https://docs.pipecat.ai/server/services/tts/groq), [Inworld](https://docs.pipecat.ai/server/services/tts/inworld), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [MiniMax](https://docs.pipecat.ai/server/services/tts/minimax), [Neuphonic](https://docs.pipecat.ai/server/services/tts/neuphonic), [NVIDIA Riva](https://docs.pipecat.ai/server/services/tts/riva), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [Piper](https://docs.pipecat.ai/server/services/tts/piper), [PlayHT](https://docs.pipecat.ai/server/services/tts/playht), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [Sarvam](https://docs.pipecat.ai/server/services/tts/sarvam), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) |
|
||||
| Text-to-Speech | [Async](https://docs.pipecat.ai/server/services/tts/asyncai), [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [Groq](https://docs.pipecat.ai/server/services/tts/groq), [Hume](https://docs.pipecat.ai/server/services/tts/hume), [Inworld](https://docs.pipecat.ai/server/services/tts/inworld), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [MiniMax](https://docs.pipecat.ai/server/services/tts/minimax), [Neuphonic](https://docs.pipecat.ai/server/services/tts/neuphonic), [NVIDIA Riva](https://docs.pipecat.ai/server/services/tts/riva), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [Piper](https://docs.pipecat.ai/server/services/tts/piper), [PlayHT](https://docs.pipecat.ai/server/services/tts/playht), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [Sarvam](https://docs.pipecat.ai/server/services/tts/sarvam), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) |
|
||||
| Speech-to-Speech | [AWS Nova Sonic](https://docs.pipecat.ai/server/services/s2s/aws), [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [OpenAI Realtime](https://docs.pipecat.ai/server/services/s2s/openai) |
|
||||
| Transport | [Daily (WebRTC)](https://docs.pipecat.ai/server/services/transport/daily), [FastAPI Websocket](https://docs.pipecat.ai/server/services/transport/fastapi-websocket), [SmallWebRTCTransport](https://docs.pipecat.ai/server/services/transport/small-webrtc), [WebSocket Server](https://docs.pipecat.ai/server/services/transport/websocket-server), Local |
|
||||
| Serializers | [Plivo](https://docs.pipecat.ai/server/utilities/serializers/plivo), [Twilio](https://docs.pipecat.ai/server/utilities/serializers/twilio), [Telnyx](https://docs.pipecat.ai/server/utilities/serializers/telnyx) |
|
||||
@@ -184,54 +170,6 @@ Run a specific test suite:
|
||||
uv run pytest tests/test_name.py
|
||||
```
|
||||
|
||||
### Setting up your editor
|
||||
|
||||
This project uses strict [PEP 8](https://peps.python.org/pep-0008/) formatting via [Ruff](https://github.com/astral-sh/ruff).
|
||||
|
||||
#### Emacs
|
||||
|
||||
You can use [use-package](https://github.com/jwiegley/use-package) to install [emacs-lazy-ruff](https://github.com/christophermadsen/emacs-lazy-ruff) package and configure `ruff` arguments:
|
||||
|
||||
```elisp
|
||||
(use-package lazy-ruff
|
||||
:ensure t
|
||||
:hook ((python-mode . lazy-ruff-mode))
|
||||
:config
|
||||
(setq lazy-ruff-format-command "ruff format")
|
||||
(setq lazy-ruff-check-command "ruff check --select I"))
|
||||
```
|
||||
|
||||
`ruff` was installed in the `venv` environment described before, so you should be able to use [pyvenv-auto](https://github.com/ryotaro612/pyvenv-auto) to automatically load that environment inside Emacs.
|
||||
|
||||
```elisp
|
||||
(use-package pyvenv-auto
|
||||
:ensure t
|
||||
:defer t
|
||||
:hook ((python-mode . pyvenv-auto-run)))
|
||||
```
|
||||
|
||||
#### Visual Studio Code
|
||||
|
||||
Install the
|
||||
[Ruff](https://marketplace.visualstudio.com/items?itemName=charliermarsh.ruff) extension. Then edit the user settings (_Ctrl-Shift-P_ `Open User Settings (JSON)`) and set it as the default Python formatter, and enable formatting on save:
|
||||
|
||||
```json
|
||||
"[python]": {
|
||||
"editor.defaultFormatter": "charliermarsh.ruff",
|
||||
"editor.formatOnSave": true
|
||||
}
|
||||
```
|
||||
|
||||
#### PyCharm
|
||||
|
||||
`ruff` was installed in the `venv` environment described before, now to enable autoformatting on save, go to `File` -> `Settings` -> `Tools` -> `File Watchers` and add a new watcher with the following settings:
|
||||
|
||||
1. **Name**: `Ruff formatter`
|
||||
2. **File type**: `Python`
|
||||
3. **Working directory**: `$ContentRoot$`
|
||||
4. **Arguments**: `format $FilePath$`
|
||||
5. **Program**: `$PyInterpreterDirectory$/ruff`
|
||||
|
||||
## 🤝 Contributing
|
||||
|
||||
We welcome contributions from the community! Whether you're fixing bugs, improving documentation, or adding new features, here's how you can help:
|
||||
|
||||
5
SECURITY.md
Normal file
5
SECURITY.md
Normal file
@@ -0,0 +1,5 @@
|
||||
# Security Policy
|
||||
|
||||
## Reporting a Vulnerability
|
||||
|
||||
Please email `disclosures@daily.co`.
|
||||
@@ -58,6 +58,9 @@ GOOGLE_CLOUD_PROJECT_ID=...
|
||||
GOOGLE_TEST_CREDENTIALS=...
|
||||
GOOGLE_VERTEX_TEST_CREDENTIALS=...
|
||||
|
||||
# Hume
|
||||
HUME_API_KEY=...
|
||||
|
||||
# LMNT
|
||||
LMNT_API_KEY=...
|
||||
LMNT_VOICE_ID=...
|
||||
@@ -155,3 +158,9 @@ NVIDIA_API_KEY=...
|
||||
|
||||
# Qwen
|
||||
QWEN_API_KEY=...
|
||||
|
||||
# WhatsApp
|
||||
WHATSAPP_TOKEN=
|
||||
WHATSAPP_WEBHOOK_VERIFICATION_TOKEN=
|
||||
WHATSAPP_PHONE_NUMBER_ID=
|
||||
WHATSAPP_APP_SECRET=
|
||||
@@ -25,7 +25,7 @@ from pipecat.processors.aggregators.llm_response_universal import LLMContextAggr
|
||||
from pipecat.runner.daily import configure
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.transports.daily.transport import DailyLogLevel, DailyParams, DailyTransport
|
||||
from pipecat.transports.daily.transport import DailyParams, DailyTransport
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
@@ -49,7 +49,6 @@ async def main():
|
||||
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
|
||||
),
|
||||
)
|
||||
transport.set_log_level(DailyLogLevel.Info)
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
|
||||
138
examples/foundational/07ae-interruptible-hume.py
Normal file
138
examples/foundational/07ae-interruptible-hume.py
Normal file
@@ -0,0 +1,138 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
|
||||
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.audio.vad.vad_analyzer import VADParams
|
||||
from pipecat.frames.frames import LLMRunFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
|
||||
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.hume.tts import HUME_SAMPLE_RATE, HumeTTSService
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
|
||||
# instantiated. The function will be called when the desired transport gets
|
||||
# selected.
|
||||
transport_params = {
|
||||
"daily": lambda: DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||||
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
|
||||
),
|
||||
"twilio": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||||
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
|
||||
),
|
||||
"webrtc": lambda: TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||||
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
tts = HumeTTSService(
|
||||
api_key=os.getenv("HUME_API_KEY"),
|
||||
# Replace with your Hume voice ID
|
||||
voice_id="f898a92e-685f-43fa-985b-a46920f0650b",
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
]
|
||||
|
||||
context = LLMContext(messages)
|
||||
context_aggregator = LLMContextAggregatorPair(context)
|
||||
|
||||
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
rtvi,
|
||||
stt,
|
||||
context_aggregator.user(), # User responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
transport.output(), # Transport bot output
|
||||
context_aggregator.assistant(), # Assistant spoken responses
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
audio_out_sample_rate=HUME_SAMPLE_RATE,
|
||||
),
|
||||
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
|
||||
observers=[RTVIObserver(rtvi)],
|
||||
)
|
||||
|
||||
@rtvi.event_handler("on_client_ready")
|
||||
async def on_client_ready(rtvi):
|
||||
await rtvi.set_bot_ready()
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
"""Main bot entry point compatible with Pipecat Cloud."""
|
||||
transport = await create_transport(runner_args, transport_params)
|
||||
await run_bot(transport, runner_args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
118
examples/foundational/07c-interruptible-deepgram-flux.py
Normal file
118
examples/foundational/07c-interruptible-deepgram-flux.py
Normal file
@@ -0,0 +1,118 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.frames.frames import LLMRunFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_response_universal import (
|
||||
LLMContext,
|
||||
LLMContextAggregatorPair,
|
||||
)
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.deepgram.flux.stt import DeepgramFluxSTTService
|
||||
from pipecat.services.deepgram.tts import DeepgramTTSService
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
|
||||
# instantiated. The function will be called when the desired transport gets
|
||||
# selected.
|
||||
transport_params = {
|
||||
"daily": lambda: DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
"twilio": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
"webrtc": lambda: TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
stt = DeepgramFluxSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-2-andromeda-en")
|
||||
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
]
|
||||
|
||||
context = LLMContext(messages)
|
||||
context_aggregator = LLMContextAggregatorPair(context)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
stt, # STT
|
||||
context_aggregator.user(), # User responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
transport.output(), # Transport bot output
|
||||
context_aggregator.assistant(), # Assistant spoken responses
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
"""Main bot entry point compatible with Pipecat Cloud."""
|
||||
transport = await create_transport(runner_args, transport_params)
|
||||
await run_bot(transport, runner_args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
@@ -23,7 +23,6 @@ from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.elevenlabs.stt import ElevenLabsSTTService
|
||||
from pipecat.services.elevenlabs.tts import ElevenLabsHttpTTSService
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
|
||||
151
examples/foundational/07n-interruptible-gemini-image.py
Normal file
151
examples/foundational/07n-interruptible-gemini-image.py
Normal file
@@ -0,0 +1,151 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""
|
||||
A conversational AI bot using Gemini for both LLM, STT and TTS.
|
||||
|
||||
This example demonstrates how to use Gemini's image generation capabilities.
|
||||
|
||||
Features showcased:
|
||||
- Gemini LLM for conversation and image generation
|
||||
- Google TTS and STT
|
||||
|
||||
Run with:
|
||||
python examples/foundational/07n-interruptible-gemini-image.py
|
||||
|
||||
Make sure to set your environment variables:
|
||||
export GOOGLE_API_KEY=your_api_key_here
|
||||
"""
|
||||
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
|
||||
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.audio.vad.vad_analyzer import VADParams
|
||||
from pipecat.frames.frames import LLMRunFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.google.llm import GoogleLLMService
|
||||
from pipecat.services.google.stt import GoogleSTTService
|
||||
from pipecat.services.google.tts import GoogleTTSService
|
||||
from pipecat.transcriptions.language import Language
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
|
||||
# instantiated. The function will be called when the desired transport gets
|
||||
# selected.
|
||||
transport_params = {
|
||||
"daily": lambda: DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
video_out_enabled=True,
|
||||
video_out_width=1024,
|
||||
video_out_height=1024,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||||
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
|
||||
),
|
||||
"webrtc": lambda: TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
video_out_enabled=True,
|
||||
video_out_width=1024,
|
||||
video_out_height=1024,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||||
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
stt = GoogleSTTService(
|
||||
params=GoogleSTTService.InputParams(languages=Language.EN_US),
|
||||
credentials=os.getenv("GOOGLE_TEST_CREDENTIALS"),
|
||||
)
|
||||
|
||||
tts = GoogleTTSService(
|
||||
voice_id="en-US-Chirp3-HD-Charon",
|
||||
params=GoogleTTSService.InputParams(language=Language.EN_US),
|
||||
credentials=os.getenv("GOOGLE_TEST_CREDENTIALS"),
|
||||
)
|
||||
|
||||
llm = GoogleLLMService(
|
||||
api_key=os.getenv("GOOGLE_API_KEY"),
|
||||
model="gemini-2.5-flash-image",
|
||||
)
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
]
|
||||
|
||||
context = LLMContext(messages)
|
||||
context_aggregator = LLMContextAggregatorPair(context)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
stt, # STT
|
||||
context_aggregator.user(), # User responses
|
||||
llm, # LLM
|
||||
tts, # Gemini TTS
|
||||
transport.output(), # Transport bot output
|
||||
context_aggregator.assistant(), # Assistant spoken responses
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation with a styled introduction
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
"""Main bot entry point compatible with Pipecat Cloud."""
|
||||
transport = await create_transport(runner_args, transport_params)
|
||||
await run_bot(transport, runner_args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
@@ -26,7 +26,11 @@ from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.deepgram.tts import DeepgramTTSService
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams, DailyTransportMessageFrame
|
||||
from pipecat.transports.daily.transport import (
|
||||
DailyOutputTransportMessageFrame,
|
||||
DailyOutputTransportMessageUrgentFrame,
|
||||
DailyParams,
|
||||
)
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
@@ -128,14 +132,14 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.debug(f"Received latency ping app message: {message}")
|
||||
ts = message["latency-ping"]["ts"]
|
||||
# Send immediately
|
||||
transport.output().send_message(
|
||||
DailyTransportMessageFrame(
|
||||
await task.queue_frame(
|
||||
DailyOutputTransportMessageUrgentFrame(
|
||||
message={"latency-pong-msg-handler": {"ts": ts}}, participant_id=sender
|
||||
)
|
||||
)
|
||||
# And push to the pipeline for the Daily transport.output to send
|
||||
await task.queue_frame(
|
||||
DailyTransportMessageFrame(
|
||||
DailyOutputTransportMessageFrame(
|
||||
message={"latency-pong-pipeline-delivery": {"ts": ts}},
|
||||
participant_id=sender,
|
||||
)
|
||||
|
||||
@@ -29,10 +29,6 @@ from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
from pipecat.utils.string import match_endofsentence
|
||||
|
||||
logger.info("Loading Whisker debugger...")
|
||||
from pipecat_whisker import WhiskerObserver
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
@@ -56,8 +52,6 @@ class TranscriptHandler:
|
||||
"""
|
||||
self.messages: List[TranscriptionMessage] = []
|
||||
self.output_file: Optional[str] = output_file
|
||||
self._current_user_sentence = ""
|
||||
self._current_assistant_sentence = ""
|
||||
logger.debug(
|
||||
f"TranscriptHandler initialized {'with output_file=' + output_file if output_file else 'with log output only'}"
|
||||
)
|
||||
@@ -84,29 +78,11 @@ class TranscriptHandler:
|
||||
except Exception as e:
|
||||
logger.error(f"Error saving transcript message to file: {e}")
|
||||
|
||||
async def _save_sentence(self, role: str, content: str, timestamp: Optional[str] = None):
|
||||
"""Save a complete sentence as a transcript message.
|
||||
|
||||
Args:
|
||||
role: The role (user/assistant)
|
||||
content: The complete sentence content
|
||||
timestamp: Optional timestamp
|
||||
"""
|
||||
# Cast role to the appropriate literal type
|
||||
message_role = "user" if role == "user" else "assistant"
|
||||
sentence_message = TranscriptionMessage(
|
||||
role=message_role, content=content.strip(), timestamp=timestamp
|
||||
)
|
||||
self.messages.append(sentence_message)
|
||||
await self.save_message(sentence_message)
|
||||
|
||||
async def on_transcript_update(
|
||||
self, processor: TranscriptProcessor, frame: TranscriptionUpdateFrame
|
||||
):
|
||||
"""Handle new transcript messages.
|
||||
|
||||
Aggregates messages into complete sentences before saving them using match_endofsentence.
|
||||
|
||||
Args:
|
||||
processor: The TranscriptProcessor that emitted the update
|
||||
frame: TranscriptionUpdateFrame containing new messages
|
||||
@@ -114,31 +90,8 @@ class TranscriptHandler:
|
||||
logger.debug(f"Received transcript update with {len(frame.messages)} new messages")
|
||||
|
||||
for msg in frame.messages:
|
||||
# Accumulate text for the appropriate role
|
||||
if msg.role == "user":
|
||||
self._current_user_sentence += msg.content + " "
|
||||
# Check if we have a complete sentence
|
||||
if match_endofsentence(self._current_user_sentence):
|
||||
await self._save_sentence("user", self._current_user_sentence, msg.timestamp)
|
||||
self._current_user_sentence = ""
|
||||
elif msg.role == "assistant":
|
||||
self._current_assistant_sentence += msg.content + " "
|
||||
# Check if we have a complete sentence
|
||||
if match_endofsentence(self._current_assistant_sentence):
|
||||
await self._save_sentence(
|
||||
"assistant", self._current_assistant_sentence, msg.timestamp
|
||||
)
|
||||
self._current_assistant_sentence = ""
|
||||
|
||||
async def finalize_partial_sentences(self):
|
||||
"""Save any remaining partial sentences when the conversation ends."""
|
||||
if self._current_user_sentence.strip():
|
||||
await self._save_sentence("user", self._current_user_sentence)
|
||||
self._current_user_sentence = ""
|
||||
|
||||
if self._current_assistant_sentence.strip():
|
||||
await self._save_sentence("assistant", self._current_assistant_sentence)
|
||||
self._current_assistant_sentence = ""
|
||||
self.messages.append(msg)
|
||||
await self.save_message(msg)
|
||||
|
||||
|
||||
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
|
||||
@@ -207,16 +160,12 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
]
|
||||
)
|
||||
|
||||
# Create Whisker debugger observer
|
||||
whisker = WhiskerObserver(pipeline)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
observers=[whisker],
|
||||
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
|
||||
)
|
||||
|
||||
@@ -234,8 +183,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
# Finalize any partial sentences before canceling
|
||||
await transcript_handler.finalize_partial_sentences()
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
|
||||
@@ -206,6 +206,14 @@ async def bot(runner_args: RunnerArguments):
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if not os.getenv("NASA_API_KEY"):
|
||||
logger.error(
|
||||
f"Please set NASA_API_KEY environment variable for this example. See https://api.nasa.gov"
|
||||
)
|
||||
import sys
|
||||
|
||||
sys.exit(1)
|
||||
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
|
||||
@@ -141,6 +141,14 @@ async def bot(runner_args: RunnerArguments):
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if not os.getenv("MCP_RUN_SSE_URL"):
|
||||
logger.error(
|
||||
f"Please set MCP_RUN_SSE_URL environment variable for this example. See https://mcp.run"
|
||||
)
|
||||
import sys
|
||||
|
||||
sys.exit(1)
|
||||
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
|
||||
@@ -219,6 +219,14 @@ async def bot(runner_args: RunnerArguments):
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if not os.getenv("NASA_API_KEY") or not os.getenv("MCP_RUN_SSE_URL"):
|
||||
logger.error(
|
||||
f"Please set NASA_API_KEY and MCP_RUN_SSE_URL environment variables. See https://api.nasa.gov and https://mcp.run"
|
||||
)
|
||||
import sys
|
||||
|
||||
sys.exit(1)
|
||||
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
|
||||
@@ -145,6 +145,14 @@ async def bot(runner_args: RunnerArguments):
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if not os.getenv("GITHUB_PERSONAL_ACCESS_TOKEN"):
|
||||
logger.error(
|
||||
f"Please set GITHUB_PERSONAL_ACCESS_TOKEN environment variable for this example."
|
||||
)
|
||||
import sys
|
||||
|
||||
sys.exit(1)
|
||||
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
|
||||
@@ -4,7 +4,7 @@ version = "0.1.0"
|
||||
description = "Quickstart example for building voice AI bots with Pipecat"
|
||||
requires-python = ">=3.10"
|
||||
dependencies = [
|
||||
"pipecat-ai[webrtc,daily,silero,deepgram,openai,cartesia,local-smart-turn-v3,runner]>=0.0.85",
|
||||
"pipecat-ai[webrtc,daily,silero,deepgram,openai,cartesia,local-smart-turn-v3,runner]>=0.0.86",
|
||||
"pipecatcloud>=0.2.4"
|
||||
]
|
||||
|
||||
|
||||
@@ -50,7 +50,7 @@ anthropic = [ "anthropic~=0.49.0" ]
|
||||
assemblyai = [ "pipecat-ai[websockets-base]" ]
|
||||
asyncai = [ "pipecat-ai[websockets-base]" ]
|
||||
aws = [ "aioboto3~=15.0.0", "pipecat-ai[websockets-base]" ]
|
||||
aws-nova-sonic = [ "aws_sdk_bedrock_runtime~=0.0.2; python_version>='3.12'" ]
|
||||
aws-nova-sonic = [ "aws_sdk_bedrock_runtime~=0.1.0; python_version>='3.12'" ]
|
||||
azure = [ "azure-cognitiveservices-speech~=1.42.0"]
|
||||
cartesia = [ "cartesia~=2.0.3", "pipecat-ai[websockets-base]" ]
|
||||
cerebras = []
|
||||
@@ -62,11 +62,12 @@ fal = [ "fal-client~=0.5.9" ]
|
||||
fireworks = []
|
||||
fish = [ "ormsgpack~=1.7.0", "pipecat-ai[websockets-base]" ]
|
||||
gladia = [ "pipecat-ai[websockets-base]" ]
|
||||
google = [ "google-cloud-speech~=2.32.0", "google-cloud-texttospeech~=2.26.0", "google-genai~=1.24.0", "pipecat-ai[websockets-base]" ]
|
||||
google = [ "google-cloud-speech>=2.33.0,<3", "google-cloud-texttospeech>=2.31.0,<3", "google-genai>=1.41.0,<2", "pipecat-ai[websockets-base]" ]
|
||||
grok = []
|
||||
groq = [ "groq~=0.23.0" ]
|
||||
gstreamer = [ "pygobject~=3.50.0" ]
|
||||
heygen = [ "livekit>=1.0.13", "pipecat-ai[websockets-base]" ]
|
||||
hume = [ "hume>=0.11.2" ]
|
||||
inworld = []
|
||||
krisp = [ "pipecat-ai-krisp~=0.4.0" ]
|
||||
koala = [ "pvkoala~=2.0.3" ]
|
||||
|
||||
@@ -34,7 +34,8 @@ from pipecat.frames.frames import EndTaskFrame, LLMRunFrame, OutputImageRawFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
|
||||
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
@@ -283,8 +284,8 @@ async def run_eval_pipeline(
|
||||
},
|
||||
]
|
||||
|
||||
context = OpenAILLMContext(messages, tools)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
context = LLMContext(messages, tools)
|
||||
context_aggregator = LLMContextAggregatorPair(context)
|
||||
|
||||
audio_buffer = AudioBufferProcessor()
|
||||
|
||||
|
||||
@@ -67,6 +67,7 @@ TESTS_07 = [
|
||||
("07ac-interruptible-asyncai-http.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
|
||||
("07b-interruptible-langchain.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
|
||||
("07c-interruptible-deepgram.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
|
||||
("07c-interruptible-deepgram-flux.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
|
||||
("07d-interruptible-elevenlabs.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
|
||||
(
|
||||
"07d-interruptible-elevenlabs-http.py",
|
||||
@@ -74,8 +75,6 @@ TESTS_07 = [
|
||||
EVAL_SIMPLE_MATH,
|
||||
BOT_SPEAKS_FIRST,
|
||||
),
|
||||
("07e-interruptible-playht.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
|
||||
("07e-interruptible-playht-http.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
|
||||
("07f-interruptible-azure.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
|
||||
("07g-interruptible-openai.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
|
||||
("07h-interruptible-openpipe.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
|
||||
@@ -102,6 +101,7 @@ TESTS_07 = [
|
||||
("07w-interruptible-fal.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
|
||||
("07y-interruptible-minimax.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
|
||||
("07z-interruptible-sarvam.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
|
||||
("07ae-interruptible-hume.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
|
||||
# Needs a local XTTS docker instance running.
|
||||
# ("07i-interruptible-xtts.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
|
||||
# Needs a Krisp license.
|
||||
|
||||
@@ -14,6 +14,8 @@ from abc import ABC, abstractmethod
|
||||
from enum import Enum
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from pipecat.metrics.metrics import MetricsData
|
||||
|
||||
|
||||
@@ -29,6 +31,12 @@ class EndOfTurnState(Enum):
|
||||
INCOMPLETE = 2
|
||||
|
||||
|
||||
class BaseTurnParams(BaseModel):
|
||||
"""Base class for turn analyzer parameters."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class BaseTurnAnalyzer(ABC):
|
||||
"""Abstract base class for analyzing user end of turn.
|
||||
|
||||
@@ -78,7 +86,7 @@ class BaseTurnAnalyzer(ABC):
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def params(self):
|
||||
def params(self) -> BaseTurnParams:
|
||||
"""Get the current turn analyzer parameters.
|
||||
|
||||
Returns:
|
||||
|
||||
@@ -11,15 +11,17 @@ machine learning models to determine when a user has finished speaking, going
|
||||
beyond simple silence-based detection.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
from abc import abstractmethod
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from typing import Any, Dict, Optional, Tuple
|
||||
|
||||
import numpy as np
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel
|
||||
|
||||
from pipecat.audio.turn.base_turn_analyzer import BaseTurnAnalyzer, EndOfTurnState
|
||||
from pipecat.audio.turn.base_turn_analyzer import BaseTurnAnalyzer, BaseTurnParams, EndOfTurnState
|
||||
from pipecat.metrics.metrics import MetricsData, SmartTurnMetricsData
|
||||
|
||||
# Default timing parameters
|
||||
@@ -29,7 +31,7 @@ MAX_DURATION_SECONDS = 8 # Max allowed segment duration
|
||||
USE_ONLY_LAST_VAD_SEGMENT = True
|
||||
|
||||
|
||||
class SmartTurnParams(BaseModel):
|
||||
class SmartTurnParams(BaseTurnParams):
|
||||
"""Configuration parameters for smart turn analysis.
|
||||
|
||||
Parameters:
|
||||
@@ -77,6 +79,9 @@ class BaseSmartTurn(BaseTurnAnalyzer):
|
||||
self._speech_triggered = False
|
||||
self._silence_ms = 0
|
||||
self._speech_start_time = 0
|
||||
# Thread executor that will run the model. We only need one thread per
|
||||
# analyzer because one analyzer just handles one audio stream.
|
||||
self._executor = ThreadPoolExecutor(max_workers=1)
|
||||
|
||||
@property
|
||||
def speech_triggered(self) -> bool:
|
||||
@@ -151,7 +156,10 @@ class BaseSmartTurn(BaseTurnAnalyzer):
|
||||
Tuple containing the end-of-turn state and optional metrics data
|
||||
from the ML model analysis.
|
||||
"""
|
||||
state, result = await self._process_speech_segment(self._audio_buffer)
|
||||
loop = asyncio.get_running_loop()
|
||||
state, result = await loop.run_in_executor(
|
||||
self._executor, self._process_speech_segment, self._audio_buffer
|
||||
)
|
||||
if state == EndOfTurnState.COMPLETE or USE_ONLY_LAST_VAD_SEGMENT:
|
||||
self._clear(state)
|
||||
logger.debug(f"End of Turn result: {state}")
|
||||
@@ -169,9 +177,7 @@ class BaseSmartTurn(BaseTurnAnalyzer):
|
||||
self._speech_start_time = 0
|
||||
self._silence_ms = 0
|
||||
|
||||
async def _process_speech_segment(
|
||||
self, audio_buffer
|
||||
) -> Tuple[EndOfTurnState, Optional[MetricsData]]:
|
||||
def _process_speech_segment(self, audio_buffer) -> Tuple[EndOfTurnState, Optional[MetricsData]]:
|
||||
"""Process accumulated audio segment using ML model."""
|
||||
state = EndOfTurnState.INCOMPLETE
|
||||
|
||||
@@ -203,7 +209,7 @@ class BaseSmartTurn(BaseTurnAnalyzer):
|
||||
if len(segment_audio) > 0:
|
||||
start_time = time.perf_counter()
|
||||
try:
|
||||
result = await self._predict_endpoint(segment_audio)
|
||||
result = self._predict_endpoint(segment_audio)
|
||||
state = (
|
||||
EndOfTurnState.COMPLETE
|
||||
if result["prediction"] == 1
|
||||
@@ -249,6 +255,6 @@ class BaseSmartTurn(BaseTurnAnalyzer):
|
||||
return state, result_data
|
||||
|
||||
@abstractmethod
|
||||
async def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
|
||||
def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
|
||||
"""Predict end-of-turn using ML model from audio data."""
|
||||
pass
|
||||
|
||||
@@ -104,11 +104,15 @@ class HttpSmartTurnAnalyzer(BaseSmartTurn):
|
||||
logger.error(f"Failed to send raw request to Daily Smart Turn: {e}")
|
||||
raise Exception("Failed to send raw request to Daily Smart Turn.")
|
||||
|
||||
async def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
|
||||
def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
|
||||
"""Predict end-of-turn using remote HTTP ML service."""
|
||||
try:
|
||||
serialized_array = self._serialize_array(audio_array)
|
||||
return await self._send_raw_request(serialized_array)
|
||||
loop = asyncio.get_running_loop()
|
||||
future = asyncio.run_coroutine_threadsafe(
|
||||
self._send_raw_request(serialized_array), loop
|
||||
)
|
||||
return future.result()
|
||||
except Exception as e:
|
||||
logger.error(f"Smart turn prediction failed: {str(e)}")
|
||||
# Return an incomplete prediction when a failure occurs
|
||||
|
||||
@@ -64,7 +64,7 @@ class LocalSmartTurnAnalyzer(BaseSmartTurn):
|
||||
self._turn_model.eval()
|
||||
logger.debug("Loaded Local Smart Turn")
|
||||
|
||||
async def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
|
||||
def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
|
||||
"""Predict end-of-turn using local PyTorch model."""
|
||||
inputs = self._turn_processor(
|
||||
audio_array,
|
||||
|
||||
@@ -73,7 +73,7 @@ class LocalSmartTurnAnalyzerV2(BaseSmartTurn):
|
||||
self._turn_model.eval()
|
||||
logger.debug("Loaded Local Smart Turn v2")
|
||||
|
||||
async def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
|
||||
def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
|
||||
"""Predict end-of-turn using local PyTorch model."""
|
||||
inputs = self._turn_processor(
|
||||
audio_array,
|
||||
|
||||
@@ -77,7 +77,7 @@ class LocalSmartTurnAnalyzerV3(BaseSmartTurn):
|
||||
|
||||
logger.debug("Loaded Local Smart Turn v3")
|
||||
|
||||
async def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
|
||||
def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
|
||||
"""Predict end-of-turn using local ONNX model."""
|
||||
|
||||
def truncate_audio_to_last_n_seconds(audio_array, n_seconds=8, sample_rate=16000):
|
||||
|
||||
@@ -11,7 +11,9 @@ data structures for voice activity detection in audio streams. Includes state
|
||||
management, parameter configuration, and audio analysis framework.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from abc import ABC, abstractmethod
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
|
||||
@@ -84,6 +86,10 @@ class VADAnalyzer(ABC):
|
||||
self._smoothing_factor = 0.2
|
||||
self._prev_volume = 0
|
||||
|
||||
# Thread executor that will run the model. We only need one thread per
|
||||
# analyzer because one analyzer just handles one audio stream.
|
||||
self._executor = ThreadPoolExecutor(max_workers=1)
|
||||
|
||||
@property
|
||||
def sample_rate(self) -> int:
|
||||
"""Get the current sample rate.
|
||||
@@ -165,7 +171,7 @@ class VADAnalyzer(ABC):
|
||||
volume = calculate_audio_volume(audio, self.sample_rate)
|
||||
return exp_smoothing(volume, self._prev_volume, self._smoothing_factor)
|
||||
|
||||
def analyze_audio(self, buffer) -> VADState:
|
||||
async def analyze_audio(self, buffer: bytes) -> VADState:
|
||||
"""Analyze audio buffer and return current VAD state.
|
||||
|
||||
Processes incoming audio data, maintains internal state, and determines
|
||||
@@ -177,6 +183,12 @@ class VADAnalyzer(ABC):
|
||||
Returns:
|
||||
Current VAD state after processing the buffer.
|
||||
"""
|
||||
loop = asyncio.get_running_loop()
|
||||
state = await loop.run_in_executor(self._executor, self._run_analyzer, buffer)
|
||||
return state
|
||||
|
||||
def _run_analyzer(self, buffer: bytes) -> VADState:
|
||||
"""Analyze audio buffer and return current VAD state."""
|
||||
self._vad_buffer += buffer
|
||||
|
||||
num_required_bytes = self._vad_frames_num_bytes
|
||||
|
||||
@@ -672,7 +672,7 @@ class TTSSpeakFrame(DataFrame):
|
||||
|
||||
|
||||
@dataclass
|
||||
class TransportMessageFrame(DataFrame):
|
||||
class OutputTransportMessageFrame(DataFrame):
|
||||
"""Frame containing transport-specific message data.
|
||||
|
||||
Parameters:
|
||||
@@ -685,6 +685,32 @@ class TransportMessageFrame(DataFrame):
|
||||
return f"{self.name}(message: {self.message})"
|
||||
|
||||
|
||||
@dataclass
|
||||
class TransportMessageFrame(OutputTransportMessageFrame):
|
||||
"""Frame containing transport-specific message data.
|
||||
|
||||
.. deprecated:: 0.0.87
|
||||
This frame is deprecated and will be removed in a future version.
|
||||
Instead, use `OutputTransportMessageFrame`.
|
||||
|
||||
Parameters:
|
||||
message: The transport message payload.
|
||||
"""
|
||||
|
||||
def __post_init__(self):
|
||||
super().__post_init__()
|
||||
import warnings
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"TransportMessageFrame is deprecated and will be removed in a future version. "
|
||||
"Instead, use OutputTransportMessageFrame.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class DTMFFrame:
|
||||
"""Base class for DTMF (Dual-Tone Multi-Frequency) keypad frames.
|
||||
@@ -1092,8 +1118,8 @@ class STTMuteFrame(SystemFrame):
|
||||
|
||||
|
||||
@dataclass
|
||||
class TransportMessageUrgentFrame(SystemFrame):
|
||||
"""Frame for urgent transport messages that need immediate processing.
|
||||
class InputTransportMessageFrame(SystemFrame):
|
||||
"""Frame for transport messages received from external sources.
|
||||
|
||||
Parameters:
|
||||
message: The urgent transport message payload.
|
||||
@@ -1106,20 +1132,69 @@ class TransportMessageUrgentFrame(SystemFrame):
|
||||
|
||||
|
||||
@dataclass
|
||||
class InputTransportMessageUrgentFrame(TransportMessageUrgentFrame):
|
||||
class InputTransportMessageUrgentFrame(InputTransportMessageFrame):
|
||||
"""Frame for transport messages received from external sources.
|
||||
|
||||
This frame wraps incoming transport messages to distinguish them from outgoing
|
||||
urgent transport messages (TransportMessageUrgentFrame), preventing infinite
|
||||
message loops in the transport layer. It inherits the message payload from
|
||||
TransportMessageFrame while marking the message as having been received
|
||||
rather than generated locally.
|
||||
.. deprecated:: 0.0.87
|
||||
This frame is deprecated and will be removed in a future version.
|
||||
Instead, use `InputTransportMessageFrame`.
|
||||
|
||||
Used by transport implementations to properly handle bidirectional message
|
||||
flow without creating feedback loops.
|
||||
Parameters:
|
||||
message: The urgent transport message payload.
|
||||
"""
|
||||
|
||||
pass
|
||||
def __post_init__(self):
|
||||
super().__post_init__()
|
||||
import warnings
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"InputTransportMessageUrgentFrame is deprecated and will be removed in a future version. "
|
||||
"Instead, use InputTransportMessageFrame.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class OutputTransportMessageUrgentFrame(SystemFrame):
|
||||
"""Frame for urgent transport messages that need to be sent immediately.
|
||||
|
||||
Parameters:
|
||||
message: The urgent transport message payload.
|
||||
"""
|
||||
|
||||
message: Any
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.name}(message: {self.message})"
|
||||
|
||||
|
||||
@dataclass
|
||||
class TransportMessageUrgentFrame(OutputTransportMessageUrgentFrame):
|
||||
"""Frame for urgent transport messages that need to be sent immediately.
|
||||
|
||||
.. deprecated:: 0.0.87
|
||||
This frame is deprecated and will be removed in a future version.
|
||||
Instead, use `OutputTransportMessageUrgentFrame`.
|
||||
|
||||
Parameters:
|
||||
message: The urgent transport message payload.
|
||||
"""
|
||||
|
||||
def __post_init__(self):
|
||||
super().__post_init__()
|
||||
import warnings
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"TransportMessageUrgentFrame is deprecated and will be removed in a future version. "
|
||||
"Instead, use OutputTransportMessageFrame.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
|
||||
@@ -13,8 +13,7 @@ including heartbeats, idle detection, and observer integration.
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
from collections import deque
|
||||
from typing import Any, AsyncIterable, Deque, Dict, Iterable, List, Optional, Tuple, Type
|
||||
from typing import Any, AsyncIterable, Dict, Iterable, List, Optional, Tuple, Type
|
||||
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel, ConfigDict, Field
|
||||
@@ -31,7 +30,6 @@ from pipecat.frames.frames import (
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
HeartbeatFrame,
|
||||
InputAudioRawFrame,
|
||||
InterruptionFrame,
|
||||
InterruptionTaskFrame,
|
||||
MetricsFrame,
|
||||
@@ -132,9 +130,11 @@ class PipelineTask(BasePipelineTask):
|
||||
|
||||
- on_pipeline_finished: Called after the pipeline has reached any terminal state.
|
||||
This includes:
|
||||
|
||||
- StopFrame: pipeline was stopped (processors keep connections open)
|
||||
- EndFrame: pipeline ended normally
|
||||
- CancelFrame: pipeline was cancelled
|
||||
|
||||
Use this event for cleanup, logging, or post-processing tasks. Users can inspect
|
||||
the frame if they need to handle specific cases.
|
||||
|
||||
@@ -395,7 +395,8 @@ class PipelineTask(BasePipelineTask):
|
||||
Cancels all running tasks and stops frame processing without
|
||||
waiting for completion.
|
||||
"""
|
||||
await self._cancel()
|
||||
if not self._finished:
|
||||
await self._cancel()
|
||||
|
||||
async def run(self, params: PipelineTaskParams):
|
||||
"""Start and manage the pipeline execution until completion or cancellation.
|
||||
|
||||
@@ -13,6 +13,7 @@ LLM processing, and text-to-speech components in conversational AI pipelines.
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
from abc import abstractmethod
|
||||
from typing import Any, Dict, List, Literal, Optional, Set
|
||||
|
||||
from loguru import logger
|
||||
@@ -169,6 +170,11 @@ class LLMContextAggregator(FrameProcessor):
|
||||
"""Reset the aggregation state."""
|
||||
self._aggregation = ""
|
||||
|
||||
@abstractmethod
|
||||
async def push_aggregation(self):
|
||||
"""Push the current aggregation downstream."""
|
||||
pass
|
||||
|
||||
|
||||
class LLMUserAggregator(LLMContextAggregator):
|
||||
"""User LLM aggregator that processes speech-to-text transcriptions.
|
||||
@@ -301,7 +307,7 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
frame = LLMContextFrame(self._context)
|
||||
await self.push_frame(frame)
|
||||
|
||||
async def _push_aggregation(self):
|
||||
async def push_aggregation(self):
|
||||
"""Push the current aggregation based on interruption strategies and conditions."""
|
||||
if len(self._aggregation) > 0:
|
||||
if self.interruption_strategies and self._bot_speaking:
|
||||
@@ -392,7 +398,7 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
# pushing the aggregation as we will probably get a final transcription.
|
||||
if len(self._aggregation) > 0:
|
||||
if not self._seen_interim_results:
|
||||
await self._push_aggregation()
|
||||
await self.push_aggregation()
|
||||
# Handles the case where both the user and the bot are not speaking,
|
||||
# and the bot was previously speaking before the user interruption.
|
||||
# So in this case we are resetting the aggregation timer
|
||||
@@ -471,7 +477,7 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
await self._maybe_emulate_user_speaking()
|
||||
except asyncio.TimeoutError:
|
||||
if not self._user_speaking:
|
||||
await self._push_aggregation()
|
||||
await self.push_aggregation()
|
||||
|
||||
# If we are emulating VAD we still need to send the user stopped
|
||||
# speaking frame.
|
||||
@@ -607,12 +613,12 @@ class LLMAssistantAggregator(LLMContextAggregator):
|
||||
elif isinstance(frame, UserImageRawFrame) and frame.request and frame.request.tool_call_id:
|
||||
await self._handle_user_image_frame(frame)
|
||||
elif isinstance(frame, BotStoppedSpeakingFrame):
|
||||
await self._push_aggregation()
|
||||
await self.push_aggregation()
|
||||
await self.push_frame(frame, direction)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
async def _push_aggregation(self):
|
||||
async def push_aggregation(self):
|
||||
"""Push the current assistant aggregation with timestamp."""
|
||||
if not self._aggregation:
|
||||
return
|
||||
@@ -644,7 +650,7 @@ class LLMAssistantAggregator(LLMContextAggregator):
|
||||
await self.push_context_frame(FrameDirection.UPSTREAM)
|
||||
|
||||
async def _handle_interruptions(self, frame: InterruptionFrame):
|
||||
await self._push_aggregation()
|
||||
await self.push_aggregation()
|
||||
self._started = 0
|
||||
await self.reset()
|
||||
|
||||
@@ -778,7 +784,7 @@ class LLMAssistantAggregator(LLMContextAggregator):
|
||||
text=frame.request.context,
|
||||
)
|
||||
|
||||
await self._push_aggregation()
|
||||
await self.push_aggregation()
|
||||
await self.push_context_frame(FrameDirection.UPSTREAM)
|
||||
|
||||
async def _handle_llm_start(self, _: LLMFullResponseStartFrame):
|
||||
@@ -786,7 +792,7 @@ class LLMAssistantAggregator(LLMContextAggregator):
|
||||
|
||||
async def _handle_llm_end(self, _: LLMFullResponseEndFrame):
|
||||
self._started -= 1
|
||||
await self._push_aggregation()
|
||||
await self.push_aggregation()
|
||||
|
||||
async def _handle_text(self, frame: TextFrame):
|
||||
if not self._started:
|
||||
|
||||
@@ -12,14 +12,14 @@ in conversational pipelines.
|
||||
"""
|
||||
|
||||
from pipecat.frames.frames import TextFrame
|
||||
from pipecat.processors.aggregators.llm_response import LLMUserContextAggregator
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import LLMUserAggregator
|
||||
|
||||
|
||||
class UserResponseAggregator(LLMUserContextAggregator):
|
||||
class UserResponseAggregator(LLMUserAggregator):
|
||||
"""Aggregates user responses into TextFrame objects.
|
||||
|
||||
This aggregator extends LLMUserContextAggregator to specifically handle
|
||||
This aggregator extends LLMUserAggregator to specifically handle
|
||||
user input by collecting text responses and outputting them as TextFrame
|
||||
objects when the aggregation is complete.
|
||||
"""
|
||||
@@ -28,9 +28,9 @@ class UserResponseAggregator(LLMUserContextAggregator):
|
||||
"""Initialize the user response aggregator.
|
||||
|
||||
Args:
|
||||
**kwargs: Additional arguments passed to parent LLMUserContextAggregator.
|
||||
**kwargs: Additional arguments passed to parent LLMUserAggregator.
|
||||
"""
|
||||
super().__init__(context=OpenAILLMContext(), **kwargs)
|
||||
super().__init__(context=LLMContext(), **kwargs)
|
||||
|
||||
async def push_aggregation(self):
|
||||
"""Push the aggregated user response as a TextFrame.
|
||||
|
||||
@@ -13,6 +13,7 @@ and frame observation for the RTVI protocol.
|
||||
|
||||
import asyncio
|
||||
import base64
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from typing import (
|
||||
Any,
|
||||
@@ -29,6 +30,7 @@ from typing import (
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel, Field, PrivateAttr, ValidationError
|
||||
|
||||
from pipecat.audio.utils import calculate_audio_volume
|
||||
from pipecat.frames.frames import (
|
||||
BotStartedSpeakingFrame,
|
||||
BotStoppedSpeakingFrame,
|
||||
@@ -40,6 +42,7 @@ from pipecat.frames.frames import (
|
||||
Frame,
|
||||
FunctionCallResultFrame,
|
||||
InputAudioRawFrame,
|
||||
InputTransportMessageFrame,
|
||||
InterimTranscriptionFrame,
|
||||
LLMConfigureOutputFrame,
|
||||
LLMContextFrame,
|
||||
@@ -48,10 +51,11 @@ from pipecat.frames.frames import (
|
||||
LLMMessagesAppendFrame,
|
||||
LLMTextFrame,
|
||||
MetricsFrame,
|
||||
OutputTransportMessageUrgentFrame,
|
||||
StartFrame,
|
||||
SystemFrame,
|
||||
TranscriptionFrame,
|
||||
TransportMessageUrgentFrame,
|
||||
TTSAudioRawFrame,
|
||||
TTSStartedFrame,
|
||||
TTSStoppedFrame,
|
||||
TTSTextFrame,
|
||||
@@ -613,9 +617,9 @@ class RTVIAppendToContextData(BaseModel):
|
||||
|
||||
Contains the role, content, and whether to run the message immediately.
|
||||
|
||||
.. deprecated:: 0.0.85
|
||||
The RTVI message, append-to-context, has been deprecated. Use send-text
|
||||
or custom client and server messages instead.
|
||||
.. deprecated:: 0.0.85
|
||||
The RTVI message, append-to-context, has been deprecated. Use send-text
|
||||
or custom client and server messages instead.
|
||||
"""
|
||||
|
||||
role: Literal["user", "assistant"] | str
|
||||
@@ -839,6 +843,36 @@ class RTVIServerMessage(BaseModel):
|
||||
data: Any
|
||||
|
||||
|
||||
class RTVIAudioLevelMessageData(BaseModel):
|
||||
"""Data format for sending audio levels."""
|
||||
|
||||
value: float
|
||||
|
||||
|
||||
class RTVIUserAudioLevelMessage(BaseModel):
|
||||
"""Message indicating user audio level."""
|
||||
|
||||
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
|
||||
type: Literal["user-audio-level"] = "user-audio-level"
|
||||
data: RTVIAudioLevelMessageData
|
||||
|
||||
|
||||
class RTVIBotAudioLevelMessage(BaseModel):
|
||||
"""Message indicating bot audio level."""
|
||||
|
||||
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
|
||||
type: Literal["bot-audio-level"] = "bot-audio-level"
|
||||
data: RTVIAudioLevelMessageData
|
||||
|
||||
|
||||
class RTVISystemLogMessage(BaseModel):
|
||||
"""Message including a system log."""
|
||||
|
||||
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
|
||||
type: Literal["system-log"] = "system-log"
|
||||
data: RTVITextMessageData
|
||||
|
||||
|
||||
@dataclass
|
||||
class RTVIServerMessageFrame(SystemFrame):
|
||||
"""A frame for sending server messages to the client.
|
||||
@@ -858,25 +892,36 @@ class RTVIServerMessageFrame(SystemFrame):
|
||||
class RTVIObserverParams:
|
||||
"""Parameters for configuring RTVI Observer behavior.
|
||||
|
||||
.. deprecated:: 0.0.87
|
||||
Parameter `errors_enabled` is deprecated. Error messages are always enabled.
|
||||
|
||||
Parameters:
|
||||
bot_llm_enabled: Indicates if the bot's LLM messages should be sent.
|
||||
bot_tts_enabled: Indicates if the bot's TTS messages should be sent.
|
||||
bot_speaking_enabled: Indicates if the bot's started/stopped speaking messages should be sent.
|
||||
bot_audio_level_enabled: Indicates if bot's audio level messages should be sent.
|
||||
user_llm_enabled: Indicates if the user's LLM input messages should be sent.
|
||||
user_speaking_enabled: Indicates if the user's started/stopped speaking messages should be sent.
|
||||
user_transcription_enabled: Indicates if user's transcription messages should be sent.
|
||||
user_audio_level_enabled: Indicates if user's audio level messages should be sent.
|
||||
metrics_enabled: Indicates if metrics messages should be sent.
|
||||
errors_enabled: Indicates if errors messages should be sent.
|
||||
system_logs_enabled: Indicates if system logs should be sent.
|
||||
errors_enabled: [Deprecated] Indicates if errors messages should be sent.
|
||||
audio_level_period_secs: How often audio levels should be sent if enabled.
|
||||
"""
|
||||
|
||||
bot_llm_enabled: bool = True
|
||||
bot_tts_enabled: bool = True
|
||||
bot_speaking_enabled: bool = True
|
||||
bot_audio_level_enabled: bool = False
|
||||
user_llm_enabled: bool = True
|
||||
user_speaking_enabled: bool = True
|
||||
user_transcription_enabled: bool = True
|
||||
user_audio_level_enabled: bool = False
|
||||
metrics_enabled: bool = True
|
||||
errors_enabled: bool = True
|
||||
system_logs_enabled: bool = False
|
||||
errors_enabled: Optional[bool] = None
|
||||
audio_level_period_secs: float = 0.15
|
||||
|
||||
|
||||
class RTVIObserver(BaseObserver):
|
||||
@@ -892,7 +937,11 @@ class RTVIObserver(BaseObserver):
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self, rtvi: "RTVIProcessor", *, params: Optional[RTVIObserverParams] = None, **kwargs
|
||||
self,
|
||||
rtvi: Optional["RTVIProcessor"] = None,
|
||||
*,
|
||||
params: Optional[RTVIObserverParams] = None,
|
||||
**kwargs,
|
||||
):
|
||||
"""Initialize the RTVI observer.
|
||||
|
||||
@@ -904,9 +953,50 @@ class RTVIObserver(BaseObserver):
|
||||
super().__init__(**kwargs)
|
||||
self._rtvi = rtvi
|
||||
self._params = params or RTVIObserverParams()
|
||||
self._bot_transcription = ""
|
||||
|
||||
self._frames_seen = set()
|
||||
rtvi.set_errors_enabled(self._params.errors_enabled)
|
||||
|
||||
self._bot_transcription = ""
|
||||
self._last_user_audio_level = 0
|
||||
self._last_bot_audio_level = 0
|
||||
|
||||
if self._params.system_logs_enabled:
|
||||
self._system_logger_id = logger.add(self._logger_sink)
|
||||
|
||||
if self._params.errors_enabled is not None:
|
||||
import warnings
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"Parameter `errors_enabled` is deprecated. Error messages are always enabled.",
|
||||
DeprecationWarning,
|
||||
)
|
||||
|
||||
async def _logger_sink(self, message):
|
||||
"""Logger sink so we cna send system logs to RTVI clients."""
|
||||
message = RTVISystemLogMessage(data=RTVITextMessageData(text=message))
|
||||
await self.send_rtvi_message(message)
|
||||
|
||||
async def cleanup(self):
|
||||
"""Cleanup RTVI observer resources."""
|
||||
await super().cleanup()
|
||||
if self._params.system_logs_enabled:
|
||||
logger.remove(self._system_logger_id)
|
||||
|
||||
async def send_rtvi_message(self, model: BaseModel, exclude_none: bool = True):
|
||||
"""Send an RTVI message.
|
||||
|
||||
By default, we push a transport frame. But this function can be
|
||||
overriden by subclass to send RTVI messages in different ways.
|
||||
|
||||
Args:
|
||||
model: The message to send.
|
||||
exclude_none: Whether to exclude None values from the model dump.
|
||||
|
||||
"""
|
||||
if self._rtvi:
|
||||
await self._rtvi.push_transport_message(model, exclude_none)
|
||||
|
||||
async def on_push_frame(self, data: FramePushed):
|
||||
"""Process a frame being pushed through the pipeline.
|
||||
@@ -948,52 +1038,58 @@ class RTVIObserver(BaseObserver):
|
||||
):
|
||||
await self._handle_context(frame)
|
||||
elif isinstance(frame, LLMFullResponseStartFrame) and self._params.bot_llm_enabled:
|
||||
await self.push_transport_message_urgent(RTVIBotLLMStartedMessage())
|
||||
await self.send_rtvi_message(RTVIBotLLMStartedMessage())
|
||||
elif isinstance(frame, LLMFullResponseEndFrame) and self._params.bot_llm_enabled:
|
||||
await self.push_transport_message_urgent(RTVIBotLLMStoppedMessage())
|
||||
await self.send_rtvi_message(RTVIBotLLMStoppedMessage())
|
||||
elif isinstance(frame, LLMTextFrame) and self._params.bot_llm_enabled:
|
||||
await self._handle_llm_text_frame(frame)
|
||||
elif isinstance(frame, TTSStartedFrame) and self._params.bot_tts_enabled:
|
||||
await self.push_transport_message_urgent(RTVIBotTTSStartedMessage())
|
||||
await self.send_rtvi_message(RTVIBotTTSStartedMessage())
|
||||
elif isinstance(frame, TTSStoppedFrame) and self._params.bot_tts_enabled:
|
||||
await self.push_transport_message_urgent(RTVIBotTTSStoppedMessage())
|
||||
await self.send_rtvi_message(RTVIBotTTSStoppedMessage())
|
||||
elif isinstance(frame, TTSTextFrame) and self._params.bot_tts_enabled:
|
||||
if isinstance(src, BaseOutputTransport):
|
||||
message = RTVIBotTTSTextMessage(data=RTVITextMessageData(text=frame.text))
|
||||
await self.push_transport_message_urgent(message)
|
||||
await self.send_rtvi_message(message)
|
||||
else:
|
||||
mark_as_seen = False
|
||||
elif isinstance(frame, MetricsFrame) and self._params.metrics_enabled:
|
||||
await self._handle_metrics(frame)
|
||||
elif isinstance(frame, RTVIServerMessageFrame):
|
||||
message = RTVIServerMessage(data=frame.data)
|
||||
await self.push_transport_message_urgent(message)
|
||||
await self.send_rtvi_message(message)
|
||||
elif isinstance(frame, RTVIServerResponseFrame):
|
||||
if frame.error is not None:
|
||||
await self._send_error_response(frame)
|
||||
else:
|
||||
await self._send_server_response(frame)
|
||||
elif isinstance(frame, InputAudioRawFrame) and self._params.user_audio_level_enabled:
|
||||
curr_time = time.time()
|
||||
diff_time = curr_time - self._last_user_audio_level
|
||||
if diff_time > self._params.audio_level_period_secs:
|
||||
level = calculate_audio_volume(frame.audio, frame.sample_rate)
|
||||
message = RTVIUserAudioLevelMessage(data=RTVIAudioLevelMessageData(value=level))
|
||||
await self.send_rtvi_message(message)
|
||||
self._last_user_audio_level = curr_time
|
||||
elif isinstance(frame, TTSAudioRawFrame) and self._params.bot_audio_level_enabled:
|
||||
curr_time = time.time()
|
||||
diff_time = curr_time - self._last_bot_audio_level
|
||||
if diff_time > self._params.audio_level_period_secs:
|
||||
level = calculate_audio_volume(frame.audio, frame.sample_rate)
|
||||
message = RTVIBotAudioLevelMessage(data=RTVIAudioLevelMessageData(value=level))
|
||||
await self.send_rtvi_message(message)
|
||||
self._last_bot_audio_level = curr_time
|
||||
|
||||
if mark_as_seen:
|
||||
self._frames_seen.add(frame.id)
|
||||
|
||||
async def push_transport_message_urgent(self, model: BaseModel, exclude_none: bool = True):
|
||||
"""Push an urgent transport message to the RTVI processor.
|
||||
|
||||
Args:
|
||||
model: The message model to send.
|
||||
exclude_none: Whether to exclude None values from the model dump.
|
||||
"""
|
||||
frame = TransportMessageUrgentFrame(message=model.model_dump(exclude_none=exclude_none))
|
||||
await self._rtvi.push_frame(frame)
|
||||
|
||||
async def _push_bot_transcription(self):
|
||||
"""Push accumulated bot transcription as a message."""
|
||||
if len(self._bot_transcription) > 0:
|
||||
message = RTVIBotTranscriptionMessage(
|
||||
data=RTVITextMessageData(text=self._bot_transcription)
|
||||
)
|
||||
await self.push_transport_message_urgent(message)
|
||||
await self.send_rtvi_message(message)
|
||||
self._bot_transcription = ""
|
||||
|
||||
async def _handle_interruptions(self, frame: Frame):
|
||||
@@ -1005,7 +1101,7 @@ class RTVIObserver(BaseObserver):
|
||||
message = RTVIUserStoppedSpeakingMessage()
|
||||
|
||||
if message:
|
||||
await self.push_transport_message_urgent(message)
|
||||
await self.send_rtvi_message(message)
|
||||
|
||||
async def _handle_bot_speaking(self, frame: Frame):
|
||||
"""Handle bot speaking event frames."""
|
||||
@@ -1016,12 +1112,12 @@ class RTVIObserver(BaseObserver):
|
||||
message = RTVIBotStoppedSpeakingMessage()
|
||||
|
||||
if message:
|
||||
await self.push_transport_message_urgent(message)
|
||||
await self.send_rtvi_message(message)
|
||||
|
||||
async def _handle_llm_text_frame(self, frame: LLMTextFrame):
|
||||
"""Handle LLM text output frames."""
|
||||
message = RTVIBotLLMTextMessage(data=RTVITextMessageData(text=frame.text))
|
||||
await self.push_transport_message_urgent(message)
|
||||
await self.send_rtvi_message(message)
|
||||
|
||||
self._bot_transcription += frame.text
|
||||
if match_endofsentence(self._bot_transcription):
|
||||
@@ -1044,7 +1140,7 @@ class RTVIObserver(BaseObserver):
|
||||
)
|
||||
|
||||
if message:
|
||||
await self.push_transport_message_urgent(message)
|
||||
await self.send_rtvi_message(message)
|
||||
|
||||
async def _handle_context(self, frame: OpenAILLMContextFrame | LLMContextFrame):
|
||||
"""Process LLM context frames to extract user messages for the RTVI client."""
|
||||
@@ -1064,7 +1160,7 @@ class RTVIObserver(BaseObserver):
|
||||
text = "".join(part.text for part in message.parts if hasattr(part, "text"))
|
||||
if text:
|
||||
rtvi_message = RTVIUserLLMTextMessage(data=RTVITextMessageData(text=text))
|
||||
await self.push_transport_message_urgent(rtvi_message)
|
||||
await self.send_rtvi_message(rtvi_message)
|
||||
|
||||
# Handle OpenAI format (original implementation)
|
||||
elif isinstance(message, dict):
|
||||
@@ -1075,7 +1171,7 @@ class RTVIObserver(BaseObserver):
|
||||
else:
|
||||
text = content
|
||||
rtvi_message = RTVIUserLLMTextMessage(data=RTVITextMessageData(text=text))
|
||||
await self.push_transport_message_urgent(rtvi_message)
|
||||
await self.send_rtvi_message(rtvi_message)
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Caught an error while trying to handle context: {e}")
|
||||
@@ -1102,7 +1198,7 @@ class RTVIObserver(BaseObserver):
|
||||
metrics["characters"].append(d.model_dump(exclude_none=True))
|
||||
|
||||
message = RTVIMetricsMessage(data=metrics)
|
||||
await self.push_transport_message_urgent(message)
|
||||
await self.send_rtvi_message(message)
|
||||
|
||||
async def _send_server_response(self, frame: RTVIServerResponseFrame):
|
||||
"""Send a response to the client for a specific request."""
|
||||
@@ -1110,15 +1206,14 @@ class RTVIObserver(BaseObserver):
|
||||
id=str(frame.client_msg.msg_id),
|
||||
data=RTVIRawServerResponseData(t=frame.client_msg.type, d=frame.data),
|
||||
)
|
||||
await self.push_transport_message_urgent(message)
|
||||
await self.send_rtvi_message(message)
|
||||
|
||||
async def _send_error_response(self, frame: RTVIServerResponseFrame):
|
||||
"""Send a response to the client for a specific request."""
|
||||
if self._params.errors_enabled:
|
||||
message = RTVIErrorResponse(
|
||||
id=str(frame.client_msg.msg_id), data=RTVIErrorResponseData(error=frame.error)
|
||||
)
|
||||
await self.push_transport_message_urgent(message)
|
||||
message = RTVIErrorResponse(
|
||||
id=str(frame.client_msg.msg_id), data=RTVIErrorResponseData(error=frame.error)
|
||||
)
|
||||
await self.send_rtvi_message(message)
|
||||
|
||||
|
||||
class RTVIProcessor(FrameProcessor):
|
||||
@@ -1152,7 +1247,6 @@ class RTVIProcessor(FrameProcessor):
|
||||
# Default to 0.3.0 which is the last version before actually having a
|
||||
# "client-version".
|
||||
self._client_version = [0, 3, 0]
|
||||
self._errors_enabled = True
|
||||
self._skip_tts: bool = False # Keep in sync with llm_service.py
|
||||
|
||||
self._registered_actions: Dict[str, RTVIAction] = {}
|
||||
@@ -1222,14 +1316,6 @@ class RTVIProcessor(FrameProcessor):
|
||||
await self._update_config(self._config, False)
|
||||
await self._send_bot_ready()
|
||||
|
||||
def set_errors_enabled(self, enabled: bool):
|
||||
"""Enable or disable error message sending.
|
||||
|
||||
Args:
|
||||
enabled: Whether to send error messages.
|
||||
"""
|
||||
self._errors_enabled = enabled
|
||||
|
||||
async def interrupt_bot(self):
|
||||
"""Send a bot interruption frame upstream."""
|
||||
await self.push_interruption_task_frame_and_wait()
|
||||
@@ -1258,6 +1344,13 @@ class RTVIProcessor(FrameProcessor):
|
||||
"""
|
||||
await self._send_error_frame(ErrorFrame(error=error))
|
||||
|
||||
async def push_transport_message(self, model: BaseModel, exclude_none: bool = True):
|
||||
"""Push a transport message frame."""
|
||||
frame = OutputTransportMessageUrgentFrame(
|
||||
message=model.model_dump(exclude_none=exclude_none)
|
||||
)
|
||||
await self.push_frame(frame)
|
||||
|
||||
async def handle_message(self, message: RTVIMessage):
|
||||
"""Handle an incoming RTVI message.
|
||||
|
||||
@@ -1278,7 +1371,7 @@ class RTVIProcessor(FrameProcessor):
|
||||
args=params.arguments,
|
||||
)
|
||||
message = RTVILLMFunctionCallMessage(data=fn)
|
||||
await self._push_transport_message(message, exclude_none=False)
|
||||
await self.push_transport_message(message, exclude_none=False)
|
||||
|
||||
async def handle_function_call_start(
|
||||
self, function_name: str, llm: FrameProcessor, context: OpenAILLMContext
|
||||
@@ -1305,7 +1398,7 @@ class RTVIProcessor(FrameProcessor):
|
||||
|
||||
fn = RTVILLMFunctionCallStartMessageData(function_name=function_name)
|
||||
message = RTVILLMFunctionCallStartMessage(data=fn)
|
||||
await self._push_transport_message(message, exclude_none=False)
|
||||
await self.push_transport_message(message, exclude_none=False)
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
"""Process incoming frames through the RTVI processor.
|
||||
@@ -1328,7 +1421,7 @@ class RTVIProcessor(FrameProcessor):
|
||||
elif isinstance(frame, ErrorFrame):
|
||||
await self._send_error_frame(frame)
|
||||
await self.push_frame(frame, direction)
|
||||
elif isinstance(frame, TransportMessageUrgentFrame):
|
||||
elif isinstance(frame, InputTransportMessageFrame):
|
||||
await self._handle_transport_message(frame)
|
||||
# All other system frames
|
||||
elif isinstance(frame, SystemFrame):
|
||||
@@ -1377,11 +1470,6 @@ class RTVIProcessor(FrameProcessor):
|
||||
await self.cancel_task(self._message_task)
|
||||
self._message_task = None
|
||||
|
||||
async def _push_transport_message(self, model: BaseModel, exclude_none: bool = True):
|
||||
"""Push a transport message frame."""
|
||||
frame = TransportMessageUrgentFrame(message=model.model_dump(exclude_none=exclude_none))
|
||||
await self.push_frame(frame)
|
||||
|
||||
async def _action_task_handler(self):
|
||||
"""Handle incoming action frames."""
|
||||
while True:
|
||||
@@ -1396,7 +1484,7 @@ class RTVIProcessor(FrameProcessor):
|
||||
await self._handle_message(message)
|
||||
self._message_queue.task_done()
|
||||
|
||||
async def _handle_transport_message(self, frame: TransportMessageUrgentFrame):
|
||||
async def _handle_transport_message(self, frame: InputTransportMessageFrame):
|
||||
"""Handle an incoming transport message frame."""
|
||||
try:
|
||||
transport_message = frame.message
|
||||
@@ -1518,7 +1606,7 @@ class RTVIProcessor(FrameProcessor):
|
||||
|
||||
services = list(self._registered_services.values())
|
||||
message = RTVIDescribeConfig(id=request_id, data=RTVIDescribeConfigData(config=services))
|
||||
await self._push_transport_message(message)
|
||||
await self.push_transport_message(message)
|
||||
|
||||
async def _handle_describe_actions(self, request_id: str):
|
||||
"""Handle a describe-actions request."""
|
||||
@@ -1533,7 +1621,7 @@ class RTVIProcessor(FrameProcessor):
|
||||
|
||||
actions = list(self._registered_actions.values())
|
||||
message = RTVIDescribeActions(id=request_id, data=RTVIDescribeActionsData(actions=actions))
|
||||
await self._push_transport_message(message)
|
||||
await self.push_transport_message(message)
|
||||
|
||||
async def _handle_get_config(self, request_id: str):
|
||||
"""Handle a get-config request."""
|
||||
@@ -1547,7 +1635,7 @@ class RTVIProcessor(FrameProcessor):
|
||||
)
|
||||
|
||||
message = RTVIConfigResponse(id=request_id, data=self._config)
|
||||
await self._push_transport_message(message)
|
||||
await self.push_transport_message(message)
|
||||
|
||||
def _update_config_option(self, service: str, config: RTVIServiceOptionConfig):
|
||||
"""Update a specific configuration option."""
|
||||
@@ -1672,7 +1760,7 @@ class RTVIProcessor(FrameProcessor):
|
||||
# action responses (such as webhooks) don't set a request_id
|
||||
if request_id:
|
||||
message = RTVIActionResponse(id=request_id, data=RTVIActionResponseData(result=result))
|
||||
await self._push_transport_message(message)
|
||||
await self.push_transport_message(message)
|
||||
|
||||
async def _send_bot_ready(self):
|
||||
"""Send the bot-ready message to the client."""
|
||||
@@ -1683,23 +1771,21 @@ class RTVIProcessor(FrameProcessor):
|
||||
id=self._client_ready_id,
|
||||
data=RTVIBotReadyData(version=RTVI_PROTOCOL_VERSION, config=config),
|
||||
)
|
||||
await self._push_transport_message(message)
|
||||
await self.push_transport_message(message)
|
||||
|
||||
async def _send_server_message(self, message: RTVIServerMessage | RTVIServerResponse):
|
||||
"""Send a message or response to the client."""
|
||||
await self._push_transport_message(message)
|
||||
await self.push_transport_message(message)
|
||||
|
||||
async def _send_error_frame(self, frame: ErrorFrame):
|
||||
"""Send an error frame as an RTVI error message."""
|
||||
if self._errors_enabled:
|
||||
message = RTVIError(data=RTVIErrorData(error=frame.error, fatal=frame.fatal))
|
||||
await self._push_transport_message(message)
|
||||
message = RTVIError(data=RTVIErrorData(error=frame.error, fatal=frame.fatal))
|
||||
await self.push_transport_message(message)
|
||||
|
||||
async def _send_error_response(self, id: str, error: str):
|
||||
"""Send an error response message."""
|
||||
if self._errors_enabled:
|
||||
message = RTVIErrorResponse(id=id, data=RTVIErrorResponseData(error=error))
|
||||
await self._push_transport_message(message)
|
||||
message = RTVIErrorResponse(id=id, data=RTVIErrorResponseData(error=error))
|
||||
await self.push_transport_message(message)
|
||||
|
||||
def _action_id(self, service: str, action: str) -> str:
|
||||
"""Generate an action ID from service and action names."""
|
||||
|
||||
@@ -15,7 +15,7 @@ from pipecat.frames.frames import (
|
||||
Frame,
|
||||
InputAudioRawFrame,
|
||||
OutputAudioRawFrame,
|
||||
TransportMessageFrame,
|
||||
UserSpeakingFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
|
||||
@@ -36,9 +36,9 @@ class FrameLogger(FrameProcessor):
|
||||
color: Optional[str] = None,
|
||||
ignored_frame_types: Tuple[Type[Frame], ...] = (
|
||||
BotSpeakingFrame,
|
||||
UserSpeakingFrame,
|
||||
InputAudioRawFrame,
|
||||
OutputAudioRawFrame,
|
||||
TransportMessageFrame,
|
||||
),
|
||||
):
|
||||
"""Initialize the frame logger.
|
||||
|
||||
@@ -70,7 +70,9 @@ import asyncio
|
||||
import os
|
||||
import sys
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import Optional
|
||||
|
||||
import aiohttp
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.runner.types import (
|
||||
@@ -82,7 +84,7 @@ from pipecat.runner.types import (
|
||||
try:
|
||||
import uvicorn
|
||||
from dotenv import load_dotenv
|
||||
from fastapi import BackgroundTasks, FastAPI, Request, WebSocket
|
||||
from fastapi import BackgroundTasks, FastAPI, Header, HTTPException, Request, WebSocket
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.responses import HTMLResponse, RedirectResponse
|
||||
except ImportError as e:
|
||||
@@ -166,6 +168,7 @@ def _create_server_app(
|
||||
# Set up transport-specific routes
|
||||
if transport_type == "webrtc":
|
||||
_setup_webrtc_routes(app, esp32_mode=esp32_mode, host=host)
|
||||
_setup_whatsapp_routes(app)
|
||||
elif transport_type == "daily":
|
||||
_setup_daily_routes(app)
|
||||
elif transport_type in ["twilio", "telnyx", "plivo", "exotel"]:
|
||||
@@ -221,12 +224,184 @@ def _setup_webrtc_routes(app: FastAPI, esp32_mode: bool = False, host: str = "lo
|
||||
return answer
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
async def smallwebrtc_lifespan(app: FastAPI):
|
||||
"""Manage FastAPI application lifecycle and cleanup connections."""
|
||||
yield
|
||||
await small_webrtc_handler.close()
|
||||
|
||||
app.router.lifespan_context = lifespan
|
||||
# Add the SmallWebRTC lifespan to the app
|
||||
_add_lifespan_to_app(app, smallwebrtc_lifespan)
|
||||
|
||||
|
||||
def _add_lifespan_to_app(app: FastAPI, new_lifespan):
|
||||
"""Add a new lifespan context manager to the app, combining with existing if present.
|
||||
|
||||
Args:
|
||||
app: The FastAPI application instance
|
||||
new_lifespan: The new lifespan context manager to add
|
||||
"""
|
||||
if hasattr(app.router, "lifespan_context") and app.router.lifespan_context is not None:
|
||||
# If there's already a lifespan context, combine them
|
||||
existing_lifespan = app.router.lifespan_context
|
||||
|
||||
@asynccontextmanager
|
||||
async def combined_lifespan(app: FastAPI):
|
||||
async with existing_lifespan(app):
|
||||
async with new_lifespan(app):
|
||||
yield
|
||||
|
||||
app.router.lifespan_context = combined_lifespan
|
||||
else:
|
||||
# No existing lifespan, use the new one
|
||||
app.router.lifespan_context = new_lifespan
|
||||
|
||||
|
||||
def _setup_whatsapp_routes(app: FastAPI):
|
||||
"""Set up WebRTC-specific routes."""
|
||||
try:
|
||||
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
|
||||
|
||||
from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection
|
||||
from pipecat.transports.smallwebrtc.request_handler import (
|
||||
SmallWebRTCRequest,
|
||||
SmallWebRTCRequestHandler,
|
||||
)
|
||||
from pipecat.transports.whatsapp.api import WhatsAppWebhookRequest
|
||||
from pipecat.transports.whatsapp.client import WhatsAppClient
|
||||
except ImportError as e:
|
||||
logger.error(f"WebRTC transport dependencies not installed: {e}")
|
||||
return
|
||||
|
||||
WHATSAPP_TOKEN = os.getenv("WHATSAPP_TOKEN")
|
||||
WHATSAPP_PHONE_NUMBER_ID = os.getenv("WHATSAPP_PHONE_NUMBER_ID")
|
||||
WHATSAPP_WEBHOOK_VERIFICATION_TOKEN = os.getenv("WHATSAPP_WEBHOOK_VERIFICATION_TOKEN")
|
||||
WHATSAPP_APP_SECRET = os.getenv("WHATSAPP_APP_SECRET")
|
||||
|
||||
if not all(
|
||||
[
|
||||
WHATSAPP_TOKEN,
|
||||
WHATSAPP_PHONE_NUMBER_ID,
|
||||
WHATSAPP_WEBHOOK_VERIFICATION_TOKEN,
|
||||
]
|
||||
):
|
||||
logger.debug(
|
||||
"Missing required environment variables for WhatsApp transport. Keeping it disabled."
|
||||
)
|
||||
return
|
||||
|
||||
# Global WhatsApp client instance
|
||||
whatsapp_client: Optional[WhatsAppClient] = None
|
||||
|
||||
@app.get(
|
||||
"/whatsapp",
|
||||
summary="Verify WhatsApp webhook",
|
||||
description="Handles WhatsApp webhook verification requests from Meta",
|
||||
)
|
||||
async def verify_webhook(request: Request):
|
||||
"""Verify WhatsApp webhook endpoint.
|
||||
|
||||
This endpoint is called by Meta's WhatsApp Business API to verify
|
||||
the webhook URL during setup. It validates the verification token
|
||||
and returns the challenge parameter if successful.
|
||||
"""
|
||||
if whatsapp_client is None:
|
||||
logger.error("WhatsApp client is not initialized")
|
||||
raise HTTPException(status_code=503, detail="Service unavailable")
|
||||
|
||||
params = dict(request.query_params)
|
||||
logger.debug(f"Webhook verification request received with params: {list(params.keys())}")
|
||||
|
||||
try:
|
||||
result = await whatsapp_client.handle_verify_webhook_request(
|
||||
params=params, expected_verification_token=WHATSAPP_WEBHOOK_VERIFICATION_TOKEN
|
||||
)
|
||||
logger.info("Webhook verification successful")
|
||||
return result
|
||||
except ValueError as e:
|
||||
logger.warning(f"Webhook verification failed: {e}")
|
||||
raise HTTPException(status_code=403, detail="Verification failed")
|
||||
|
||||
@app.post(
|
||||
"/whatsapp",
|
||||
summary="Handle WhatsApp webhook events",
|
||||
description="Processes incoming WhatsApp messages and call events",
|
||||
)
|
||||
async def whatsapp_webhook(
|
||||
body: WhatsAppWebhookRequest,
|
||||
background_tasks: BackgroundTasks,
|
||||
request: Request,
|
||||
x_hub_signature_256: str = Header(None),
|
||||
):
|
||||
"""Handle incoming WhatsApp webhook events.
|
||||
|
||||
For call events, establishes WebRTC connections and spawns bot instances
|
||||
in the background to handle real-time communication.
|
||||
"""
|
||||
if whatsapp_client is None:
|
||||
logger.error("WhatsApp client is not initialized")
|
||||
raise HTTPException(status_code=503, detail="Service unavailable")
|
||||
|
||||
# Validate webhook object type
|
||||
if body.object != "whatsapp_business_account":
|
||||
logger.warning(f"Invalid webhook object type: {body.object}")
|
||||
raise HTTPException(status_code=400, detail="Invalid object type")
|
||||
|
||||
logger.debug(f"Processing WhatsApp webhook: {body.model_dump()}")
|
||||
|
||||
async def connection_callback(connection: SmallWebRTCConnection):
|
||||
"""Handle new WebRTC connections from WhatsApp calls.
|
||||
|
||||
Called when a WebRTC connection is established for a WhatsApp call.
|
||||
Spawns a bot instance to handle the conversation.
|
||||
|
||||
Args:
|
||||
connection: The established WebRTC connection
|
||||
"""
|
||||
bot_module = _get_bot_module()
|
||||
runner_args = SmallWebRTCRunnerArguments(webrtc_connection=connection)
|
||||
background_tasks.add_task(bot_module.bot, runner_args)
|
||||
|
||||
try:
|
||||
# Process the webhook request
|
||||
raw_body = await request.body()
|
||||
result = await whatsapp_client.handle_webhook_request(
|
||||
body, connection_callback, sha256_signature=x_hub_signature_256, raw_body=raw_body
|
||||
)
|
||||
logger.debug(f"Webhook processed successfully: {result}")
|
||||
return {"status": "success", "message": "Webhook processed successfully"}
|
||||
except ValueError as ve:
|
||||
logger.warning(f"Invalid webhook request format: {ve}")
|
||||
raise HTTPException(status_code=400, detail=f"Invalid request: {str(ve)}")
|
||||
except Exception as e:
|
||||
logger.error(f"Internal error processing webhook: {e}")
|
||||
raise HTTPException(status_code=500, detail="Internal server error processing webhook")
|
||||
|
||||
@asynccontextmanager
|
||||
async def whatsapp_lifespan(app: FastAPI):
|
||||
"""Manage WhatsApp client lifecycle and cleanup connections."""
|
||||
nonlocal whatsapp_client
|
||||
|
||||
# Initialize WhatsApp client with persistent HTTP session
|
||||
async with aiohttp.ClientSession() as session:
|
||||
whatsapp_client = WhatsAppClient(
|
||||
whatsapp_token=WHATSAPP_TOKEN,
|
||||
whatsapp_secret=WHATSAPP_APP_SECRET,
|
||||
phone_number_id=WHATSAPP_PHONE_NUMBER_ID,
|
||||
session=session,
|
||||
)
|
||||
logger.info("WhatsApp client initialized successfully")
|
||||
|
||||
try:
|
||||
yield # Run the application
|
||||
finally:
|
||||
# Cleanup all active calls on shutdown
|
||||
logger.info("Cleaning up WhatsApp client resources...")
|
||||
if whatsapp_client:
|
||||
await whatsapp_client.terminate_all_calls()
|
||||
logger.info("WhatsApp cleanup completed")
|
||||
|
||||
# Add the WhatsApp lifespan to the app
|
||||
_add_lifespan_to_app(app, whatsapp_lifespan)
|
||||
|
||||
|
||||
def _setup_daily_routes(app: FastAPI):
|
||||
|
||||
@@ -99,29 +99,41 @@ async def parse_telephony_websocket(websocket: WebSocket):
|
||||
tuple: (transport_type: str, call_data: dict)
|
||||
|
||||
call_data contains provider-specific fields:
|
||||
- Twilio: {
|
||||
"stream_id": str,
|
||||
"call_id": str,
|
||||
"body": dict
|
||||
}
|
||||
- Telnyx: {
|
||||
"stream_id": str,
|
||||
"call_control_id": str,
|
||||
"outbound_encoding": str,
|
||||
"from": str,
|
||||
"to": str,
|
||||
}
|
||||
- Plivo: {
|
||||
"stream_id": str,
|
||||
"call_id": str,
|
||||
}
|
||||
- Exotel: {
|
||||
"stream_id": str,
|
||||
"call_id": str,
|
||||
"account_sid": str,
|
||||
"from": str,
|
||||
"to": str,
|
||||
}
|
||||
|
||||
- Twilio::
|
||||
|
||||
{
|
||||
"stream_id": str,
|
||||
"call_id": str,
|
||||
"body": dict
|
||||
}
|
||||
|
||||
- Telnyx::
|
||||
|
||||
{
|
||||
"stream_id": str,
|
||||
"call_control_id": str,
|
||||
"outbound_encoding": str,
|
||||
"from": str,
|
||||
"to": str,
|
||||
}
|
||||
|
||||
- Plivo::
|
||||
|
||||
{
|
||||
"stream_id": str,
|
||||
"call_id": str,
|
||||
}
|
||||
|
||||
- Exotel::
|
||||
|
||||
{
|
||||
"stream_id": str,
|
||||
"call_id": str,
|
||||
"account_sid": str,
|
||||
"from": str,
|
||||
"to": str,
|
||||
}
|
||||
|
||||
Example usage::
|
||||
|
||||
@@ -301,6 +313,7 @@ def _smallwebrtc_sdp_cleanup_ice_candidates(text: str, pattern: str) -> str:
|
||||
Returns:
|
||||
Cleaned SDP text with filtered ICE candidates.
|
||||
"""
|
||||
logger.debug("Removing unsupported ICE candidates from SDP")
|
||||
result = []
|
||||
lines = text.splitlines()
|
||||
for line in lines:
|
||||
@@ -309,7 +322,7 @@ def _smallwebrtc_sdp_cleanup_ice_candidates(text: str, pattern: str) -> str:
|
||||
result.append(line)
|
||||
else:
|
||||
result.append(line)
|
||||
return "\r\n".join(result)
|
||||
return "\r\n".join(result) + "\r\n"
|
||||
|
||||
|
||||
def _smallwebrtc_sdp_cleanup_fingerprints(text: str) -> str:
|
||||
@@ -321,15 +334,16 @@ def _smallwebrtc_sdp_cleanup_fingerprints(text: str) -> str:
|
||||
Returns:
|
||||
SDP text with sha-384 and sha-512 fingerprints removed.
|
||||
"""
|
||||
logger.debug("Removing unsupported fingerprints from SDP")
|
||||
result = []
|
||||
lines = text.splitlines()
|
||||
for line in lines:
|
||||
if not re.search("sha-384", line) and not re.search("sha-512", line):
|
||||
result.append(line)
|
||||
return "\r\n".join(result)
|
||||
return "\r\n".join(result) + "\r\n"
|
||||
|
||||
|
||||
def smallwebrtc_sdp_munging(sdp: str, host: str) -> str:
|
||||
def smallwebrtc_sdp_munging(sdp: str, host: Optional[str]) -> str:
|
||||
"""Apply SDP modifications for SmallWebRTC compatibility.
|
||||
|
||||
Args:
|
||||
@@ -340,7 +354,8 @@ def smallwebrtc_sdp_munging(sdp: str, host: str) -> str:
|
||||
Modified SDP string with fingerprint and ICE candidate cleanup.
|
||||
"""
|
||||
sdp = _smallwebrtc_sdp_cleanup_fingerprints(sdp)
|
||||
sdp = _smallwebrtc_sdp_cleanup_ice_candidates(sdp, host)
|
||||
if host:
|
||||
sdp = _smallwebrtc_sdp_cleanup_ice_candidates(sdp, host)
|
||||
return sdp
|
||||
|
||||
|
||||
|
||||
@@ -21,9 +21,9 @@ from pipecat.frames.frames import (
|
||||
InputAudioRawFrame,
|
||||
InputDTMFFrame,
|
||||
InterruptionFrame,
|
||||
OutputTransportMessageFrame,
|
||||
OutputTransportMessageUrgentFrame,
|
||||
StartFrame,
|
||||
TransportMessageFrame,
|
||||
TransportMessageUrgentFrame,
|
||||
)
|
||||
from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializerType
|
||||
|
||||
@@ -121,7 +121,7 @@ class ExotelFrameSerializer(FrameSerializer):
|
||||
}
|
||||
|
||||
return json.dumps(answer)
|
||||
elif isinstance(frame, (TransportMessageFrame, TransportMessageUrgentFrame)):
|
||||
elif isinstance(frame, (OutputTransportMessageFrame, OutputTransportMessageUrgentFrame)):
|
||||
return json.dumps(frame.message)
|
||||
|
||||
return None
|
||||
|
||||
@@ -23,9 +23,9 @@ from pipecat.frames.frames import (
|
||||
InputAudioRawFrame,
|
||||
InputDTMFFrame,
|
||||
InterruptionFrame,
|
||||
OutputTransportMessageFrame,
|
||||
OutputTransportMessageUrgentFrame,
|
||||
StartFrame,
|
||||
TransportMessageFrame,
|
||||
TransportMessageUrgentFrame,
|
||||
)
|
||||
from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializerType
|
||||
|
||||
@@ -148,7 +148,7 @@ class PlivoFrameSerializer(FrameSerializer):
|
||||
}
|
||||
|
||||
return json.dumps(answer)
|
||||
elif isinstance(frame, (TransportMessageFrame, TransportMessageUrgentFrame)):
|
||||
elif isinstance(frame, (OutputTransportMessageFrame, OutputTransportMessageUrgentFrame)):
|
||||
return json.dumps(frame.message)
|
||||
|
||||
# Return None for unhandled frames
|
||||
|
||||
@@ -15,11 +15,12 @@ import pipecat.frames.protobufs.frames_pb2 as frame_protos
|
||||
from pipecat.frames.frames import (
|
||||
Frame,
|
||||
InputAudioRawFrame,
|
||||
InputTransportMessageFrame,
|
||||
OutputAudioRawFrame,
|
||||
OutputTransportMessageFrame,
|
||||
OutputTransportMessageUrgentFrame,
|
||||
TextFrame,
|
||||
TranscriptionFrame,
|
||||
TransportMessageFrame,
|
||||
TransportMessageUrgentFrame,
|
||||
)
|
||||
from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializerType
|
||||
|
||||
@@ -82,7 +83,7 @@ class ProtobufFrameSerializer(FrameSerializer):
|
||||
Serialized frame as bytes, or None if frame type is not serializable.
|
||||
"""
|
||||
# Wrapping this messages as a JSONFrame to send
|
||||
if isinstance(frame, (TransportMessageFrame, TransportMessageUrgentFrame)):
|
||||
if isinstance(frame, (OutputTransportMessageFrame, OutputTransportMessageUrgentFrame)):
|
||||
frame = MessageFrame(
|
||||
data=json.dumps(frame.message),
|
||||
)
|
||||
@@ -134,11 +135,11 @@ class ProtobufFrameSerializer(FrameSerializer):
|
||||
if "pts" in args_dict:
|
||||
del args_dict["pts"]
|
||||
|
||||
# Special handling for MessageFrame -> TransportMessageUrgentFrame
|
||||
# Special handling for MessageFrame -> OutputTransportMessageUrgentFrame
|
||||
if class_name == MessageFrame:
|
||||
try:
|
||||
msg = json.loads(args_dict["data"])
|
||||
instance = TransportMessageUrgentFrame(message=msg)
|
||||
instance = InputTransportMessageFrame(message=msg)
|
||||
logger.debug(f"ProtobufFrameSerializer: Transport message {instance}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error parsing MessageFrame data: {e}")
|
||||
|
||||
@@ -23,9 +23,9 @@ from pipecat.frames.frames import (
|
||||
InputAudioRawFrame,
|
||||
InputDTMFFrame,
|
||||
InterruptionFrame,
|
||||
OutputTransportMessageFrame,
|
||||
OutputTransportMessageUrgentFrame,
|
||||
StartFrame,
|
||||
TransportMessageFrame,
|
||||
TransportMessageUrgentFrame,
|
||||
)
|
||||
from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializerType
|
||||
|
||||
@@ -175,7 +175,7 @@ class TwilioFrameSerializer(FrameSerializer):
|
||||
}
|
||||
|
||||
return json.dumps(answer)
|
||||
elif isinstance(frame, (TransportMessageFrame, TransportMessageUrgentFrame)):
|
||||
elif isinstance(frame, (OutputTransportMessageFrame, OutputTransportMessageUrgentFrame)):
|
||||
return json.dumps(frame.message)
|
||||
|
||||
# Return None for unhandled frames
|
||||
|
||||
@@ -151,7 +151,7 @@ class AnthropicLLMService(LLMService):
|
||||
self,
|
||||
*,
|
||||
api_key: str,
|
||||
model: str = "claude-sonnet-4-20250514",
|
||||
model: str = "claude-sonnet-4-5-20250929",
|
||||
params: Optional[InputParams] = None,
|
||||
client=None,
|
||||
retry_timeout_secs: Optional[float] = 5.0,
|
||||
@@ -162,7 +162,7 @@ class AnthropicLLMService(LLMService):
|
||||
|
||||
Args:
|
||||
api_key: Anthropic API key for authentication.
|
||||
model: Model name to use. Defaults to "claude-sonnet-4-20250514".
|
||||
model: Model name to use. Defaults to "claude-sonnet-4-5-20250929".
|
||||
params: Optional model parameters for inference.
|
||||
client: Optional custom Anthropic client instance.
|
||||
retry_timeout_secs: Request timeout in seconds for retry logic.
|
||||
|
||||
@@ -61,7 +61,6 @@ from pipecat.utils.tracing.service_decorators import traced_llm
|
||||
|
||||
try:
|
||||
import aioboto3
|
||||
import httpx
|
||||
from botocore.config import Config
|
||||
from botocore.exceptions import ReadTimeoutError
|
||||
except ModuleNotFoundError as e:
|
||||
@@ -1117,7 +1116,7 @@ class AWSBedrockLLMService(LLMService):
|
||||
# also get cancelled.
|
||||
use_completion_tokens_estimate = True
|
||||
raise
|
||||
except httpx.TimeoutException:
|
||||
except (ReadTimeoutError, asyncio.TimeoutError):
|
||||
await self._call_event_handler("on_completion_timeout")
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} exception: {e}")
|
||||
|
||||
@@ -70,7 +70,7 @@ try:
|
||||
BedrockRuntimeClient,
|
||||
InvokeModelWithBidirectionalStreamOperationInput,
|
||||
)
|
||||
from aws_sdk_bedrock_runtime.config import Config, HTTPAuthSchemeResolver, SigV4AuthScheme
|
||||
from aws_sdk_bedrock_runtime.config import Config
|
||||
from aws_sdk_bedrock_runtime.models import (
|
||||
BidirectionalInputPayloadPart,
|
||||
InvokeModelWithBidirectionalStreamInput,
|
||||
@@ -78,8 +78,8 @@ try:
|
||||
InvokeModelWithBidirectionalStreamOperationOutput,
|
||||
InvokeModelWithBidirectionalStreamOutput,
|
||||
)
|
||||
from smithy_aws_core.credentials_resolvers.static import StaticCredentialsResolver
|
||||
from smithy_aws_core.identity import AWSCredentialsIdentity
|
||||
from smithy_aws_core.auth.sigv4 import SigV4AuthScheme
|
||||
from smithy_aws_core.identity.static import StaticCredentialsResolver
|
||||
from smithy_core.aio.eventstream import DuplexEventStream
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
@@ -429,7 +429,7 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
await self._finish_connecting_if_context_available()
|
||||
except Exception as e:
|
||||
logger.error(f"{self} initialization error: {e}")
|
||||
self._disconnect()
|
||||
await self._disconnect()
|
||||
|
||||
async def _finish_connecting_if_context_available(self):
|
||||
# We can only finish connecting once we've gotten our initial context and we're ready to
|
||||
@@ -528,15 +528,11 @@ class AWSNovaSonicLLMService(LLMService):
|
||||
config = Config(
|
||||
endpoint_uri=f"https://bedrock-runtime.{self._region}.amazonaws.com",
|
||||
region=self._region,
|
||||
aws_credentials_identity_resolver=StaticCredentialsResolver(
|
||||
credentials=AWSCredentialsIdentity(
|
||||
access_key_id=self._access_key_id,
|
||||
secret_access_key=self._secret_access_key,
|
||||
session_token=self._session_token,
|
||||
)
|
||||
),
|
||||
http_auth_scheme_resolver=HTTPAuthSchemeResolver(),
|
||||
http_auth_schemes={"aws.auth#sigv4": SigV4AuthScheme()},
|
||||
aws_access_key_id=self._access_key_id,
|
||||
aws_secret_access_key=self._secret_access_key,
|
||||
aws_session_token=self._session_token,
|
||||
aws_credentials_identity_resolver=StaticCredentialsResolver(),
|
||||
auth_schemes={"aws.auth#sigv4": SigV4AuthScheme(service="bedrock")},
|
||||
)
|
||||
return BedrockRuntimeClient(config=config)
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ import sys
|
||||
|
||||
from pipecat.services import DeprecatedModuleProxy
|
||||
|
||||
from .flux import *
|
||||
from .stt import *
|
||||
from .tts import *
|
||||
|
||||
|
||||
0
src/pipecat/services/deepgram/flux/__init__.py
Normal file
0
src/pipecat/services/deepgram/flux/__init__.py
Normal file
636
src/pipecat/services/deepgram/flux/stt.py
Normal file
636
src/pipecat/services/deepgram/flux/stt.py
Normal file
@@ -0,0 +1,636 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Deepgram Flux speech-to-text service implementation."""
|
||||
|
||||
import json
|
||||
from enum import Enum
|
||||
from typing import Any, AsyncGenerator, Dict, Optional
|
||||
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
InterimTranscriptionFrame,
|
||||
StartFrame,
|
||||
TranscriptionFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.stt_service import WebsocketSTTService
|
||||
from pipecat.transcriptions.language import Language
|
||||
from pipecat.utils.time import time_now_iso8601
|
||||
from pipecat.utils.tracing.service_decorators import traced_stt
|
||||
|
||||
try:
|
||||
from websockets.asyncio.client import connect as websocket_connect
|
||||
from websockets.protocol import State
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error("In order to use Deepgram Flux, you need to `pip install pipecat-ai[deepgram]`.")
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
class FluxMessageType(str, Enum):
|
||||
"""Deepgram Flux WebSocket message types.
|
||||
|
||||
These are the top-level message types that can be received from the
|
||||
Deepgram Flux WebSocket connection.
|
||||
"""
|
||||
|
||||
RECEIVE_CONNECTED = "Connected"
|
||||
RECEIVE_FATAL_ERROR = "Error"
|
||||
TURN_INFO = "TurnInfo"
|
||||
|
||||
|
||||
class FluxEventType(str, Enum):
|
||||
"""Deepgram Flux TurnInfo event types.
|
||||
|
||||
These events are contained within TurnInfo messages and indicate
|
||||
different stages of speech processing and turn detection.
|
||||
"""
|
||||
|
||||
START_OF_TURN = "StartOfTurn"
|
||||
TURN_RESUMED = "TurnResumed"
|
||||
END_OF_TURN = "EndOfTurn"
|
||||
EAGER_END_OF_TURN = "EagerEndOfTurn"
|
||||
UPDATE = "Update"
|
||||
|
||||
|
||||
class DeepgramFluxSTTService(WebsocketSTTService):
|
||||
"""Deepgram Flux speech-to-text service.
|
||||
|
||||
Provides real-time speech recognition using Deepgram's WebSocket API with Flux capabilities.
|
||||
Supports configurable models, VAD events, and various audio processing options
|
||||
including advanced turn detection and EagerEndOfTurn events for improved conversational AI performance.
|
||||
"""
|
||||
|
||||
class InputParams(BaseModel):
|
||||
"""Configuration parameters for Deepgram Flux API.
|
||||
|
||||
This class defines all available connection parameters for the Deepgram Flux API
|
||||
based on the official documentation.
|
||||
|
||||
Parameters:
|
||||
eager_eot_threshold: Optional. EagerEndOfTurn/TurnResumed are off by default.
|
||||
You can turn them on by setting eager_eot_threshold to a valid value.
|
||||
Lower values = more aggressive EagerEndOfTurning (faster response, more LLM calls).
|
||||
Higher values = more conservative EagerEndOfTurning (slower response, fewer LLM calls).
|
||||
eot_threshold: Optional. End-of-turn confidence required to finish a turn (default 0.7).
|
||||
Lower values = turns end sooner (more interruptions, faster responses).
|
||||
Higher values = turns end later (fewer interruptions, more complete utterances).
|
||||
eot_timeout_ms: Optional. Time in milliseconds after speech to finish a turn
|
||||
regardless of EOT confidence (default 5000).
|
||||
keyterm: List of keyterms to boost recognition accuracy for specialized terminology.
|
||||
mip_opt_out: Optional. Opts out requests from the Deepgram Model Improvement Program
|
||||
(default False).
|
||||
tag: List of tags to label requests for identification during usage reporting.
|
||||
"""
|
||||
|
||||
eager_eot_threshold: Optional[float] = None
|
||||
eot_threshold: Optional[float] = None
|
||||
eot_timeout_ms: Optional[int] = None
|
||||
keyterm: list = []
|
||||
mip_opt_out: Optional[bool] = None
|
||||
tag: list = []
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
api_key: str,
|
||||
url: str = "wss://api.deepgram.com/v2/listen",
|
||||
sample_rate: Optional[int] = None,
|
||||
model: str = "flux-general-en",
|
||||
flux_encoding: str = "linear16",
|
||||
params: Optional[InputParams] = None,
|
||||
**kwargs,
|
||||
):
|
||||
"""Initialize the Deepgram Flux STT service.
|
||||
|
||||
Args:
|
||||
api_key: Deepgram API key for authentication. Required for API access.
|
||||
url: WebSocket URL for the Deepgram Flux API. Defaults to the preview endpoint.
|
||||
sample_rate: Audio sample rate in Hz. If None, uses the rate from params or 16000.
|
||||
model: Deepgram Flux model to use for transcription. Currently only supports "flux-general-en".
|
||||
flux_encoding: Audio encoding format required by Flux API. Must be "linear16".
|
||||
Raw signed little-endian 16-bit PCM encoding.
|
||||
params: InputParams instance containing detailed API configuration options.
|
||||
If None, default parameters will be used.
|
||||
**kwargs: Additional arguments passed to the parent WebsocketSTTService class.
|
||||
|
||||
Examples:
|
||||
Basic usage with default parameters::
|
||||
|
||||
stt = DeepgramFluxSTTService(api_key="your-api-key")
|
||||
|
||||
Advanced usage with custom parameters::
|
||||
|
||||
params = DeepgramFluxSTTService.InputParams(
|
||||
eager_eot_threshold=0.5,
|
||||
eot_threshold=0.8,
|
||||
keyterm=["AI", "machine learning", "neural network"],
|
||||
tag=["production", "voice-agent"]
|
||||
)
|
||||
stt = DeepgramFluxSTTService(
|
||||
api_key="your-api-key",
|
||||
model="flux-general-en",
|
||||
params=params
|
||||
)
|
||||
"""
|
||||
super().__init__(sample_rate=sample_rate, **kwargs)
|
||||
|
||||
self._api_key = api_key
|
||||
self._url = url
|
||||
self._model = model
|
||||
self._params = params or DeepgramFluxSTTService.InputParams()
|
||||
self._flux_encoding = flux_encoding
|
||||
# This is the currently only supported language
|
||||
self._language = Language.EN
|
||||
self._websocket_url = None
|
||||
self._receive_task = None
|
||||
|
||||
async def _connect(self):
|
||||
"""Connect to WebSocket and start background tasks.
|
||||
|
||||
Establishes the WebSocket connection to the Deepgram Flux API and starts
|
||||
the background task for receiving transcription results.
|
||||
"""
|
||||
await self._connect_websocket()
|
||||
|
||||
if self._websocket and not self._receive_task:
|
||||
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
|
||||
|
||||
async def _disconnect(self):
|
||||
"""Disconnect from WebSocket and clean up tasks.
|
||||
|
||||
Gracefully disconnects from the Deepgram Flux API, cancels background tasks,
|
||||
and cleans up resources to prevent memory leaks.
|
||||
"""
|
||||
try:
|
||||
# Cancel background tasks BEFORE closing websocket
|
||||
if self._receive_task:
|
||||
await self.cancel_task(self._receive_task, timeout=2.0)
|
||||
self._receive_task = None
|
||||
|
||||
# Now close the websocket
|
||||
await self._disconnect_websocket()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error during disconnect: {e}")
|
||||
finally:
|
||||
# Reset state only after everything is cleaned up
|
||||
self._websocket = None
|
||||
|
||||
async def _connect_websocket(self):
|
||||
"""Establish WebSocket connection to API.
|
||||
|
||||
Creates a WebSocket connection to the Deepgram Flux API using the configured
|
||||
URL and authentication headers. Handles connection errors and reports them
|
||||
through the event handler system.
|
||||
"""
|
||||
try:
|
||||
if self._websocket and self._websocket.state is State.OPEN:
|
||||
return
|
||||
|
||||
self._websocket = await websocket_connect(
|
||||
self._websocket_url,
|
||||
additional_headers={"Authorization": f"Token {self._api_key}"},
|
||||
)
|
||||
logger.debug("Connected to Deepgram Flux Websocket")
|
||||
except Exception as e:
|
||||
logger.error(f"{self} initialization error: {e}")
|
||||
self._websocket = None
|
||||
await self._call_event_handler("on_connection_error", f"{e}")
|
||||
|
||||
async def _disconnect_websocket(self):
|
||||
"""Close WebSocket connection and clean up state.
|
||||
|
||||
Closes the WebSocket connection to the Deepgram Flux API and stops all
|
||||
metrics collection. Handles disconnection errors gracefully.
|
||||
"""
|
||||
try:
|
||||
await self.stop_all_metrics()
|
||||
|
||||
if self._websocket:
|
||||
await self._send_close_stream()
|
||||
logger.debug("Disconnecting from Deepgram Flux Websocket")
|
||||
await self._websocket.close()
|
||||
except Exception as e:
|
||||
logger.error(f"{self} error closing websocket: {e}")
|
||||
|
||||
async def _send_close_stream(self) -> None:
|
||||
"""Sends a CloseStream control message to the Deepgram Flux WebSocket API.
|
||||
|
||||
This signals to the server that no more audio data will be sent.
|
||||
"""
|
||||
if self._websocket:
|
||||
logger.debug("Sending CloseStream message to Deepgram Flux")
|
||||
message = {"type": "CloseStream"}
|
||||
await self._websocket.send(json.dumps(message))
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
"""Check if this service can generate processing metrics.
|
||||
|
||||
Returns:
|
||||
True, as Deepgram service supports metrics generation.
|
||||
"""
|
||||
return True
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
"""Start the Deepgram Flux STT service.
|
||||
|
||||
Initializes the service by constructing the WebSocket URL with all configured
|
||||
parameters and establishing the connection to begin transcription processing.
|
||||
|
||||
Args:
|
||||
frame: The start frame containing initialization parameters and metadata.
|
||||
"""
|
||||
await super().start(frame)
|
||||
|
||||
url_params = [
|
||||
f"model={self._model}",
|
||||
f"sample_rate={self.sample_rate}",
|
||||
f"encoding={self._flux_encoding}",
|
||||
]
|
||||
|
||||
if self._params.eager_eot_threshold is not None:
|
||||
url_params.append(f"eager_eot_threshold={self._params.eager_eot_threshold}")
|
||||
|
||||
if self._params.eot_threshold is not None:
|
||||
url_params.append(f"eot_threshold={self._params.eot_threshold}")
|
||||
|
||||
if self._params.eot_timeout_ms is not None:
|
||||
url_params.append(f"eot_timeout_ms={self._params.eot_timeout_ms}")
|
||||
|
||||
if self._params.mip_opt_out is not None:
|
||||
url_params.append(f"mip_opt_out={str(self._params.mip_opt_out).lower()}")
|
||||
|
||||
# Add keyterm parameters (can have multiple)
|
||||
for keyterm in self._params.keyterm:
|
||||
url_params.append(f"keyterm={keyterm}")
|
||||
|
||||
# Add tag parameters (can have multiple)
|
||||
for tag_value in self._params.tag:
|
||||
url_params.append(f"tag={tag_value}")
|
||||
|
||||
self._websocket_url = f"{self._url}?{'&'.join(url_params)}"
|
||||
await self._connect()
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
"""Stop the Deepgram Flux STT service.
|
||||
|
||||
Args:
|
||||
frame: The end frame.
|
||||
"""
|
||||
await super().stop(frame)
|
||||
await self._disconnect()
|
||||
|
||||
async def cancel(self, frame: CancelFrame):
|
||||
"""Cancel the Deepgram Flux STT service.
|
||||
|
||||
Args:
|
||||
frame: The cancel frame.
|
||||
"""
|
||||
await super().cancel(frame)
|
||||
await self._disconnect()
|
||||
|
||||
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
|
||||
"""Send audio data to Deepgram Flux for transcription.
|
||||
|
||||
Transmits raw audio bytes to the Deepgram Flux API for real-time speech
|
||||
recognition. Transcription results are received asynchronously through
|
||||
WebSocket callbacks and processed in the background.
|
||||
|
||||
Args:
|
||||
audio: Raw audio bytes in linear16 format (signed little-endian 16-bit PCM).
|
||||
|
||||
Yields:
|
||||
Frame: None (transcription results are delivered via WebSocket callbacks
|
||||
rather than as return values from this method).
|
||||
|
||||
Raises:
|
||||
Exception: If the WebSocket connection is not established or if there
|
||||
are issues sending the audio data.
|
||||
"""
|
||||
if not self._websocket:
|
||||
logger.error("Not connected to Deepgram Flux.")
|
||||
yield ErrorFrame("Not connected to Deepgram Flux.", fatal=True)
|
||||
return
|
||||
|
||||
try:
|
||||
await self._websocket.send(audio)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to send audio to Flux: {e}")
|
||||
yield ErrorFrame(f"Failed to send audio to Flux: {e}")
|
||||
return
|
||||
|
||||
yield None
|
||||
|
||||
async def start_metrics(self):
|
||||
"""Start TTFB and processing metrics collection."""
|
||||
# TTFB (Time To First Byte) metrics are currently disabled for Deepgram Flux.
|
||||
# Ideally, TTFB should measure the time from when a user starts speaking
|
||||
# until we receive the first transcript. However, Deepgram Flux delivers
|
||||
# both the "user started speaking" event and the first transcript simultaneously,
|
||||
# making this timing measurement meaningless in this context.
|
||||
# await self.start_ttfb_metrics()
|
||||
await self.start_processing_metrics()
|
||||
|
||||
@traced_stt
|
||||
async def _handle_transcription(
|
||||
self, transcript: str, is_final: bool, language: Optional[Language] = None
|
||||
):
|
||||
"""Handle a transcription result with tracing."""
|
||||
pass
|
||||
|
||||
def _get_websocket(self):
|
||||
"""Get the current WebSocket connection.
|
||||
|
||||
Returns the active WebSocket connection instance, raising an exception
|
||||
if no connection is currently established.
|
||||
|
||||
Returns:
|
||||
The active WebSocket connection instance.
|
||||
|
||||
Raises:
|
||||
Exception: If no WebSocket connection is currently active.
|
||||
"""
|
||||
if self._websocket:
|
||||
return self._websocket
|
||||
raise Exception("Websocket not connected")
|
||||
|
||||
def _validate_message(self, data: Dict[str, Any]) -> bool:
|
||||
"""Validate basic message structure from Deepgram Flux.
|
||||
|
||||
Ensures the received message has the expected structure before processing.
|
||||
|
||||
Args:
|
||||
data: The parsed JSON message data to validate.
|
||||
|
||||
Returns:
|
||||
True if the message structure is valid, False otherwise.
|
||||
"""
|
||||
if not isinstance(data, dict):
|
||||
logger.warning("Message is not a dictionary")
|
||||
return False
|
||||
|
||||
if "type" not in data:
|
||||
logger.warning("Message missing 'type' field")
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
async def _receive_messages(self):
|
||||
"""Receive and process messages from WebSocket.
|
||||
|
||||
Continuously receives messages from the Deepgram Flux WebSocket connection
|
||||
and processes various message types including connection status, transcription
|
||||
results, turn information, and error conditions. Handles different event types
|
||||
such as StartOfTurn, EndOfTurn, EagerEndOfTurn, and Update events.
|
||||
"""
|
||||
async for message in self._get_websocket():
|
||||
if isinstance(message, str):
|
||||
try:
|
||||
data = json.loads(message)
|
||||
await self._handle_message(data)
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(f"Failed to decode JSON message: {e}")
|
||||
# Skip malformed messages
|
||||
continue
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing message: {e}")
|
||||
# Error will be handled inside WebsocketService->_receive_task_handler
|
||||
raise
|
||||
else:
|
||||
logger.warning(f"Received non-string message: {type(message)}")
|
||||
|
||||
async def _handle_message(self, data: Dict[str, Any]):
|
||||
"""Handle a parsed WebSocket message from Deepgram Flux.
|
||||
|
||||
Routes messages to appropriate handlers based on their type. Validates
|
||||
message structure before processing.
|
||||
|
||||
Args:
|
||||
data: The parsed JSON message data from the WebSocket.
|
||||
"""
|
||||
if not self._validate_message(data):
|
||||
return
|
||||
|
||||
message_type = data.get("type")
|
||||
|
||||
try:
|
||||
flux_message_type = FluxMessageType(message_type)
|
||||
except ValueError:
|
||||
logger.debug(f"Unhandled message type: {message_type or 'unknown'}")
|
||||
return
|
||||
|
||||
match flux_message_type:
|
||||
case FluxMessageType.RECEIVE_CONNECTED:
|
||||
await self._handle_connection_established()
|
||||
case FluxMessageType.RECEIVE_FATAL_ERROR:
|
||||
await self._handle_fatal_error(data)
|
||||
case FluxMessageType.TURN_INFO:
|
||||
await self._handle_turn_info(data)
|
||||
|
||||
async def _handle_connection_established(self):
|
||||
"""Handle successful connection establishment to Deepgram Flux.
|
||||
|
||||
This event is fired when the WebSocket connection to Deepgram Flux
|
||||
is successfully established and ready to receive audio data for
|
||||
transcription processing.
|
||||
"""
|
||||
logger.info("Connected to Flux - ready to stream audio")
|
||||
|
||||
async def _handle_fatal_error(self, data: Dict[str, Any]):
|
||||
"""Handle fatal error messages from Deepgram Flux.
|
||||
|
||||
Fatal errors indicate unrecoverable issues with the connection or
|
||||
configuration that require intervention. These errors will cause
|
||||
the connection to be terminated.
|
||||
|
||||
Args:
|
||||
data: The error message data containing error details.
|
||||
|
||||
Raises:
|
||||
Exception: Always raises to trigger error handling in the parent service.
|
||||
"""
|
||||
error_msg = data.get("error", "Unknown error")
|
||||
deepgram_error = f"Fatal error: {error_msg}"
|
||||
logger.error(deepgram_error)
|
||||
# Error will be handled inside WebsocketService->_receive_task_handler
|
||||
raise Exception(deepgram_error)
|
||||
|
||||
async def _handle_turn_info(self, data: Dict[str, Any]):
|
||||
"""Handle TurnInfo events from Deepgram Flux.
|
||||
|
||||
TurnInfo messages contain various turn-based events that indicate
|
||||
the state of speech processing, including turn boundaries, interim
|
||||
results, and turn finalization events.
|
||||
|
||||
Args:
|
||||
data: The TurnInfo message data containing event type, transcript and some extra metadata.
|
||||
"""
|
||||
event = data.get("event")
|
||||
transcript = data.get("transcript", "")
|
||||
|
||||
try:
|
||||
flux_event_type = FluxEventType(event)
|
||||
except ValueError:
|
||||
logger.debug(f"Unhandled TurnInfo event: {event}")
|
||||
return
|
||||
|
||||
match flux_event_type:
|
||||
case FluxEventType.START_OF_TURN:
|
||||
await self._handle_start_of_turn(transcript)
|
||||
case FluxEventType.TURN_RESUMED:
|
||||
await self._handle_turn_resumed(event)
|
||||
case FluxEventType.END_OF_TURN:
|
||||
await self._handle_end_of_turn(transcript, data)
|
||||
case FluxEventType.EAGER_END_OF_TURN:
|
||||
await self._handle_eager_end_of_turn(transcript, data)
|
||||
case FluxEventType.UPDATE:
|
||||
await self._handle_update(transcript)
|
||||
|
||||
async def _handle_start_of_turn(self, transcript: str):
|
||||
"""Handle StartOfTurn events from Deepgram Flux.
|
||||
|
||||
StartOfTurn events are fired when Deepgram Flux detects the beginning
|
||||
of a new speaking turn. This triggers bot interruption to stop any
|
||||
ongoing speech synthesis and signals the start of user speech detection.
|
||||
|
||||
The service will:
|
||||
- Send a BotInterruptionFrame upstream to stop bot speech
|
||||
- Send a UserStartedSpeakingFrame downstream to notify other components
|
||||
- Start metrics collection for measuring response times
|
||||
|
||||
Args:
|
||||
transcript: maybe the first few words of the turn.
|
||||
"""
|
||||
logger.debug("User started speaking")
|
||||
await self.push_interruption_task_frame_and_wait()
|
||||
await self.push_frame(UserStartedSpeakingFrame(), FrameDirection.DOWNSTREAM)
|
||||
await self.push_frame(UserStartedSpeakingFrame(), FrameDirection.UPSTREAM)
|
||||
await self.start_metrics()
|
||||
if transcript:
|
||||
logger.trace(f"Start of turn transcript: {transcript}")
|
||||
|
||||
async def _handle_turn_resumed(self, event: str):
|
||||
"""Handle TurnResumed events from Deepgram Flux.
|
||||
|
||||
TurnResumed events indicate that speech has resumed after a brief pause
|
||||
within the same turn. This is primarily used for logging and debugging
|
||||
purposes and doesn't trigger any significant processing changes.
|
||||
|
||||
Args:
|
||||
event: The event type string for logging purposes.
|
||||
"""
|
||||
logger.trace(f"Received event TurnResumed: {event}")
|
||||
|
||||
async def _handle_end_of_turn(self, transcript: str, data: Dict[str, Any]):
|
||||
"""Handle EndOfTurn events from Deepgram Flux.
|
||||
|
||||
EndOfTurn events are fired when Deepgram Flux determines that a speaking
|
||||
turn has concluded, either due to sufficient silence or end-of-turn
|
||||
confidence thresholds being met. This provides the final transcript
|
||||
for the completed turn.
|
||||
|
||||
The service will:
|
||||
- Create and send a final TranscriptionFrame with the complete transcript
|
||||
- Trigger transcription handling with tracing for metrics
|
||||
- Stop processing metrics collection
|
||||
- Send a UserStoppedSpeakingFrame to signal turn completion
|
||||
|
||||
Args:
|
||||
transcript: The final transcript text for the completed turn.
|
||||
data: The TurnInfo message data containing event type, transcript and some extra metadata.
|
||||
"""
|
||||
logger.debug("User stopped speaking")
|
||||
|
||||
await self.push_frame(
|
||||
TranscriptionFrame(
|
||||
transcript,
|
||||
self._user_id,
|
||||
time_now_iso8601(),
|
||||
self._language,
|
||||
result=data,
|
||||
)
|
||||
)
|
||||
await self._handle_transcription(transcript, True, self._language)
|
||||
await self.stop_processing_metrics()
|
||||
await self.push_frame(UserStoppedSpeakingFrame(), FrameDirection.DOWNSTREAM)
|
||||
await self.push_frame(UserStoppedSpeakingFrame(), FrameDirection.UPSTREAM)
|
||||
|
||||
async def _handle_eager_end_of_turn(self, transcript: str, data: Dict[str, Any]):
|
||||
"""Handle EagerEndOfTurn events from Deepgram Flux.
|
||||
|
||||
EagerEndOfTurn events are fired when the end-of-turn confidence reaches the
|
||||
EagerEndOfTurn threshold but hasn't yet reached the full end-of-turn threshold.
|
||||
These provide interim transcripts that can be used for faster response
|
||||
generation while still allowing the user to continue speaking.
|
||||
|
||||
EagerEndOfTurn events enable more responsive conversational AI by allowing
|
||||
the LLM to start processing likely final transcripts before the turn
|
||||
is definitively ended.
|
||||
|
||||
Args:
|
||||
transcript: The interim transcript text that triggered the EagerEndOfTurn event.
|
||||
data: The TurnInfo message data containing event type, transcript and some extra metadata.
|
||||
"""
|
||||
logger.trace(f"EagerEndOfTurn - {transcript}")
|
||||
# Deepgram's EagerEndOfTurn feature enables lower-latency voice agents by sending
|
||||
# medium-confidence transcripts before EndOfTurn certainty, allowing LLM processing to
|
||||
# begin early.
|
||||
#
|
||||
# However, if speech resumes or the transcripts differ from the final EndOfTurn, the
|
||||
# EagerEndOfTurn response should be cancelled to avoid incorrect or partial responses.
|
||||
#
|
||||
# Pipecat doesn't yet provide built-in Gate/control mechanisms to:
|
||||
# 1. Start LLM/TTS processing early on EagerEndOfTurn events
|
||||
# 2. Cancel in-flight processing when TurnResumed occurs
|
||||
#
|
||||
# By pushing EagerEndOfTurn transcripts as InterimTranscriptionFrame, we enable
|
||||
# developers to implement custom EagerEndOfTurn handling in their applications while
|
||||
# maintaining compatibility with existing interim transcription workflows.
|
||||
#
|
||||
# TODO: Implement proper EagerEndOfTurn support with cancellable processing pipeline
|
||||
# that can start response generation on EagerEndOfTurn and cancel or confirm it.
|
||||
await self.push_frame(
|
||||
InterimTranscriptionFrame(
|
||||
transcript,
|
||||
self._user_id,
|
||||
time_now_iso8601(),
|
||||
self._language,
|
||||
result=data,
|
||||
)
|
||||
)
|
||||
|
||||
async def _handle_update(self, transcript: str):
|
||||
"""Handle Update events from Deepgram Flux.
|
||||
|
||||
Update events provide incremental transcript updates during an ongoing
|
||||
turn. These events allow for real-time display of transcription progress
|
||||
and can be used to provide visual feedback to users about what's being
|
||||
recognized.
|
||||
|
||||
The service stops TTFB (Time To First Byte) metrics when the first
|
||||
substantial update is received, indicating successful processing start.
|
||||
|
||||
Args:
|
||||
transcript: The current partial transcript text for the ongoing turn.
|
||||
"""
|
||||
if transcript:
|
||||
logger.trace(f"Update event: {transcript}")
|
||||
# TTFB (Time To First Byte) metrics are currently disabled for Deepgram Flux.
|
||||
# Ideally, TTFB should measure the time from when a user starts speaking
|
||||
# until we receive the first transcript. However, Deepgram Flux delivers
|
||||
# both the "user started speaking" event and the first transcript simultaneously,
|
||||
# making this timing measurement meaningless in this context.
|
||||
# await self.stop_ttfb_metrics()
|
||||
@@ -8,6 +8,7 @@ import sys
|
||||
|
||||
from pipecat.services import DeprecatedModuleProxy
|
||||
|
||||
from .stt import *
|
||||
from .tts import *
|
||||
|
||||
sys.modules[__name__] = DeprecatedModuleProxy(globals(), "elevenlabs", "elevenlabs.tts")
|
||||
sys.modules[__name__] = DeprecatedModuleProxy(globals(), "elevenlabs", "elevenlabs.[stt,tts]")
|
||||
|
||||
@@ -35,6 +35,7 @@ from pipecat.frames.frames import (
|
||||
LLMMessagesFrame,
|
||||
LLMTextFrame,
|
||||
LLMUpdateSettingsFrame,
|
||||
OutputImageRawFrame,
|
||||
UserImageRawFrame,
|
||||
)
|
||||
from pipecat.metrics.metrics import LLMTokenUsage
|
||||
@@ -72,6 +73,9 @@ try:
|
||||
HttpOptions,
|
||||
Part,
|
||||
)
|
||||
|
||||
# Temporary hack to be able to process Nano Banana returned images.
|
||||
genai._api_client.READ_BUFFER_SIZE = 5 * 1024 * 1024
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error("In order to use Google AI, you need to `pip install pipecat-ai[google]`.")
|
||||
@@ -682,7 +686,7 @@ class GoogleLLMService(LLMService):
|
||||
self,
|
||||
*,
|
||||
api_key: str,
|
||||
model: str = "gemini-2.0-flash",
|
||||
model: str = "gemini-2.5-flash",
|
||||
params: Optional[InputParams] = None,
|
||||
system_instruction: Optional[str] = None,
|
||||
tools: Optional[List[Dict[str, Any]]] = None,
|
||||
@@ -710,6 +714,7 @@ class GoogleLLMService(LLMService):
|
||||
self._api_key = api_key
|
||||
self._system_instruction = system_instruction
|
||||
self._http_options = http_options
|
||||
|
||||
self._create_client(api_key, http_options)
|
||||
self._settings = {
|
||||
"max_tokens": params.max_tokens,
|
||||
@@ -788,6 +793,9 @@ class GoogleLLMService(LLMService):
|
||||
# and can be configured to turn it off.
|
||||
if not self._model_name.startswith("gemini-2.5-flash"):
|
||||
return
|
||||
# If we have an image model, we don't use a budget either.
|
||||
if "image" in self._model_name:
|
||||
return
|
||||
# If thinking_config is already set, don't override it.
|
||||
if "thinking_config" in generation_params:
|
||||
return
|
||||
@@ -927,6 +935,12 @@ class GoogleLLMService(LLMService):
|
||||
arguments=function_call.args or {},
|
||||
)
|
||||
)
|
||||
elif part.inline_data and part.inline_data.data:
|
||||
image = Image.open(io.BytesIO(part.inline_data.data))
|
||||
frame = OutputImageRawFrame(
|
||||
image=image.tobytes(), size=image.size, format="RGB"
|
||||
)
|
||||
await self.push_frame(frame)
|
||||
|
||||
if (
|
||||
candidate.grounding_metadata
|
||||
|
||||
5
src/pipecat/services/hume/__init__.py
Normal file
5
src/pipecat/services/hume/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
220
src/pipecat/services/hume/tts.py
Normal file
220
src/pipecat/services/hume/tts.py
Normal file
@@ -0,0 +1,220 @@
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
|
||||
"""Hume Text-to-Speech service implementation."""
|
||||
|
||||
import base64
|
||||
import os
|
||||
from typing import Any, AsyncGenerator, Optional
|
||||
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
StartFrame,
|
||||
TTSAudioRawFrame,
|
||||
TTSStartedFrame,
|
||||
TTSStoppedFrame,
|
||||
)
|
||||
from pipecat.services.tts_service import TTSService
|
||||
from pipecat.utils.tracing.service_decorators import traced_tts
|
||||
|
||||
try:
|
||||
from hume import AsyncHumeClient
|
||||
from hume.tts import (
|
||||
FormatPcm,
|
||||
PostedUtterance,
|
||||
PostedUtteranceVoiceWithId,
|
||||
)
|
||||
except ModuleNotFoundError as e: # pragma: no cover - import-time guidance
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error("In order to use Hume, you need to `pip install pipecat-ai[hume]`.")
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
HUME_SAMPLE_RATE = 48_000 # Hume TTS streams at 48 kHz
|
||||
|
||||
|
||||
class HumeTTSService(TTSService):
|
||||
"""Hume Octave Text-to-Speech service.
|
||||
|
||||
Streams PCM audio via Hume's HTTP output streaming (JSON chunks) endpoint
|
||||
using the Python SDK and emits `TTSAudioRawFrame`s suitable for Pipecat transports.
|
||||
|
||||
Supported features:
|
||||
|
||||
- Generates speech from text using Hume TTS.
|
||||
- Streams PCM audio.
|
||||
- Supports dynamic updates of voice and synthesis parameters at runtime.
|
||||
- Provides metrics for Time To First Byte (TTFB) and TTS usage.
|
||||
"""
|
||||
|
||||
class InputParams(BaseModel):
|
||||
"""Optional synthesis parameters for Hume TTS.
|
||||
|
||||
Parameters:
|
||||
description: Natural-language acting directions (up to 100 characters).
|
||||
speed: Speaking-rate multiplier (0.5-2.0).
|
||||
trailing_silence: Seconds of silence to append at the end (0-5).
|
||||
"""
|
||||
|
||||
description: Optional[str] = None
|
||||
speed: Optional[float] = None
|
||||
trailing_silence: Optional[float] = None
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
api_key: Optional[str] = None,
|
||||
voice_id: str,
|
||||
params: Optional[InputParams] = None,
|
||||
sample_rate: Optional[int] = HUME_SAMPLE_RATE,
|
||||
**kwargs,
|
||||
) -> None:
|
||||
"""Initialize the HumeTTSService.
|
||||
|
||||
Args:
|
||||
api_key: Hume API key. If omitted, reads the ``HUME_API_KEY`` environment variable.
|
||||
voice_id: ID of the voice to use (ID-only; names are not supported here).
|
||||
params: Optional synthesis controls (acting instructions, speed, trailing silence).
|
||||
sample_rate: Output sample rate for emitted PCM frames. Defaults to 48_000 (Hume).
|
||||
**kwargs: Additional arguments passed to the parent class.
|
||||
"""
|
||||
api_key = api_key or os.getenv("HUME_API_KEY")
|
||||
if not api_key:
|
||||
raise ValueError("HumeTTSService requires an API key (env HUME_API_KEY or api_key=)")
|
||||
|
||||
if sample_rate != HUME_SAMPLE_RATE:
|
||||
logger.warning(
|
||||
f"Hume TTS streams at {HUME_SAMPLE_RATE} Hz; configured sample_rate={sample_rate}"
|
||||
)
|
||||
|
||||
super().__init__(sample_rate=sample_rate, **kwargs)
|
||||
|
||||
self._client = AsyncHumeClient(api_key=api_key)
|
||||
self._params = params or HumeTTSService.InputParams()
|
||||
|
||||
# Store voice in the base class (mirrors other services)
|
||||
self.set_voice(voice_id)
|
||||
|
||||
self._audio_bytes = b""
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
"""Can generate metrics.
|
||||
|
||||
Returns:
|
||||
True if metrics can be generated, False otherwise.
|
||||
"""
|
||||
return True
|
||||
|
||||
async def start(self, frame: StartFrame) -> None:
|
||||
"""Start the service.
|
||||
|
||||
Args:
|
||||
frame: The start frame.
|
||||
"""
|
||||
await super().start(frame)
|
||||
|
||||
async def update_setting(self, key: str, value: Any) -> None:
|
||||
"""Runtime updates via `TTSUpdateSettingsFrame`.
|
||||
|
||||
Args:
|
||||
key: The name of the setting to update. Recognized keys are:
|
||||
- "voice_id"
|
||||
- "description"
|
||||
- "speed"
|
||||
- "trailing_silence"
|
||||
value: The new value for the setting.
|
||||
"""
|
||||
key_l = (key or "").lower()
|
||||
|
||||
if key_l == "voice_id":
|
||||
self.set_voice(str(value))
|
||||
logger.info(f"HumeTTSService voice_id set to: {self.voice}")
|
||||
elif key_l == "description":
|
||||
self._params.description = None if value is None else str(value)
|
||||
elif key_l == "speed":
|
||||
self._params.speed = None if value is None else float(value)
|
||||
elif key_l == "trailing_silence":
|
||||
self._params.trailing_silence = None if value is None else float(value)
|
||||
else:
|
||||
# Defer unknown keys to the base class
|
||||
await super().update_setting(key, value)
|
||||
|
||||
@traced_tts
|
||||
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
|
||||
"""Generate speech from text using Hume TTS.
|
||||
|
||||
Args:
|
||||
text: The text to be synthesized.
|
||||
|
||||
Returns:
|
||||
An async generator that yields `Frame` objects, including
|
||||
`TTSStartedFrame`, `TTSAudioRawFrame`, `ErrorFrame`, and
|
||||
`TTSStoppedFrame`.
|
||||
"""
|
||||
logger.debug(f"{self}: Generating Hume TTS: [{text}]")
|
||||
|
||||
# Build the request payload
|
||||
utterance_kwargs: dict[str, Any] = {
|
||||
"text": text,
|
||||
"voice": PostedUtteranceVoiceWithId(id=self._voice_id),
|
||||
}
|
||||
if self._params.description is not None:
|
||||
utterance_kwargs["description"] = self._params.description
|
||||
if self._params.speed is not None:
|
||||
utterance_kwargs["speed"] = self._params.speed
|
||||
if self._params.trailing_silence is not None:
|
||||
utterance_kwargs["trailing_silence"] = self._params.trailing_silence
|
||||
|
||||
utterance = PostedUtterance(**utterance_kwargs)
|
||||
|
||||
# Request raw PCM chunks in the streaming JSON
|
||||
pcm_fmt = FormatPcm(type="pcm")
|
||||
|
||||
await self.start_ttfb_metrics()
|
||||
await self.start_tts_usage_metrics(text)
|
||||
yield TTSStartedFrame()
|
||||
|
||||
try:
|
||||
# Instant mode is always enabled here (not user-configurable)
|
||||
# Hume emits mono PCM at 48 kHz; downstream can resample if needed.
|
||||
# We buffer audio bytes before sending to prevent glitches.
|
||||
self._audio_bytes = b""
|
||||
async for chunk in self._client.tts.synthesize_json_streaming(
|
||||
utterances=[utterance],
|
||||
format=pcm_fmt,
|
||||
instant_mode=True,
|
||||
version="2",
|
||||
):
|
||||
audio_b64 = getattr(chunk, "audio", None)
|
||||
if not audio_b64:
|
||||
continue
|
||||
|
||||
pcm_bytes = base64.b64decode(audio_b64)
|
||||
self._audio_bytes += pcm_bytes
|
||||
|
||||
# Buffer audio until we have enough to avoid glitches
|
||||
if len(self._audio_bytes) < self.chunk_size:
|
||||
continue
|
||||
|
||||
frame = TTSAudioRawFrame(
|
||||
audio=self._audio_bytes,
|
||||
sample_rate=self.sample_rate,
|
||||
num_channels=1,
|
||||
)
|
||||
|
||||
yield frame
|
||||
|
||||
self._audio_bytes = b""
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} error generating TTS: {e}")
|
||||
await self.push_error(ErrorFrame(f"Error generating TTS: {e}"))
|
||||
finally:
|
||||
# Ensure TTFB timer is stopped even on early failures
|
||||
await self.stop_ttfb_metrics()
|
||||
yield TTSStoppedFrame()
|
||||
@@ -337,10 +337,16 @@ class BaseOpenAILLMService(LLMService):
|
||||
|
||||
async for chunk in chunk_stream:
|
||||
if chunk.usage:
|
||||
cached_tokens = (
|
||||
chunk.usage.prompt_tokens_details.cached_tokens
|
||||
if chunk.usage.prompt_tokens_details
|
||||
else None
|
||||
)
|
||||
tokens = LLMTokenUsage(
|
||||
prompt_tokens=chunk.usage.prompt_tokens,
|
||||
completion_tokens=chunk.usage.completion_tokens,
|
||||
total_tokens=chunk.usage.total_tokens,
|
||||
cache_read_input_tokens=cached_tokens,
|
||||
)
|
||||
await self.start_llm_usage_metrics(tokens)
|
||||
|
||||
|
||||
@@ -12,7 +12,7 @@ from typing import TYPE_CHECKING
|
||||
from pipecat.frames.frames import DataFrame, FunctionCallResultFrame
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pipecat.services.openai_realtime_beta.context import OpenAIRealtimeLLMContext
|
||||
from pipecat.services.openai_realtime.context import OpenAIRealtimeLLMContext
|
||||
|
||||
|
||||
@dataclass
|
||||
|
||||
@@ -14,6 +14,7 @@ import io
|
||||
import json
|
||||
import struct
|
||||
import uuid
|
||||
import warnings
|
||||
from typing import AsyncGenerator, Optional
|
||||
|
||||
import aiohttp
|
||||
@@ -110,6 +111,11 @@ def language_to_playht_language(language: Language) -> Optional[str]:
|
||||
class PlayHTTTSService(InterruptibleTTSService):
|
||||
"""PlayHT WebSocket-based text-to-speech service.
|
||||
|
||||
.. deprecated:: 0.0.88
|
||||
|
||||
This class is deprecated and will be removed in a future version.
|
||||
PlayHT is shutting down their API on December 31st, 2025.
|
||||
|
||||
Provides real-time text-to-speech synthesis using PlayHT's WebSocket API.
|
||||
Supports streaming audio generation with configurable voice engines and
|
||||
language settings.
|
||||
@@ -158,6 +164,15 @@ class PlayHTTTSService(InterruptibleTTSService):
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"PlayHT is shutting down their API on December 31st, 2025. "
|
||||
"'PlayHTTTSService' is deprecated and will be removed in a future version.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
params = params or PlayHTTTSService.InputParams()
|
||||
|
||||
self._api_key = api_key
|
||||
@@ -401,6 +416,11 @@ class PlayHTTTSService(InterruptibleTTSService):
|
||||
class PlayHTHttpTTSService(TTSService):
|
||||
"""PlayHT HTTP-based text-to-speech service.
|
||||
|
||||
.. deprecated:: 0.0.88
|
||||
|
||||
This class is deprecated and will be removed in a future version.
|
||||
PlayHT is shutting down their API on December 31st, 2025.
|
||||
|
||||
Provides text-to-speech synthesis using PlayHT's HTTP API for simpler,
|
||||
non-streaming synthesis. Suitable for use cases where streaming is not
|
||||
required and simpler integration is preferred.
|
||||
@@ -454,8 +474,6 @@ class PlayHTHttpTTSService(TTSService):
|
||||
|
||||
# Warn about deprecated protocol parameter if explicitly provided
|
||||
if protocol:
|
||||
import warnings
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
@@ -464,6 +482,15 @@ class PlayHTHttpTTSService(TTSService):
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"PlayHT is shutting down their API on December 31st, 2025. "
|
||||
"'PlayHTHttpTTSService' is deprecated and will be removed in a future version.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
params = params or PlayHTHttpTTSService.InputParams()
|
||||
|
||||
self._user_id = user_id
|
||||
|
||||
@@ -15,6 +15,7 @@ from loguru import logger
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
AudioRawFrame,
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
StartFrame,
|
||||
STTMuteFrame,
|
||||
@@ -24,6 +25,7 @@ from pipecat.frames.frames import (
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.ai_service import AIService
|
||||
from pipecat.services.websocket_service import WebsocketService
|
||||
from pipecat.transcriptions.language import Language
|
||||
|
||||
|
||||
@@ -283,3 +285,35 @@ class SegmentedSTTService(STTService):
|
||||
if not self._user_speaking and len(self._audio_buffer) > self._audio_buffer_size_1s:
|
||||
discarded = len(self._audio_buffer) - self._audio_buffer_size_1s
|
||||
self._audio_buffer = self._audio_buffer[discarded:]
|
||||
|
||||
|
||||
class WebsocketSTTService(STTService, WebsocketService):
|
||||
"""Base class for websocket-based STT services.
|
||||
|
||||
Combines STT functionality with websocket connectivity, providing automatic
|
||||
error handling and reconnection capabilities.
|
||||
|
||||
Event handlers:
|
||||
on_connection_error: Called when a websocket connection error occurs.
|
||||
|
||||
Example::
|
||||
|
||||
@stt.event_handler("on_connection_error")
|
||||
async def on_connection_error(stt: STTService, error: str):
|
||||
logger.error(f"STT connection error: {error}")
|
||||
"""
|
||||
|
||||
def __init__(self, *, reconnect_on_error: bool = True, **kwargs):
|
||||
"""Initialize the Websocket STT service.
|
||||
|
||||
Args:
|
||||
reconnect_on_error: Whether to automatically reconnect on websocket errors.
|
||||
**kwargs: Additional arguments passed to parent classes.
|
||||
"""
|
||||
STTService.__init__(self, **kwargs)
|
||||
WebsocketService.__init__(self, reconnect_on_error=reconnect_on_error, **kwargs)
|
||||
self._register_event_handler("on_connection_error")
|
||||
|
||||
async def _report_error(self, error: ErrorFrame):
|
||||
await self._call_event_handler("on_connection_error", error.error)
|
||||
await self.push_error(error)
|
||||
|
||||
@@ -11,7 +11,6 @@ input processing, including VAD, turn analysis, and interruption management.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from typing import Optional
|
||||
|
||||
from loguru import logger
|
||||
@@ -79,10 +78,6 @@ class BaseInputTransport(FrameProcessor):
|
||||
# Track user speaking state for interruption logic
|
||||
self._user_speaking = False
|
||||
|
||||
# We read audio from a single queue one at a time and we then run VAD in
|
||||
# a thread. Therefore, only one thread should be necessary.
|
||||
self._executor = ThreadPoolExecutor(max_workers=1)
|
||||
|
||||
# Task to process incoming audio (VAD) and push audio frames downstream
|
||||
# if passthrough is enabled.
|
||||
self._audio_task = None
|
||||
@@ -398,9 +393,7 @@ class BaseInputTransport(FrameProcessor):
|
||||
"""Analyze audio frame for voice activity."""
|
||||
state = VADState.QUIET
|
||||
if self.vad_analyzer:
|
||||
state = await self.get_event_loop().run_in_executor(
|
||||
self._executor, self.vad_analyzer.analyze_audio, audio_frame.audio
|
||||
)
|
||||
state = await self.vad_analyzer.analyze_audio(audio_frame.audio)
|
||||
return state
|
||||
|
||||
async def _handle_vad(self, audio_frame: InputAudioRawFrame, vad_state: VADState) -> VADState:
|
||||
|
||||
@@ -29,20 +29,19 @@ from pipecat.frames.frames import (
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
Frame,
|
||||
InputTransportMessageUrgentFrame,
|
||||
InterruptionFrame,
|
||||
MixerControlFrame,
|
||||
OutputAudioRawFrame,
|
||||
OutputDTMFFrame,
|
||||
OutputDTMFUrgentFrame,
|
||||
OutputImageRawFrame,
|
||||
OutputTransportMessageFrame,
|
||||
OutputTransportMessageUrgentFrame,
|
||||
OutputTransportReadyFrame,
|
||||
SpeechOutputAudioRawFrame,
|
||||
SpriteFrame,
|
||||
StartFrame,
|
||||
SystemFrame,
|
||||
TransportMessageFrame,
|
||||
TransportMessageUrgentFrame,
|
||||
TTSAudioRawFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
@@ -178,7 +177,9 @@ class BaseOutputTransport(FrameProcessor):
|
||||
# Sending a frame indicating that the output transport is ready and able to receive frames.
|
||||
await self.push_frame(OutputTransportReadyFrame(), FrameDirection.UPSTREAM)
|
||||
|
||||
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
|
||||
async def send_message(
|
||||
self, frame: OutputTransportMessageFrame | OutputTransportMessageUrgentFrame
|
||||
):
|
||||
"""Send a transport message.
|
||||
|
||||
Args:
|
||||
@@ -307,9 +308,7 @@ class BaseOutputTransport(FrameProcessor):
|
||||
elif isinstance(frame, InterruptionFrame):
|
||||
await self.push_frame(frame, direction)
|
||||
await self._handle_frame(frame)
|
||||
elif isinstance(frame, TransportMessageUrgentFrame) and not isinstance(
|
||||
frame, InputTransportMessageUrgentFrame
|
||||
):
|
||||
elif isinstance(frame, OutputTransportMessageUrgentFrame):
|
||||
await self.send_message(frame)
|
||||
elif isinstance(frame, OutputDTMFUrgentFrame):
|
||||
await self.write_dtmf(frame)
|
||||
@@ -646,7 +645,7 @@ class BaseOutputTransport(FrameProcessor):
|
||||
await self._set_video_image(frame)
|
||||
elif isinstance(frame, SpriteFrame):
|
||||
await self._set_video_images(frame.images)
|
||||
elif isinstance(frame, TransportMessageFrame):
|
||||
elif isinstance(frame, OutputTransportMessageFrame):
|
||||
await self._transport.send_message(frame)
|
||||
elif isinstance(frame, OutputDTMFFrame):
|
||||
await self._transport.write_dtmf(frame)
|
||||
|
||||
@@ -30,15 +30,15 @@ from pipecat.frames.frames import (
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
InputAudioRawFrame,
|
||||
InputTransportMessageUrgentFrame,
|
||||
InputTransportMessageFrame,
|
||||
InterimTranscriptionFrame,
|
||||
OutputAudioRawFrame,
|
||||
OutputImageRawFrame,
|
||||
OutputTransportMessageFrame,
|
||||
OutputTransportMessageUrgentFrame,
|
||||
SpriteFrame,
|
||||
StartFrame,
|
||||
TranscriptionFrame,
|
||||
TransportMessageFrame,
|
||||
TransportMessageUrgentFrame,
|
||||
UserAudioRawFrame,
|
||||
UserImageRawFrame,
|
||||
UserImageRequestFrame,
|
||||
@@ -74,7 +74,7 @@ VAD_RESET_PERIOD_MS = 2000
|
||||
|
||||
|
||||
@dataclass
|
||||
class DailyTransportMessageFrame(TransportMessageFrame):
|
||||
class DailyOutputTransportMessageFrame(OutputTransportMessageFrame):
|
||||
"""Frame for transport messages in Daily calls.
|
||||
|
||||
Parameters:
|
||||
@@ -85,7 +85,7 @@ class DailyTransportMessageFrame(TransportMessageFrame):
|
||||
|
||||
|
||||
@dataclass
|
||||
class DailyTransportMessageUrgentFrame(TransportMessageUrgentFrame):
|
||||
class DailyOutputTransportMessageUrgentFrame(OutputTransportMessageUrgentFrame):
|
||||
"""Frame for urgent transport messages in Daily calls.
|
||||
|
||||
Parameters:
|
||||
@@ -96,7 +96,59 @@ class DailyTransportMessageUrgentFrame(TransportMessageUrgentFrame):
|
||||
|
||||
|
||||
@dataclass
|
||||
class DailyInputTransportMessageUrgentFrame(InputTransportMessageUrgentFrame):
|
||||
class DailyTransportMessageFrame(DailyOutputTransportMessageFrame):
|
||||
"""Frame for transport messages in Daily calls.
|
||||
|
||||
.. deprecated:: 0.0.87
|
||||
This frame is deprecated and will be removed in a future version.
|
||||
Instead, use `DailyOutputTransportMessageFrame`.
|
||||
|
||||
Parameters:
|
||||
participant_id: Optional ID of the participant this message is for/from.
|
||||
"""
|
||||
|
||||
def __post_init__(self):
|
||||
super().__post_init__()
|
||||
import warnings
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"DailyTransportMessageFrame is deprecated and will be removed in a future version. "
|
||||
"Instead, use DailyOutputTransportMessageFrame.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class DailyTransportMessageUrgentFrame(DailyOutputTransportMessageUrgentFrame):
|
||||
"""Frame for urgent transport messages in Daily calls.
|
||||
|
||||
.. deprecated:: 0.0.87
|
||||
This frame is deprecated and will be removed in a future version.
|
||||
Instead, use `DailyOutputTransportMessageUrgentFrame`.
|
||||
|
||||
Parameters:
|
||||
participant_id: Optional ID of the participant this message is for/from.
|
||||
"""
|
||||
|
||||
def __post_init__(self):
|
||||
super().__post_init__()
|
||||
import warnings
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"DailyTransportMessageUrgentFrame is deprecated and will be removed in a future version. "
|
||||
"Instead, use DailyOutputTransportMessageUrgentFrame.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class DailyInputTransportMessageFrame(InputTransportMessageFrame):
|
||||
"""Frame for input urgent transport messages in Daily calls.
|
||||
|
||||
Parameters:
|
||||
@@ -106,16 +158,61 @@ class DailyInputTransportMessageUrgentFrame(InputTransportMessageUrgentFrame):
|
||||
participant_id: Optional[str] = None
|
||||
|
||||
|
||||
class DailyInputTransportMessageUrgentFrame(DailyInputTransportMessageFrame):
|
||||
"""Frame for input urgent transport messages in Daily calls.
|
||||
|
||||
.. deprecated:: 0.0.87
|
||||
This frame is deprecated and will be removed in a future version.
|
||||
Instead, use `DailyInputTransportMessageFrame`.
|
||||
|
||||
Parameters:
|
||||
participant_id: Optional ID of the participant this message is for/from.
|
||||
"""
|
||||
|
||||
def __post_init__(self):
|
||||
super().__post_init__()
|
||||
import warnings
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"DailyInputTransportMessageUrgentFrame is deprecated and will be removed in a future version. "
|
||||
"Instead, use DailyInputTransportMessageFrame.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class DailyUpdateRemoteParticipantsFrame(ControlFrame):
|
||||
"""Frame to update remote participants in Daily calls.
|
||||
|
||||
.. deprecated:: 0.0.87
|
||||
`DailyUpdateRemoteParticipantsFrame` is deprecated and will be removed in a future version.
|
||||
Create your own custom frame and use a custom processor to handle it or use, for example,
|
||||
`on_after_push_frame` event instead in the output transport.
|
||||
|
||||
Parameters:
|
||||
remote_participants: See https://reference-python.daily.co/api_reference.html#daily.CallClient.update_remote_participants.
|
||||
"""
|
||||
|
||||
remote_participants: Mapping[str, Any] = None
|
||||
|
||||
def __post_init__(self):
|
||||
super().__post_init__()
|
||||
import warnings
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"DailyUpdateRemoteParticipantsFrame is deprecated and will be removed in a future version."
|
||||
"Instead, create your own custom frame and handle it in the "
|
||||
'`@transport.output().event_handler("on_after_push_frame")` event handler or a '
|
||||
"custom processor.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
|
||||
class WebRTCVADAnalyzer(VADAnalyzer):
|
||||
"""Voice Activity Detection analyzer using WebRTC.
|
||||
@@ -454,7 +551,9 @@ class DailyTransportClient(EventHandler):
|
||||
"""
|
||||
return self._out_sample_rate
|
||||
|
||||
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
|
||||
async def send_message(
|
||||
self, frame: OutputTransportMessageFrame | OutputTransportMessageUrgentFrame
|
||||
):
|
||||
"""Send an application message to participants.
|
||||
|
||||
Args:
|
||||
@@ -464,7 +563,9 @@ class DailyTransportClient(EventHandler):
|
||||
return
|
||||
|
||||
participant_id = None
|
||||
if isinstance(frame, (DailyTransportMessageFrame, DailyTransportMessageUrgentFrame)):
|
||||
if isinstance(
|
||||
frame, (DailyOutputTransportMessageFrame, DailyOutputTransportMessageUrgentFrame)
|
||||
):
|
||||
participant_id = frame.participant_id
|
||||
|
||||
future = self._get_event_loop().create_future()
|
||||
@@ -1601,7 +1702,7 @@ class DailyInputTransport(BaseInputTransport):
|
||||
message: The message data to send.
|
||||
sender: ID of the message sender.
|
||||
"""
|
||||
frame = DailyInputTransportMessageUrgentFrame(message=message, participant_id=sender)
|
||||
frame = DailyInputTransportMessageFrame(message=message, participant_id=sender)
|
||||
await self.push_frame(frame)
|
||||
|
||||
#
|
||||
@@ -1823,7 +1924,9 @@ class DailyOutputTransport(BaseOutputTransport):
|
||||
if isinstance(frame, DailyUpdateRemoteParticipantsFrame):
|
||||
await self._client.update_remote_participants(frame.remote_participants)
|
||||
|
||||
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
|
||||
async def send_message(
|
||||
self, frame: OutputTransportMessageFrame | OutputTransportMessageUrgentFrame
|
||||
):
|
||||
"""Send a transport message to participants.
|
||||
|
||||
Args:
|
||||
|
||||
@@ -29,9 +29,9 @@ from pipecat.frames.frames import (
|
||||
OutputAudioRawFrame,
|
||||
OutputDTMFFrame,
|
||||
OutputDTMFUrgentFrame,
|
||||
OutputTransportMessageFrame,
|
||||
OutputTransportMessageUrgentFrame,
|
||||
StartFrame,
|
||||
TransportMessageFrame,
|
||||
TransportMessageUrgentFrame,
|
||||
UserAudioRawFrame,
|
||||
UserImageRawFrame,
|
||||
)
|
||||
@@ -68,7 +68,7 @@ DTMF_CODE_MAP = {
|
||||
|
||||
|
||||
@dataclass
|
||||
class LiveKitTransportMessageFrame(TransportMessageFrame):
|
||||
class LiveKitOutputTransportMessageFrame(OutputTransportMessageFrame):
|
||||
"""Frame for transport messages in LiveKit rooms.
|
||||
|
||||
Parameters:
|
||||
@@ -79,7 +79,7 @@ class LiveKitTransportMessageFrame(TransportMessageFrame):
|
||||
|
||||
|
||||
@dataclass
|
||||
class LiveKitTransportMessageUrgentFrame(TransportMessageUrgentFrame):
|
||||
class LiveKitOutputTransportMessageUrgentFrame(OutputTransportMessageUrgentFrame):
|
||||
"""Frame for urgent transport messages in LiveKit rooms.
|
||||
|
||||
Parameters:
|
||||
@@ -89,6 +89,50 @@ class LiveKitTransportMessageUrgentFrame(TransportMessageUrgentFrame):
|
||||
participant_id: Optional[str] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class LiveKitTransportMessageFrame(LiveKitOutputTransportMessageFrame):
|
||||
"""Frame for transport messages in LiveKit rooms.
|
||||
|
||||
Parameters:
|
||||
participant_id: Optional ID of the participant this message is for/from.
|
||||
"""
|
||||
|
||||
def __post_init__(self):
|
||||
super().__post_init__()
|
||||
import warnings
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"LiveKitTransportMessageFrame is deprecated and will be removed in a future version. "
|
||||
"Instead, use LiveKitOutputTransportMessageFrame.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class LiveKitTransportMessageUrgentFrame(LiveKitOutputTransportMessageUrgentFrame):
|
||||
"""Frame for urgent transport messages in LiveKit rooms.
|
||||
|
||||
Parameters:
|
||||
participant_id: Optional ID of the participant this message is for/from.
|
||||
"""
|
||||
|
||||
def __post_init__(self):
|
||||
super().__post_init__()
|
||||
import warnings
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"LiveKitTransportMessageUrgentFrame is deprecated and will be removed in a future version. "
|
||||
"Instead, use LiveKitOutputTransportMessageUrgentFrame.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
|
||||
class LiveKitParams(TransportParams):
|
||||
"""Configuration parameters for LiveKit transport.
|
||||
|
||||
@@ -310,10 +354,10 @@ class LiveKitTransportClient:
|
||||
logger.error(f"Error sending data: {e}")
|
||||
|
||||
async def send_dtmf(self, digit: str):
|
||||
"""Send DTMF tone to the room.
|
||||
r"""Send DTMF tone to the room.
|
||||
|
||||
Args:
|
||||
digit: The DTMF digit to send (0-9, *, #).
|
||||
digit: The DTMF digit to send (0-9, \*, #).
|
||||
"""
|
||||
if not self._connected:
|
||||
return
|
||||
@@ -677,7 +721,7 @@ class LiveKitInputTransport(BaseInputTransport):
|
||||
message: The message data to send.
|
||||
sender: ID of the message sender.
|
||||
"""
|
||||
frame = LiveKitTransportMessageUrgentFrame(message=message, participant_id=sender)
|
||||
frame = LiveKitOutputTransportMessageUrgentFrame(message=message, participant_id=sender)
|
||||
await self.push_frame(frame)
|
||||
|
||||
async def _audio_in_task_handler(self):
|
||||
@@ -836,7 +880,9 @@ class LiveKitOutputTransport(BaseOutputTransport):
|
||||
await super().cleanup()
|
||||
await self._transport.cleanup()
|
||||
|
||||
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
|
||||
async def send_message(
|
||||
self, frame: OutputTransportMessageFrame | OutputTransportMessageUrgentFrame
|
||||
):
|
||||
"""Send a transport message to participants.
|
||||
|
||||
Args:
|
||||
@@ -846,7 +892,9 @@ class LiveKitOutputTransport(BaseOutputTransport):
|
||||
if isinstance(message, dict):
|
||||
# fix message encoding for dict-like messages, e.g. RTVI messages.
|
||||
message = json.dumps(message, ensure_ascii=False)
|
||||
if isinstance(frame, (LiveKitTransportMessageFrame, LiveKitTransportMessageUrgentFrame)):
|
||||
if isinstance(
|
||||
frame, (LiveKitOutputTransportMessageFrame, LiveKitOutputTransportMessageUrgentFrame)
|
||||
):
|
||||
await self._client.send_data(message.encode(), frame.participant_id)
|
||||
else:
|
||||
await self._client.send_data(message.encode())
|
||||
@@ -1105,7 +1153,9 @@ class LiveKitTransport(BaseTransport):
|
||||
participant_id: Optional specific participant to send to.
|
||||
"""
|
||||
if self._output:
|
||||
frame = LiveKitTransportMessageFrame(message=message, participant_id=participant_id)
|
||||
frame = LiveKitOutputTransportMessageFrame(
|
||||
message=message, participant_id=participant_id
|
||||
)
|
||||
await self._output.send_message(frame)
|
||||
|
||||
async def send_message_urgent(self, message: str, participant_id: Optional[str] = None):
|
||||
@@ -1116,7 +1166,7 @@ class LiveKitTransport(BaseTransport):
|
||||
participant_id: Optional specific participant to send to.
|
||||
"""
|
||||
if self._output:
|
||||
frame = LiveKitTransportMessageUrgentFrame(
|
||||
frame = LiveKitOutputTransportMessageUrgentFrame(
|
||||
message=message, participant_id=participant_id
|
||||
)
|
||||
await self._output.send_message(frame)
|
||||
|
||||
@@ -283,7 +283,6 @@ class SmallWebRTCConnection(BaseObject):
|
||||
self._data_channel = None
|
||||
self._renegotiation_in_progress = False
|
||||
self._last_received_time = None
|
||||
self._message_queue = []
|
||||
self._pending_app_messages = []
|
||||
self._connecting_timeout_task = None
|
||||
|
||||
@@ -297,10 +296,7 @@ class SmallWebRTCConnection(BaseObject):
|
||||
# Flush queued messages once the data channel is open
|
||||
@channel.on("open")
|
||||
async def on_open():
|
||||
logger.debug("Data channel is open, flushing queued messages")
|
||||
while self._message_queue:
|
||||
message = self._message_queue.pop(0)
|
||||
self._data_channel.send(message)
|
||||
logger.debug("Data channel is open!")
|
||||
|
||||
@channel.on("message")
|
||||
async def on_message(message):
|
||||
@@ -503,7 +499,6 @@ class SmallWebRTCConnection(BaseObject):
|
||||
self._track_map.clear()
|
||||
if self._pc:
|
||||
await self._pc.close()
|
||||
self._message_queue.clear()
|
||||
self._pending_app_messages.clear()
|
||||
self._track_map = {}
|
||||
self._cancel_monitoring_connecting_state()
|
||||
@@ -669,8 +664,8 @@ class SmallWebRTCConnection(BaseObject):
|
||||
if self._data_channel and self._data_channel.readyState == "open":
|
||||
self._data_channel.send(json_message)
|
||||
else:
|
||||
logger.debug("Data channel not ready, queuing message")
|
||||
self._message_queue.append(json_message)
|
||||
# The client might choose never to create a data channel.
|
||||
logger.trace("Data channel not ready, discarding message!")
|
||||
|
||||
def ask_to_renegotiate(self):
|
||||
"""Request renegotiation of the WebRTC connection."""
|
||||
|
||||
@@ -116,6 +116,10 @@ class SmallWebRTCRequestHandler:
|
||||
detail="Cannot create new connection with existing connection active",
|
||||
)
|
||||
|
||||
def update_ice_servers(self, ice_servers: Optional[List[IceServer]] = None):
|
||||
"""Update the list of ICE servers used for WebRTC connections."""
|
||||
self._ice_servers = ice_servers
|
||||
|
||||
async def handle_web_request(
|
||||
self,
|
||||
request: SmallWebRTCRequest,
|
||||
@@ -180,7 +184,7 @@ class SmallWebRTCRequestHandler:
|
||||
|
||||
answer = pipecat_connection.get_answer()
|
||||
|
||||
if self._esp32_mode and self._host and self._host != "localhost":
|
||||
if self._esp32_mode:
|
||||
from pipecat.runner.utils import smallwebrtc_sdp_munging
|
||||
|
||||
answer["sdp"] = smallwebrtc_sdp_munging(answer["sdp"], self._host)
|
||||
|
||||
@@ -26,13 +26,13 @@ from pipecat.frames.frames import (
|
||||
EndFrame,
|
||||
Frame,
|
||||
InputAudioRawFrame,
|
||||
InputTransportMessageUrgentFrame,
|
||||
InputTransportMessageFrame,
|
||||
OutputAudioRawFrame,
|
||||
OutputImageRawFrame,
|
||||
OutputTransportMessageFrame,
|
||||
OutputTransportMessageUrgentFrame,
|
||||
SpriteFrame,
|
||||
StartFrame,
|
||||
TransportMessageFrame,
|
||||
TransportMessageUrgentFrame,
|
||||
UserImageRawFrame,
|
||||
UserImageRequestFrame,
|
||||
)
|
||||
@@ -461,7 +461,9 @@ class SmallWebRTCClient:
|
||||
await self._webrtc_connection.disconnect()
|
||||
await self._handle_peer_disconnected()
|
||||
|
||||
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
|
||||
async def send_message(
|
||||
self, frame: OutputTransportMessageFrame | OutputTransportMessageUrgentFrame
|
||||
):
|
||||
"""Send an application message through the WebRTC connection.
|
||||
|
||||
Args:
|
||||
@@ -683,7 +685,7 @@ class SmallWebRTCInputTransport(BaseInputTransport):
|
||||
message: The application message to process.
|
||||
"""
|
||||
logger.debug(f"Received app message inside SmallWebRTCInputTransport {message}")
|
||||
frame = InputTransportMessageUrgentFrame(message=message)
|
||||
frame = InputTransportMessageFrame(message=message)
|
||||
await self.push_frame(frame)
|
||||
|
||||
# Add this method similar to DailyInputTransport.request_participant_image
|
||||
@@ -820,7 +822,9 @@ class SmallWebRTCOutputTransport(BaseOutputTransport):
|
||||
await super().cancel(frame)
|
||||
await self._client.disconnect()
|
||||
|
||||
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
|
||||
async def send_message(
|
||||
self, frame: OutputTransportMessageFrame | OutputTransportMessageUrgentFrame
|
||||
):
|
||||
"""Send a transport message through the WebRTC connection.
|
||||
|
||||
Args:
|
||||
|
||||
@@ -27,9 +27,9 @@ from pipecat.frames.frames import (
|
||||
InputAudioRawFrame,
|
||||
InterruptionFrame,
|
||||
OutputAudioRawFrame,
|
||||
OutputTransportMessageFrame,
|
||||
OutputTransportMessageUrgentFrame,
|
||||
StartFrame,
|
||||
TransportMessageFrame,
|
||||
TransportMessageUrgentFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup
|
||||
from pipecat.transports.base_input import BaseInputTransport
|
||||
@@ -345,7 +345,9 @@ class TavusTransportClient:
|
||||
participant_id, callback, audio_source, sample_rate, callback_interval_ms
|
||||
)
|
||||
|
||||
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
|
||||
async def send_message(
|
||||
self, frame: OutputTransportMessageFrame | OutputTransportMessageUrgentFrame
|
||||
):
|
||||
"""Send a message to participants.
|
||||
|
||||
Args:
|
||||
@@ -373,7 +375,7 @@ class TavusTransportClient:
|
||||
|
||||
async def send_interrupt_message(self) -> None:
|
||||
"""Send an interrupt message to the conversation."""
|
||||
transport_frame = TransportMessageUrgentFrame(
|
||||
transport_frame = OutputTransportMessageUrgentFrame(
|
||||
message={
|
||||
"message_type": "conversation",
|
||||
"event_type": "conversation.interrupt",
|
||||
@@ -605,7 +607,9 @@ class TavusOutputTransport(BaseOutputTransport):
|
||||
await super().cancel(frame)
|
||||
await self._client.stop()
|
||||
|
||||
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
|
||||
async def send_message(
|
||||
self, frame: OutputTransportMessageFrame | OutputTransportMessageUrgentFrame
|
||||
):
|
||||
"""Send a message to participants.
|
||||
|
||||
Args:
|
||||
|
||||
@@ -28,9 +28,9 @@ from pipecat.frames.frames import (
|
||||
Frame,
|
||||
InputAudioRawFrame,
|
||||
OutputAudioRawFrame,
|
||||
OutputTransportMessageFrame,
|
||||
OutputTransportMessageUrgentFrame,
|
||||
StartFrame,
|
||||
TransportMessageFrame,
|
||||
TransportMessageUrgentFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameProcessorSetup
|
||||
from pipecat.serializers.base_serializer import FrameSerializer
|
||||
@@ -385,7 +385,9 @@ class WebsocketClientOutputTransport(BaseOutputTransport):
|
||||
await super().cleanup()
|
||||
await self._transport.cleanup()
|
||||
|
||||
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
|
||||
async def send_message(
|
||||
self, frame: OutputTransportMessageFrame | OutputTransportMessageUrgentFrame
|
||||
):
|
||||
"""Send a transport message through the WebSocket.
|
||||
|
||||
Args:
|
||||
|
||||
@@ -28,9 +28,9 @@ from pipecat.frames.frames import (
|
||||
InputAudioRawFrame,
|
||||
InterruptionFrame,
|
||||
OutputAudioRawFrame,
|
||||
OutputTransportMessageFrame,
|
||||
OutputTransportMessageUrgentFrame,
|
||||
StartFrame,
|
||||
TransportMessageFrame,
|
||||
TransportMessageUrgentFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializerType
|
||||
@@ -402,7 +402,9 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport):
|
||||
await self._write_frame(frame)
|
||||
self._next_send_time = 0
|
||||
|
||||
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
|
||||
async def send_message(
|
||||
self, frame: OutputTransportMessageFrame | OutputTransportMessageUrgentFrame
|
||||
):
|
||||
"""Send a transport message frame.
|
||||
|
||||
Args:
|
||||
|
||||
@@ -27,9 +27,9 @@ from pipecat.frames.frames import (
|
||||
InputAudioRawFrame,
|
||||
InterruptionFrame,
|
||||
OutputAudioRawFrame,
|
||||
OutputTransportMessageFrame,
|
||||
OutputTransportMessageUrgentFrame,
|
||||
StartFrame,
|
||||
TransportMessageFrame,
|
||||
TransportMessageUrgentFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.serializers.base_serializer import FrameSerializer
|
||||
@@ -338,7 +338,9 @@ class WebsocketServerOutputTransport(BaseOutputTransport):
|
||||
await self._write_frame(frame)
|
||||
self._next_send_time = 0
|
||||
|
||||
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
|
||||
async def send_message(
|
||||
self, frame: OutputTransportMessageFrame | OutputTransportMessageUrgentFrame
|
||||
):
|
||||
"""Send a transport message frame to the client.
|
||||
|
||||
Args:
|
||||
|
||||
@@ -241,6 +241,14 @@ class WhatsAppApi:
|
||||
self._whatsapp_url = f"{self.BASE_URL}{phone_number_id}/calls"
|
||||
self._whatsapp_token = whatsapp_token
|
||||
|
||||
def update_whatsapp_token(self, whatsapp_token: str):
|
||||
"""Update the WhatsApp access token for authentication."""
|
||||
self._whatsapp_token = whatsapp_token
|
||||
|
||||
def update_whatsapp_phone_number_id(self, phone_number_id: str):
|
||||
"""Update the WhatsApp phone number ID for authentication."""
|
||||
self._phone_number_id = phone_number_id
|
||||
|
||||
async def answer_call_to_whatsapp(self, call_id: str, action: str, sdp: str, from_: str):
|
||||
"""Answer an incoming WhatsApp call.
|
||||
|
||||
|
||||
@@ -12,6 +12,8 @@ WhatsApp call events.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import hashlib
|
||||
import hmac
|
||||
from typing import Awaitable, Callable, Dict, List, Optional
|
||||
|
||||
import aiohttp
|
||||
@@ -47,6 +49,7 @@ class WhatsAppClient:
|
||||
phone_number_id: str,
|
||||
session: aiohttp.ClientSession,
|
||||
ice_servers: Optional[List[IceServer]] = None,
|
||||
whatsapp_secret: Optional[str] = None,
|
||||
) -> None:
|
||||
"""Initialize the WhatsApp client.
|
||||
|
||||
@@ -56,10 +59,12 @@ class WhatsAppClient:
|
||||
session: aiohttp session for making HTTP requests
|
||||
ice_servers: List of ICE servers for WebRTC connections. If None,
|
||||
defaults to Google's public STUN server
|
||||
whatsapp_secret: WhatsApp APP secret for validating that the webhook request came from WhatsApp.
|
||||
"""
|
||||
self._whatsapp_api = WhatsAppApi(
|
||||
whatsapp_token=whatsapp_token, phone_number_id=phone_number_id, session=session
|
||||
)
|
||||
self._whatsapp_secret = whatsapp_secret
|
||||
self._ongoing_calls_map: Dict[str, SmallWebRTCConnection] = {}
|
||||
|
||||
# Set default ICE servers if none provided
|
||||
@@ -68,6 +73,22 @@ class WhatsAppClient:
|
||||
else:
|
||||
self._ice_servers = ice_servers
|
||||
|
||||
def update_ice_servers(self, ice_servers: Optional[List[IceServer]] = None):
|
||||
"""Update the list of ICE servers used for WebRTC connections."""
|
||||
self._ice_servers = ice_servers
|
||||
|
||||
def update_whatsapp_secret(self, whatsapp_secret: Optional[str] = None):
|
||||
"""Update the WhatsApp APP secret for validating that the webhook request came from WhatsApp."""
|
||||
self._whatsapp_secret = whatsapp_secret
|
||||
|
||||
def update_whatsapp_token(self, whatsapp_token: str):
|
||||
"""Update the WhatsApp API access token."""
|
||||
self._whatsapp_api.update_whatsapp_token(whatsapp_token)
|
||||
|
||||
def update_whatsapp_phone_number_id(self, phone_number_id: str):
|
||||
"""Update the WhatsApp phone number ID for authentication."""
|
||||
self._whatsapp_api.update_whatsapp_phone_number_id(phone_number_id)
|
||||
|
||||
async def terminate_all_calls(self) -> None:
|
||||
"""Terminate all ongoing WhatsApp calls.
|
||||
|
||||
@@ -133,10 +154,32 @@ class WhatsAppClient:
|
||||
|
||||
return int(challenge)
|
||||
|
||||
async def _validate_whatsapp_webhook_request(self, raw_body: bytes, sha256_signature: str):
|
||||
"""Common handler for both /start and /connect endpoints."""
|
||||
# Compute HMAC SHA256 using your App Secret
|
||||
expected_signature = hmac.new(
|
||||
key=self._whatsapp_secret.encode("utf-8"),
|
||||
msg=raw_body,
|
||||
digestmod=hashlib.sha256,
|
||||
).hexdigest()
|
||||
|
||||
# Extract signature from header (strip 'sha256=' prefix)
|
||||
if not sha256_signature:
|
||||
raise Exception("Missing X-Hub-Signature-256 header")
|
||||
received_signature = sha256_signature.split("sha256=")[-1]
|
||||
|
||||
# Compare signatures securely
|
||||
if not hmac.compare_digest(expected_signature, received_signature):
|
||||
raise Exception("Invalid webhook signature")
|
||||
|
||||
logger.debug(f"Webhook signature verified!")
|
||||
|
||||
async def handle_webhook_request(
|
||||
self,
|
||||
request: WhatsAppWebhookRequest,
|
||||
connection_callback: Optional[Callable[[SmallWebRTCConnection], Awaitable[None]]] = None,
|
||||
raw_body: Optional[bytes] = None,
|
||||
sha256_signature: Optional[str] = None,
|
||||
) -> bool:
|
||||
"""Handle a webhook request from WhatsApp.
|
||||
|
||||
@@ -150,6 +193,8 @@ class WhatsAppClient:
|
||||
connection_callback: Optional callback function to invoke when a new
|
||||
WebRTC connection is established. The callback
|
||||
receives the SmallWebRTCConnection instance.
|
||||
raw_body: Optional bytes containing the raw request body.
|
||||
sha256_signature: Optional X-Hub-Signature-256 header value from the request.
|
||||
|
||||
Returns:
|
||||
bool: True if the webhook request was handled successfully, False otherwise
|
||||
@@ -159,6 +204,8 @@ class WhatsAppClient:
|
||||
Exception: If connection establishment or API calls fail
|
||||
"""
|
||||
try:
|
||||
if self._whatsapp_secret:
|
||||
await self._validate_whatsapp_webhook_request(raw_body, sha256_signature)
|
||||
for entry in request.entry:
|
||||
for change in entry.changes:
|
||||
# Handle connect events
|
||||
|
||||
@@ -29,7 +29,7 @@ class EventHandler:
|
||||
This data class stores the event name, a list of handlers to run for this
|
||||
event, and whether these handlers will be executed in a task.
|
||||
|
||||
Attributes:
|
||||
Parameters:
|
||||
name (str): The name of the event handler.
|
||||
handlers (List[Any]): A list of functions to be called when this event is triggered.
|
||||
is_sync (bool): Indicates whether the functions are executed in a task.
|
||||
|
||||
@@ -21,13 +21,24 @@ import re
|
||||
from typing import FrozenSet, Optional, Sequence, Tuple
|
||||
|
||||
import nltk
|
||||
from loguru import logger
|
||||
from nltk.tokenize import sent_tokenize
|
||||
|
||||
# Ensure punkt_tab tokenizer data is available
|
||||
try:
|
||||
nltk.data.find("tokenizers/punkt_tab")
|
||||
except LookupError:
|
||||
nltk.download("punkt_tab", quiet=True)
|
||||
try:
|
||||
nltk.download("punkt_tab", quiet=True)
|
||||
except (OSError, PermissionError) as e:
|
||||
logger.error(
|
||||
f"Failed to download NLTK 'punkt_tab' tokenizer data: {e}. "
|
||||
"This data is required for sentence tokenization features. "
|
||||
"The download failed due to filesystem permissions. "
|
||||
"To resolve: pre-install the data in a location with appropriate read permissions, "
|
||||
"or set the NLTK_DATA environment variable to point to a writable directory. "
|
||||
"See https://www.nltk.org/data.html for more information."
|
||||
)
|
||||
|
||||
SENTENCE_ENDING_PUNCTUATION: FrozenSet[str] = frozenset(
|
||||
{
|
||||
|
||||
@@ -12,14 +12,12 @@ from dotenv import load_dotenv
|
||||
|
||||
from pipecat.adapters.schemas.function_schema import FunctionSchema
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.frames.frames import LLMContextFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.processors.aggregators.openai_llm_context import (
|
||||
OpenAILLMContext,
|
||||
OpenAILLMContextFrame,
|
||||
)
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.services.anthropic.llm import AnthropicLLMService
|
||||
from pipecat.services.google.llm import GoogleLLMService
|
||||
from pipecat.services.llm_service import LLMService
|
||||
from pipecat.services.llm_service import FunctionCallParams, LLMService
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.tests.utils import run_test
|
||||
|
||||
@@ -48,8 +46,13 @@ def standard_tools() -> ToolsSchema:
|
||||
|
||||
|
||||
async def _test_llm_function_calling(llm: LLMService):
|
||||
# Create an AsyncMock for the function
|
||||
mock_fetch_weather = AsyncMock()
|
||||
# Create a mock weather function
|
||||
call_count = 0
|
||||
|
||||
async def mock_fetch_weather(params: FunctionCallParams):
|
||||
nonlocal call_count
|
||||
call_count += 1
|
||||
pass
|
||||
|
||||
llm.register_function(None, mock_fetch_weather)
|
||||
|
||||
@@ -60,21 +63,19 @@ async def _test_llm_function_calling(llm: LLMService):
|
||||
},
|
||||
{"role": "user", "content": " How is the weather today in San Francisco, California?"},
|
||||
]
|
||||
context = OpenAILLMContext(messages, standard_tools())
|
||||
# This is done by default inside the create_context_aggregator
|
||||
context.set_llm_adapter(llm.get_llm_adapter())
|
||||
context = LLMContext(messages, standard_tools())
|
||||
|
||||
pipeline = Pipeline([llm])
|
||||
|
||||
frames_to_send = [OpenAILLMContextFrame(context)]
|
||||
frames_to_send = [LLMContextFrame(context)]
|
||||
await run_test(
|
||||
pipeline,
|
||||
frames_to_send=frames_to_send,
|
||||
expected_down_frames=None,
|
||||
)
|
||||
|
||||
# Assert that the mock function was called
|
||||
mock_fetch_weather.assert_called_once()
|
||||
# Assert that the weather function was called once
|
||||
assert call_count == 1
|
||||
|
||||
|
||||
@pytest.mark.skipif(os.getenv("OPENAI_API_KEY") is None, reason="OPENAI_API_KEY is not set")
|
||||
|
||||
@@ -11,8 +11,8 @@ from pipecat.frames.frames import (
|
||||
EndFrame,
|
||||
Frame,
|
||||
InterruptionFrame,
|
||||
OutputTransportMessageUrgentFrame,
|
||||
TextFrame,
|
||||
TransportMessageUrgentFrame,
|
||||
)
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.processors.filters.identity_filter import IdentityFilter
|
||||
@@ -81,7 +81,7 @@ class TestFrameProcessor(unittest.IsolatedAsyncioTestCase):
|
||||
|
||||
if isinstance(frame, TextFrame):
|
||||
await self.push_interruption_task_frame_and_wait()
|
||||
await self.push_frame(TransportMessageUrgentFrame(message=frame.text))
|
||||
await self.push_frame(OutputTransportMessageUrgentFrame(message=frame.text))
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
@@ -101,7 +101,7 @@ class TestFrameProcessor(unittest.IsolatedAsyncioTestCase):
|
||||
expected_down_frames = [
|
||||
InterruptionFrame,
|
||||
InterruptionFrame,
|
||||
TransportMessageUrgentFrame,
|
||||
OutputTransportMessageUrgentFrame,
|
||||
EndFrame,
|
||||
]
|
||||
await run_test(
|
||||
|
||||
@@ -10,24 +10,21 @@ from langchain.prompts import ChatPromptTemplate
|
||||
from langchain_core.language_models import FakeStreamingListLLM
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
LLMContextAssistantTimestampFrame,
|
||||
LLMContextFrame,
|
||||
LLMFullResponseEndFrame,
|
||||
LLMFullResponseStartFrame,
|
||||
OpenAILLMContextAssistantTimestampFrame,
|
||||
TextFrame,
|
||||
TranscriptionFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
)
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response import (
|
||||
LLMAssistantAggregatorParams,
|
||||
LLMAssistantContextAggregator,
|
||||
LLMUserContextAggregator,
|
||||
)
|
||||
from pipecat.processors.aggregators.openai_llm_context import (
|
||||
OpenAILLMContext,
|
||||
OpenAILLMContextFrame,
|
||||
)
|
||||
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
|
||||
from pipecat.processors.frame_processor import FrameProcessor
|
||||
from pipecat.processors.frameworks.langchain import LangchainProcessor
|
||||
from pipecat.tests.utils import SleepFrame, run_test
|
||||
@@ -67,13 +64,14 @@ class TestLangchain(unittest.IsolatedAsyncioTestCase):
|
||||
proc = LangchainProcessor(chain=chain)
|
||||
self.mock_proc = self.MockProcessor("token_collector")
|
||||
|
||||
context = OpenAILLMContext()
|
||||
tma_in = LLMUserContextAggregator(context)
|
||||
tma_out = LLMAssistantContextAggregator(
|
||||
context, params=LLMAssistantAggregatorParams(expect_stripped_words=False)
|
||||
context = LLMContext()
|
||||
context_aggregator = LLMContextAggregatorPair(
|
||||
context, assistant_params=LLMAssistantAggregatorParams(expect_stripped_words=False)
|
||||
)
|
||||
|
||||
pipeline = Pipeline([tma_in, proc, self.mock_proc, tma_out])
|
||||
pipeline = Pipeline(
|
||||
[context_aggregator.user(), proc, self.mock_proc, context_aggregator.assistant()]
|
||||
)
|
||||
|
||||
frames_to_send = [
|
||||
UserStartedSpeakingFrame(),
|
||||
@@ -84,8 +82,8 @@ class TestLangchain(unittest.IsolatedAsyncioTestCase):
|
||||
expected_down_frames = [
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
OpenAILLMContextFrame,
|
||||
OpenAILLMContextAssistantTimestampFrame,
|
||||
LLMContextFrame,
|
||||
LLMContextAssistantTimestampFrame,
|
||||
]
|
||||
await run_test(
|
||||
pipeline,
|
||||
@@ -94,4 +92,6 @@ class TestLangchain(unittest.IsolatedAsyncioTestCase):
|
||||
)
|
||||
|
||||
self.assertEqual("".join(self.mock_proc.token), self.expected_response)
|
||||
self.assertEqual(tma_out.messages[-1]["content"], self.expected_response)
|
||||
self.assertEqual(
|
||||
context_aggregator.assistant().messages[-1]["content"], self.expected_response
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user