Compare commits

..

8 Commits

Author SHA1 Message Date
mattie ruth backman
7742d1a83b Add error handling for unsupported files 2026-03-18 15:49:48 -04:00
mattie ruth backman
d9cebe602f Add new FileSourceType for 'id' and use that for local uploads, prefixed with 'pipecat:' 2026-03-18 15:49:48 -04:00
mattie ruth backman
96e06d2401 Update /files/ upload response to match RTVI format, rather than inventing a new one 2026-03-18 15:49:48 -04:00
mattie ruth backman
267c86e596 support RTVI files uploads larger than the transport can handle
This PR introduces:
1. a new /files/ POST endpoint in the local runner that supports
   uploading a file to a folder that must be provided at runtime
2. By default, the runner will allow a maximum 10 files to be
   saved
3. Added logic to the send-file handler in RTVI to read a file
   from disk if the file provide is a url starting with '/files/'
2026-03-18 15:49:48 -04:00
mattie ruth backman
9fb06c3e4b Update File upload RTVI messages and frames to use mime-type as the format 2026-03-18 15:49:48 -04:00
mattie ruth backman
71197fbc2c Support files provided via url 2026-03-18 15:49:48 -04:00
mattie ruth backman
9cd4e5faca Support generic files (openai so far) 2026-03-18 15:49:48 -04:00
mattie ruth backman
4f290be834 Initial commit: Introducing RTVI support for files
This commit introduces the types for all RTVI file messaging and full
support for sending images as byte strings
2026-03-18 15:49:48 -04:00
941 changed files with 35956 additions and 62000 deletions

View File

@@ -1 +0,0 @@
../../.claude/skills/changelog

View File

@@ -1 +0,0 @@
../../.claude/skills/cleanup

View File

@@ -1 +0,0 @@
../../.claude/skills/code-review

View File

@@ -1 +0,0 @@
../../.claude/skills/docstring

View File

@@ -1 +0,0 @@
../../.claude/skills/pr-description

View File

@@ -1 +0,0 @@
../../.claude/skills/pr-submit

View File

@@ -1 +0,0 @@
../../.claude/skills/update-docs

View File

@@ -1,8 +1,3 @@
---
name: cleanup
description: Review, refactor, document, and validate code changes in the current branch
---
# Code Cleanup Skill
The **Code Cleanup Skill** reviews, refactors, and documents code changes in your current branch, ensuring alignment with **Pipecat's architecture, coding standards, and example patterns**.
@@ -149,7 +144,7 @@ class InputParams(BaseModel):
#### Examples
Validated against `examples/07-interruptible.py`:
Validated against `examples/foundational/07-interruptible.py`:
- Proper `create_transport()` usage
- Correct pipeline structure

View File

@@ -1,91 +0,0 @@
---
name: squash-commits
description: Reorganize messy branch commits into a small set of logical, meaningful commits without changing any content. Drops merge-from-main commits. Safe: creates a backup branch first.
---
Reorganize the commits on the current branch into a small number of logical commits. Do NOT change any file content — only the commit structure changes.
## Instructions
### 1. Safety check
```bash
git status --short
```
If there are uncommitted changes, stop and tell the user to commit or stash them first.
### 2. Inspect the branch
```bash
git log main..HEAD --oneline
git diff main..HEAD --name-only
```
List every file changed vs `main` and every commit on the branch (excluding merge commits from main).
### 3. Create a backup branch
```bash
git branch backup/<current-branch-name>
```
Tell the user the backup exists so they can recover if needed.
### 4. Soft-reset to main and unstage everything
```bash
git reset --soft main
git restore --staged .
```
All branch changes are now in the working tree, unstaged. No content has changed.
### 5. Plan the logical groups
Read the changed files and the original commit messages to understand what the work covers. Group related files into logical commits. Typical groups:
- Core feature or fix (new source files + modified core files)
- Secondary features or fixes (each as its own commit if distinct)
- Refactoring or renames
- Tests
- Changelogs / docs
Use the changelog files (if any) as a strong hint — each changelog entry often maps to one commit.
Present the proposed grouping to the user and ask for confirmation before committing.
### 6. Commit in logical groups
For each group, stage only the relevant files and commit with a clear message following the project's conventions:
```bash
git add <file1> <file2> ...
git commit -m "..."
```
Use conventional commit prefixes if the project uses them (`feat:`, `fix:`, `refactor:`, `test:`, `chore:`).
### 7. Verify
```bash
git log main..HEAD --oneline
git diff main..HEAD --name-only
git status --short
```
Confirm:
- Commit count is small and each message is meaningful
- The set of changed files vs `main` is identical to before
- Working tree is clean
### 8. Remind about force-push
The branch history has been rewritten. Tell the user they will need to `git push --force-with-lease` when they are ready to update the remote. Do NOT push automatically.
## Rules
- Never change file contents. If you find yourself editing a file, stop.
- Never skip the backup branch step.
- Never force-push without explicit user instruction.
- If any step fails or the result looks wrong, tell the user and suggest restoring from the backup: `git reset --hard backup/<branch-name>`.

30
.dockerignore Normal file
View File

@@ -0,0 +1,30 @@
# flyctl launch added from .gitignore
**/.vscode
**/env
**/__pycache__
**/*~
**/venv
#*#
# Distribution / packaging
**/.Python
**/build
**/develop-eggs
**/dist
**/downloads
**/eggs
**/.eggs
**/lib
**/lib64
**/parts
**/sdist
**/var
**/wheels
**/share/python-wheels
**/*.egg-info
**/.installed.cfg
**/*.egg
**/MANIFEST
**/.DS_Store
**/.env
fly.toml

View File

@@ -42,7 +42,6 @@ jobs:
--extra langchain \
--extra livekit \
--extra piper \
--extra runner \
--extra sagemaker \
--extra tracing \
--extra websocket

View File

@@ -32,9 +32,7 @@ jobs:
run: uv python install 3.12
- name: Install development dependencies
# `--all-extras` (matching the dev setup in README.md) so pyright can
# resolve types from various optional dependencies.
run: uv sync --group dev --all-extras --no-extra gstreamer --no-extra local
run: uv sync --group dev
- name: Ruff formatter
id: ruff-format
@@ -43,7 +41,3 @@ jobs:
- name: Ruff linter (all rules)
id: ruff-check
run: uv run ruff check
- name: Type check (pyright)
id: pyright
run: uv run pyright

View File

@@ -14,7 +14,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ['3.11.15', '3.12.13', '3.13.12', '3.14.3']
python-version: ['3.10.19', '3.11.14', '3.12.12', '3.13.12']
name: Python ${{ matrix.python-version }}
steps:
@@ -42,7 +42,7 @@ jobs:
- name: Test uv sync with all extras
run: |
uv sync --group dev --all-extras
uv sync --group dev --all-extras --no-extra krisp
- name: Verify installation
run: |

51
.github/workflows/sync-quickstart.yaml vendored Normal file
View File

@@ -0,0 +1,51 @@
name: Sync Quickstart to pipecat-quickstart repo
on:
push:
branches: [main]
paths:
- 'examples/quickstart/**'
workflow_dispatch: # Manual trigger
jobs:
sync-quickstart:
runs-on: ubuntu-latest
steps:
- name: Checkout main repo
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Checkout quickstart repo
uses: actions/checkout@v4
with:
repository: pipecat-ai/pipecat-quickstart
token: ${{ secrets.QUICKSTART_SYNC_TOKEN }}
path: quickstart-repo
- name: Sync files (excluding uv.lock and README.md)
run: |
# Copy all files except uv.lock and README.md
find examples/quickstart -type f \
-not -name "README.md" \
-not -name "uv.lock" \
-exec cp {} quickstart-repo/ \;
- name: Commit and push changes
run: |
cd quickstart-repo
git config user.name "GitHub Action"
git config user.email "action@github.com"
git add .
# Only commit if there are changes
if ! git diff --staged --quiet; then
git commit -m "Sync from pipecat main repo
Updated files from examples/quickstart/
Commit: ${{ github.sha }}
"
git push
else
echo "No changes to sync"
fi

View File

@@ -46,7 +46,6 @@ jobs:
--extra langchain \
--extra livekit \
--extra piper \
--extra runner \
--extra sagemaker \
--extra tracing \
--extra websocket

View File

@@ -114,7 +114,6 @@ jobs:
GH_TOKEN=$DOCS_SYNC_TOKEN gh pr create \
--repo pipecat-ai/docs \
--label auto-docs \
--label pipecat \
--title "docs: update for pipecat PR #${{ steps.pr.outputs.number }}" \
--body "$(cat <<'BODY'
Automated documentation update for [pipecat PR #${{ steps.pr.outputs.number }}](https://github.com/pipecat-ai/pipecat/pull/${{ steps.pr.outputs.number }}).

View File

@@ -1,13 +1,8 @@
repos:
- repo: local
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.12.1
hooks:
- id: ruff
name: ruff
entry: uv run ruff check --fix
language: system
types: [python]
language_version: python3
args: [--fix]
- id: ruff-format
name: ruff-format
entry: uv run ruff format
language: system
types: [python]

View File

@@ -11,7 +11,7 @@ build:
jobs:
post_install:
- pip install uv
- UV_PROJECT_ENVIRONMENT=$READTHEDOCS_VIRTUALENV_PATH uv sync --group docs --all-extras --no-extra gstreamer --no-extra local_smart_turn --no-extra moondream --no-extra mlx-whisper
- UV_PROJECT_ENVIRONMENT=$READTHEDOCS_VIRTUALENV_PATH uv sync --group docs --all-extras --no-extra krisp --no-extra gstreamer --no-extra local_smart_turn --no-extra moondream --no-extra riva --no-extra mlx-whisper
sphinx:
configuration: docs/api/conf.py

174
AGENTS.md
View File

@@ -1,174 +0,0 @@
# AGENTS.md
This file provides guidance to AI coding agents when working with code in this repository.
## Project Overview
Pipecat is an open-source Python framework for building real-time voice and multimodal conversational AI agents. It orchestrates audio/video, AI services, transports, and conversation pipelines using a frame-based architecture.
## Common Commands
```bash
# Setup development environment
uv sync --group dev --all-extras --no-extra gstreamer --no-extra local
# Install pre-commit hooks
uv run pre-commit install
# Run all tests
uv run pytest
# Run a single test file
uv run pytest tests/test_name.py
# Run a specific test
uv run pytest tests/test_name.py::test_function_name
# Preview changelog
uv run towncrier build --draft --version Unreleased
# Lint and format check
uv run ruff check
uv run ruff format --check
# Update dependencies (after editing pyproject.toml)
uv lock && uv sync
```
## Architecture
### Frame-Based Pipeline Processing
All data flows as **Frame** objects through a pipeline of **FrameProcessors**:
```
[Processor1] → [Processor2] → ... → [ProcessorN]
```
**Key components:**
- **Frames** (`src/pipecat/frames/frames.py`): Data units (audio, text, video) and control signals. Flow DOWNSTREAM (input→output) or UPSTREAM (acknowledgments/errors).
- **FrameProcessor** (`src/pipecat/processors/frame_processor.py`): Base processing unit. Each processor receives frames, processes them, and pushes results downstream.
- **Pipeline** (`src/pipecat/pipeline/pipeline.py`): Chains processors together.
- **ParallelPipeline** (`src/pipecat/pipeline/parallel_pipeline.py`): Runs multiple pipelines in parallel.
- **Transports** (`src/pipecat/transports/`): Transports are frame processors used for external I/O layer (Daily WebRTC, LiveKit WebRTC, WebSocket, Local). Abstract interface via `BaseTransport`, `BaseInputTransport` and `BaseOutputTransport`.
- **Pipeline Task (`src/pipecat/pipeline/task.py`)**: Runs and manages a pipeline. Pipeline tasks send the first frame, `StartFrame`, to the pipeline in order for processors to know they can start processing and pushing frames. Pipeline tasks internally create a pipeline with two additional processors, a source processor before the user-defined pipeline and a sink processor at the end. Those are used for multiple things: error handling, pipeline task level events, heartbeat monitoring, etc.
- **Pipeline Runner (`src/pipecat/pipeline/runner.py`)**: High-level entry point for executing pipeline tasks. Handles signal management (SIGINT/SIGTERM) for graceful shutdown and optional garbage collection. Run a single pipeline task with `await runner.run(task)` or multiple concurrently with `await asyncio.gather(runner.run(task1), runner.run(task2))`.
- **Services** (`src/pipecat/services/`): 60+ AI provider integrations (STT, TTS, LLM, etc.). Extend base classes: `AIService`, `LLMService`, `STTService`, `TTSService`, `VisionService`.
- **Serializers** (`src/pipecat/serializers/`): Convert frames to/from wire formats for WebSocket transports. `FrameSerializer` base class defines `serialize()` and `deserialize()`. Telephony serializers (Twilio, Plivo, Vonage, Telnyx, Exotel, Genesys) handle provider-specific protocols and audio encoding (e.g., μ-law).
- **RTVI** (`src/pipecat/processors/frameworks/rtvi.py`): Real-Time Voice Interface protocol bridging clients and the pipeline. `RTVIProcessor` handles incoming client messages (text input, audio, function call results). `RTVIObserver` converts pipeline frames to outgoing messages: user/bot speaking events, transcriptions, LLM/TTS lifecycle, function calls, metrics, and audio levels.
- **Observers** (`src/pipecat/observers/`): Monitor frame flow without modifying the pipeline. Passed to `PipelineTask` via the `observers` parameter. Implement `on_process_frame()` and `on_push_frame()` callbacks.
### Important Patterns
- **Context Aggregation**: `LLMContext` accumulates messages for LLM calls; `UserResponse` aggregates user input
- **Turn Management**: Turn management is done through `LLMUserAggregator` and
`LLMAssistantAggregator`, created with `LLMContextAggregatorPair`
- **User turn strategies**: Detection of when the user starts and stops speaking is done via user turn start/stop strategies. They push `UserStartedSpeakingFrame` and `UserStoppedSpeakingFrame` respectively.
- **Interruptions**: Interruptions are usually triggered by a user turn start strategy (e.g. `VADUserTurnStartStrategy`) but they can be triggered by other processors as well, in which case the user turn start strategies don't need to. An `InterruptionFrame` carries an optional `asyncio.Event` that is set when the frame reaches the pipeline sink. If a processor stops an `InterruptionFrame` from propagating downstream (i.e., doesn't push it), it **must** call `frame.complete()` to avoid stalling `push_interruption_task_frame_and_wait()` callers.
- **Uninterruptible Frames**: These are frames that will not be removed from internal queues even if there's an interruption. For example, `EndFrame` and `StopFrame`.
- **Events**: Most classes in Pipecat have `BaseObject` as the very base class. `BaseObject` has support for events. Events can run in the background in an async task (default) or synchronously (`sync=True`) if we want immediate action. Synchronous event handlers need to execute fast.
- **Async Task Management**: Always use `self.create_task(coroutine, name)` instead of raw `asyncio.create_task()`. The `TaskManager` automatically tracks tasks and cleans them up on processor shutdown. Use `await self.cancel_task(task, timeout)` for cancellation.
- **Error Handling**: Use `await self.push_error(msg, exception, fatal)` to push errors upstream. Services should use `fatal=False` (the default) so application code can handle errors and take action (e.g. switch to another service).
### Key Directories
| Directory | Purpose |
| -------------------------- | -------------------------------------------------- |
| `src/pipecat/frames/` | Frame definitions (100+ types) |
| `src/pipecat/processors/` | FrameProcessor base + aggregators, filters, audio |
| `src/pipecat/pipeline/` | Pipeline orchestration |
| `src/pipecat/services/` | AI service integrations (60+ providers) |
| `src/pipecat/transports/` | Transport layer (Daily, LiveKit, WebSocket, Local) |
| `src/pipecat/serializers/` | Frame serialization for WebSocket protocols |
| `src/pipecat/observers/` | Pipeline observers for monitoring frame flow |
| `src/pipecat/audio/` | VAD, filters, mixers, turn detection, DTMF |
| `src/pipecat/turns/` | User turn management |
## Code Style
- **Docstrings**: Google-style. Classes describe purpose; `__init__` has `Args:` section; dataclasses use `Parameters:` section.
- **Deprecations**: Use the `.. deprecated:: <version>` Sphinx directive in docstrings (never inline tags like `[DEPRECATED]`), and pair it with a runtime `warnings.warn(..., DeprecationWarning)` at the call site. See `CONTRIBUTING.md` for full conventions.
- **Linting**: Ruff (line length 100). Pre-commit hooks enforce formatting.
- **Type hints**: Required for complex async code.
- **Dataclass vs Pydantic**: Use `@dataclass` for frames and internal pipeline data (high-frequency, no validation needed). Use Pydantic `BaseModel` for configuration, parameters, metrics, and external API data (benefits from validation and serialization). Specifically:
- `@dataclass`: Frame types, context aggregator pairs, internal data containers
- `BaseModel`: Service `InputParams`, transport/VAD/turn params, metrics data, API request/response models, serializer params
### Docstring Example
```python
class MyService(LLMService):
"""Description of what the service does.
More detailed description.
Event handlers available:
- on_connected: Called when we are connected
Example::
@service.event_handler("on_connected")
async def on_connected(service, frame):
...
"""
def __init__(self, param1: str, **kwargs):
"""Initialize the service.
Args:
param1: Description of param1.
**kwargs: Additional arguments passed to parent.
"""
super().__init__(**kwargs)
# Pydantic params class with a deprecated field
class MyParams(BaseModel):
"""Configuration parameters for MyService.
Parameters:
new_setting: Replacement for ``old_setting``.
old_setting: Legacy setting, no longer used.
.. deprecated:: 1.2.0
Use ``new_setting`` instead. Will be removed in 2.0.0.
"""
new_setting: str = "default"
old_setting: str | None = None
```
## Service Implementation
When adding a new service:
1. Extend the appropriate base class (`STTService`, `TTSService`, `LLMService`, etc.)
2. Implement required abstract methods
3. Handle necessary frames
4. By default, all frames should be pushed in the direction they came
5. Push `ErrorFrame` on failures
6. Add metrics tracking via `MetricsData` if relevant
7. Follow the pattern of existing services in `src/pipecat/services/`
## Testing
Test utilities live in `src/pipecat/tests/utils.py`. Use `run_test()` to send frames through a pipeline and assert expected output frames in each direction. Use `SleepFrame(sleep=N)` to add delays between frames.

File diff suppressed because it is too large Load Diff

62
CHANGELOG.md.template Normal file
View File

@@ -0,0 +1,62 @@
# Changelog
All notable changes to the **&lt;project name&gt;** SDK will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
Please make sure to add your changes to the appropriate categories:
## [Unreleased]
### Added
<!-- for new functionality -->
- n/a
### Changed
<!-- for changed functionality -->
- n/a
### Deprecated
<!-- for soon-to-be removed functionality -->
- n/a
### Removed
<!-- for removed functionality -->
- n/a
### Fixed
<!-- for fixed bugs -->
- n/a
### Performance
<!-- for performance-relevant changes -->
- n/a
### Security
<!-- for security-relevant changes -->
- n/a
### Other
<!-- for everything else -->
- n/a
## [0.1.0] - YYYY-MM-DD
Initial release.

158
CLAUDE.md
View File

@@ -1 +1,157 @@
@AGENTS.md
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project Overview
Pipecat is an open-source Python framework for building real-time voice and multimodal conversational AI agents. It orchestrates audio/video, AI services, transports, and conversation pipelines using a frame-based architecture.
## Common Commands
```bash
# Setup development environment
uv sync --group dev --all-extras --no-extra gstreamer --no-extra krisp
# Install pre-commit hooks
uv run pre-commit install
# Run all tests
uv run pytest
# Run a single test file
uv run pytest tests/test_name.py
# Run a specific test
uv run pytest tests/test_name.py::test_function_name
# Preview changelog
uv run towncrier build --draft --version Unreleased
# Lint and format check
uv run ruff check
uv run ruff format --check
# Update dependencies (after editing pyproject.toml)
uv lock && uv sync
```
## Architecture
### Frame-Based Pipeline Processing
All data flows as **Frame** objects through a pipeline of **FrameProcessors**:
```
[Processor1] → [Processor2] → ... → [ProcessorN]
```
**Key components:**
- **Frames** (`src/pipecat/frames/frames.py`): Data units (audio, text, video) and control signals. Flow DOWNSTREAM (input→output) or UPSTREAM (acknowledgments/errors).
- **FrameProcessor** (`src/pipecat/processors/frame_processor.py`): Base processing unit. Each processor receives frames, processes them, and pushes results downstream.
- **Pipeline** (`src/pipecat/pipeline/pipeline.py`): Chains processors together.
- **ParallelPipeline** (`src/pipecat/pipeline/parallel_pipeline.py`): Runs multiple pipelines in parallel.
- **Transports** (`src/pipecat/transports/`): Transports are frame processors used for external I/O layer (Daily WebRTC, LiveKit WebRTC, WebSocket, Local). Abstract interface via `BaseTransport`, `BaseInputTransport` and `BaseOutputTransport`.
- **Pipeline Task (`src/pipecat/pipeline/task.py`)**: Runs and manages a pipeline. Pipeline tasks send the first frame, `StartFrame`, to the pipeline in order for processors to know they can start processing and pushing frames. Pipeline tasks internally create a pipeline with two additional processors, a source processor before the user-defined pipeline and a sink processor at the end. Those are used for multiple things: error handling, pipeline task level events, heartbeat monitoring, etc.
- **Pipeline Runner (`src/pipecat/pipeline/runner.py`)**: High-level entry point for executing pipeline tasks. Handles signal management (SIGINT/SIGTERM) for graceful shutdown and optional garbage collection. Run a single pipeline task with `await runner.run(task)` or multiple concurrently with `await asyncio.gather(runner.run(task1), runner.run(task2))`.
- **Services** (`src/pipecat/services/`): 60+ AI provider integrations (STT, TTS, LLM, etc.). Extend base classes: `AIService`, `LLMService`, `STTService`, `TTSService`, `VisionService`.
- **Serializers** (`src/pipecat/serializers/`): Convert frames to/from wire formats for WebSocket transports. `FrameSerializer` base class defines `serialize()` and `deserialize()`. Telephony serializers (Twilio, Plivo, Vonage, Telnyx, Exotel, Genesys) handle provider-specific protocols and audio encoding (e.g., μ-law).
- **RTVI** (`src/pipecat/processors/frameworks/rtvi.py`): Real-Time Voice Interface protocol bridging clients and the pipeline. `RTVIProcessor` handles incoming client messages (text input, audio, function call results). `RTVIObserver` converts pipeline frames to outgoing messages: user/bot speaking events, transcriptions, LLM/TTS lifecycle, function calls, metrics, and audio levels.
- **Observers** (`src/pipecat/observers/`): Monitor frame flow without modifying the pipeline. Passed to `PipelineTask` via the `observers` parameter. Implement `on_process_frame()` and `on_push_frame()` callbacks.
### Important Patterns
- **Context Aggregation**: `LLMContext` accumulates messages for LLM calls; `UserResponse` aggregates user input
- **Turn Management**: Turn management is done through `LLMUserAggregator` and
`LLMAssistantAggregator`, created with `LLMContextAggregatorPair`
- **User turn strategies**: Detection of when the user starts and stops speaking is done via user turn start/stop strategies. They push `UserStartedSpeakingFrame` and `UserStoppedSpeakingFrame` respectively.
- **Interruptions**: Interruptions are usually triggered by a user turn start strategy (e.g. `VADUserTurnStartStrategy`) but they can be triggered by other processors as well, in which case the user turn start strategies don't need to. An `InterruptionFrame` carries an optional `asyncio.Event` that is set when the frame reaches the pipeline sink. If a processor stops an `InterruptionFrame` from propagating downstream (i.e., doesn't push it), it **must** call `frame.complete()` to avoid stalling `push_interruption_task_frame_and_wait()` callers.
- **Uninterruptible Frames**: These are frames that will not be removed from internal queues even if there's an interruption. For example, `EndFrame` and `StopFrame`.
- **Events**: Most classes in Pipecat have `BaseObject` as the very base class. `BaseObject` has support for events. Events can run in the background in an async task (default) or synchronously (`sync=True`) if we want immediate action. Synchronous event handlers need to execute fast.
- **Async Task Management**: Always use `self.create_task(coroutine, name)` instead of raw `asyncio.create_task()`. The `TaskManager` automatically tracks tasks and cleans them up on processor shutdown. Use `await self.cancel_task(task, timeout)` for cancellation.
- **Error Handling**: Use `await self.push_error(msg, exception, fatal)` to push errors upstream. Services should use `fatal=False` (the default) so application code can handle errors and take action (e.g. switch to another service).
### Key Directories
| Directory | Purpose |
| -------------------------- | -------------------------------------------------- |
| `src/pipecat/frames/` | Frame definitions (100+ types) |
| `src/pipecat/processors/` | FrameProcessor base + aggregators, filters, audio |
| `src/pipecat/pipeline/` | Pipeline orchestration |
| `src/pipecat/services/` | AI service integrations (60+ providers) |
| `src/pipecat/transports/` | Transport layer (Daily, LiveKit, WebSocket, Local) |
| `src/pipecat/serializers/` | Frame serialization for WebSocket protocols |
| `src/pipecat/observers/` | Pipeline observers for monitoring frame flow |
| `src/pipecat/audio/` | VAD, filters, mixers, turn detection, DTMF |
| `src/pipecat/turns/` | User turn management |
## Code Style
- **Docstrings**: Google-style. Classes describe purpose; `__init__` has `Args:` section; dataclasses use `Parameters:` section.
- **Linting**: Ruff (line length 100). Pre-commit hooks enforce formatting.
- **Type hints**: Required for complex async code.
- **Dataclass vs Pydantic**: Use `@dataclass` for frames and internal pipeline data (high-frequency, no validation needed). Use Pydantic `BaseModel` for configuration, parameters, metrics, and external API data (benefits from validation and serialization). Specifically:
- `@dataclass`: Frame types, context aggregator pairs, internal data containers
- `BaseModel`: Service `InputParams`, transport/VAD/turn params, metrics data, API request/response models, serializer params
### Docstring Example
```python
class MyService(LLMService):
"""Description of what the service does.
More detailed description.
Event handlers available:
- on_connected: Called when we are connected
Example::
@service.event_handler("on_connected")
async def on_connected(service, frame):
...
"""
def __init__(self, param1: str, **kwargs):
"""Initialize the service.
Args:
param1: Description of param1.
**kwargs: Additional arguments passed to parent.
"""
super().__init__(**kwargs)
```
## Service Implementation
When adding a new service:
1. Extend the appropriate base class (`STTService`, `TTSService`, `LLMService`, etc.)
2. Implement required abstract methods
3. Handle necessary frames
4. By default, all frames should be pushed in the direction they came
5. Push `ErrorFrame` on failures
6. Add metrics tracking via `MetricsData` if relevant
7. Follow the pattern of existing services in `src/pipecat/services/`
## Testing
Test utilities live in `src/pipecat/tests/utils.py`. Use `run_test()` to send frames through a pipeline and assert expected output frames in each direction. Use `SleepFrame(sleep=N)` to add delays between frames.

View File

@@ -23,7 +23,7 @@ Create your integration following the patterns and examples shown in the "Integr
Your repository must contain these components:
- **Source code** - Complete implementation following Pipecat patterns
- **Foundational example** - Single file example showing basic usage (see [Pipecat examples](https://github.com/pipecat-ai/pipecat/tree/main/examples))
- **Foundational example** - Single file example showing basic usage (see [Pipecat examples](https://github.com/pipecat-ai/pipecat/tree/main/examples/foundational))
- **README.md** - Must include:
- Introduction and explanation of your integration
- Installation instructions
@@ -65,25 +65,12 @@ Once your PR is submitted, post in the `#community-integrations` Discord channel
#### Websocket-based Services
**Base class:** `WebsocketSTTService`
**Use for:** Services where you manage the websocket connection directly. Combines `STTService` with `WebsocketService` for automatic reconnection and keepalive support.
**Examples:**
- [CartesiaSTTService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/cartesia/stt.py)
- [ElevenLabsRealtimeSTTService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/elevenlabs/stt.py)
#### SDK-based Streaming Services
**Base class:** `STTService`
**Use for:** Streaming services where the provider's Python SDK manages the connection internally.
**Examples:**
- [DeepgramSTTService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/deepgram/stt.py)
- [GoogleSTTService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/google/stt.py)
- [SpeechmaticsSTTService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/speechmatics/stt.py)
#### File-based Services
@@ -121,59 +108,55 @@ Once your PR is submitted, post in the `#community-integrations` Discord channel
#### Key requirements:
- **`_process_context(self, context: LLMContext)`** — The main method that processes an LLM context and generates a response. Each LLM service overrides `process_frame` to extract context from `LLMContextFrame` and calls `_process_context`.
- **`adapter_class`** — Class attribute pointing to a `BaseLLMAdapter` subclass. Defaults to `OpenAILLMAdapter`. Non-OpenAI services must implement their own adapter (see `src/pipecat/adapters/base_llm_adapter.py`) with methods:
- `get_llm_invocation_params(context)` — Extract provider-specific params from universal context
- `to_provider_tools_format(tools_schema)` — Convert standard tools to provider format
- `get_messages_for_logging(context)` — Format messages for logging
- Reference adapters: `src/pipecat/adapters/services/` (anthropic, gemini, bedrock, etc.)
- **Frame sequence:** Output must follow this frame sequence pattern:
- `LLMFullResponseStartFrame` Signals the start of an LLM response
- `LLMTextFrame` Contains LLM content, typically streamed as tokens
- `LLMFullResponseEndFrame` Signals the end of an LLM response
- `LLMFullResponseStartFrame` - Signals the start of an LLM response
- `LLMTextFrame` - Contains LLM content, typically streamed as tokens
- `LLMFullResponseEndFrame` - Signals the end of an LLM response
- **Thought frames (reasoning models):** If the model supports extended thinking / chain-of-thought, emit thought frames alongside the response:
- `LLMThoughtStartFrame` — Signals the start of a thought
- `LLMThoughtTextFrame` — Contains thought content, streamed as tokens
- `LLMThoughtEndFrame` — Signals the end of a thought
- **Context aggregation** is handled by the framework via `LLMContext` + `LLMContextAggregatorPair`. The LLM service just processes context it receives — no need to implement aggregators.
- **Context aggregation:** Implement context aggregation to collect user and assistant content:
- Aggregators come in pairs with a `user()` instance and `assistant()` instance
- Context must adhere to the `LLMContext` universal format
- Aggregators should handle adding messages, function calls, and images to the context
### TTS (Text-to-Speech) Services
#### WebsocketTTSService
#### AudioContextWordTTSService
**Use for:** Websocket-based streaming services (with or without word timestamps)
**Use for:** Websocket-based services supporting word/timestamp alignment
**Examples:**
**Example:**
- [CartesiaTTSService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/cartesia/tts.py)
- [ElevenLabsTTSService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/elevenlabs/tts.py)
#### InterruptibleTTSService
**Use for:** Websocket-based services without word timestamps that reconnect on interruption (e.g. don't support a context ID or interruption message)
**Use for:** Websocket-based services without word/timestamp alignment, requiring disconnection on interruption
**Example:**
- [SarvamTTSService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/sarvam/tts.py)
#### WordTTSService
**Use for:** HTTP-based services supporting word/timestamp alignment
**Example:**
- [ElevenLabsHttpTTSService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/elevenlabs/tts.py)
#### TTSService
**Use for:** HTTP-based services (word timestamps are supported in the base class)
**Use for:** HTTP-based services without word/timestamp alignment
**Examples:**
**Example:**
- [GoogleHttpTTSService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/google/tts.py)
- [OpenAITTSService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/openai/tts.py)
#### Key requirements:
- For websocket services, use asyncio WebSocket implementation
- For websocket services, use asyncio WebSocket implementation (required for v13+ support)
- Handle idle service timeouts with keepalives
- TTS services push both audio (`TTSAudioRawFrame`) and text (`TTSTextFrame`) frames
- TTSServices push both audio (`TTSRawAudioFrame`) and text (`TTSTextFrame`) frames
### Telephony Serializers
@@ -217,25 +200,14 @@ Vision services process images and provide analysis such as descriptions, object
#### Key requirements:
- Must implement `run_vision` method that takes a `UserImageRawFrame` and returns an `AsyncGenerator[Frame, None]`
- The method processes the image frame and yields frames with analysis results
- Must yield the frame sequence: `VisionFullResponseStartFrame`, `VisionTextFrame`, `VisionFullResponseEndFrame`
- Must implement `run_vision` method that takes an `LLMContext` and returns an `AsyncGenerator[Frame, None]`
- The method processes the latest image in the context and yields frames with analysis results
- Typically yields `TextFrame` objects containing descriptions or answers
## Implementation Guidelines
### Naming Conventions
#### Package and Repository Naming
Use the `pipecat-{vendor}` naming convention for your PyPI package and repository:
- `pipecat-{vendor}` — for single-service integrations (e.g., `pipecat-deepdub`)
- `pipecat-{vendor}-{type}` — when a vendor offers multiple service types (e.g., `pipecat-upliftai-stt`, `pipecat-upliftai-tts`)
This convention makes community packages easily discoverable via PyPI search and clearly identifies them as part of the Pipecat ecosystem.
#### Class Naming
- **STT:** `VendorSTTService`
- **LLM:** `VendorLLMService`
- **TTS:**
@@ -409,7 +381,7 @@ Note that `self.sample_rate` is a `@property` set in the TTSService base class,
Use Pipecat's tracing decorators:
- **STT:** `@traced_stt` - decorate `_handle_transcription(self, transcript, is_final, language)` (the standard method name convention)
- **STT:** `@traced_stt` - decorate a function that handles `transcript`, `is_final`, `language` as args
- **LLM:** `@traced_llm` - decorate the `_process_context()` method
- **TTS:** `@traced_tts` - decorate the `run_tts()` method
@@ -417,9 +389,8 @@ Use Pipecat's tracing decorators:
### Packaging and Distribution
- Name your package `pipecat-{vendor}` (see [Naming Conventions](#naming-conventions))
- Use [uv](https://docs.astral.sh/uv/) for packaging (encouraged)
- Publish to PyPI for easier installation
- Consider releasing to PyPI for easier installation
- Follow semantic versioning principles
- Maintain a changelog
@@ -432,15 +403,17 @@ For REST-based communication, use aiohttp. Pipecat includes this as a required d
- Wrap API calls in appropriate try/catch blocks
- Handle rate limits and network failures gracefully
- Provide meaningful error messages
- When errors occur, raise exceptions AND push errors to notify the pipeline:
- When errors occur, raise exceptions AND push `ErrorFrame`s to notify the pipeline:
```python
from pipecat.frames.frames import ErrorFrame
try:
# Your API call
result = await self._make_api_call()
except Exception as e:
# Push error upstream to notify the pipeline
await self.push_error(f"{self} error: {e}", exception=e)
# Push error frame to pipeline
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
# Raise or handle as appropriate
raise
```

View File

@@ -8,7 +8,7 @@
**Pipecat** is an open-source Python framework for building real-time voice and multimodal conversational agents. Orchestrate audio and video, AI services, different transports, and conversation pipelines effortlessly—so you can focus on what makes your agent unique.
> Want to dive right in? Run `pipecat init quickstart` or follow the [quickstart guide](https://docs.pipecat.ai/getting-started/quickstart).
> Want to dive right in? Try the [quickstart](https://docs.pipecat.ai/getting-started/quickstart).
## 🚀 What You Can Build
@@ -28,10 +28,6 @@
## 🌐 Pipecat Ecosystem
### 🧩 Multi-agent systems
Need multiple AI agents working together? [Pipecat Subagents](https://github.com/pipecat-ai/pipecat-subagents) lets you build distributed multi-agent systems where each agent runs its own pipeline and communicates through a shared message bus. Hand off conversations between specialists, dispatch background tasks, and scale agents across processes or machines.
### 📱 Client SDKs
Building client applications? You can connect to Pipecat from any platform using our official SDKs:
@@ -69,10 +65,6 @@ claude plugin marketplace add pipecat-ai/skills
and install any of the available plugins.
### 🧩 Community Integrations
Build and share your own Pipecat service integrations! Browse existing [community integrations](https://docs.pipecat.ai/api-reference/server/services/community-integrations) or check out our [guide](COMMUNITY_INTEGRATIONS.md) to create your own.
### 📺️ Pipecat TV Channel
Catch new features, interviews, and how-tos on our [Pipecat TV](https://www.youtube.com/playlist?list=PLzU2zoMTQIHjqC3v4q2XVSR3hGSzwKFwH) channel.
@@ -83,28 +75,27 @@ Catch new features, interviews, and how-tos on our [Pipecat TV](https://www.yout
<a href="https://github.com/pipecat-ai/pipecat-examples/tree/main/simple-chatbot"><img src="https://raw.githubusercontent.com/pipecat-ai/pipecat-examples/main/simple-chatbot/image.png" width="400" /></a>&nbsp;
<a href="https://github.com/pipecat-ai/pipecat-examples/tree/main/storytelling-chatbot"><img src="https://raw.githubusercontent.com/pipecat-ai/pipecat-examples/main/storytelling-chatbot/image.png" width="400" /></a>
<br/>
<a href="https://github.com/pipecat-ai/pipecat-examples/tree/main/daily-multi-translation"><img src="https://raw.githubusercontent.com/pipecat-ai/pipecat-examples/main/daily-multi-translation/image.png" width="400" /></a>&nbsp;
<a href="https://github.com/pipecat-ai/pipecat/blob/main/examples/vision/vision-moondream.py"><img src="https://github.com/pipecat-ai/pipecat/blob/main/examples/assets/moondream.png" width="400" /></a>
<a href="https://github.com/pipecat-ai/pipecat-examples/tree/main/translation-chatbot"><img src="https://raw.githubusercontent.com/pipecat-ai/pipecat-examples/main/translation-chatbot/image.png" width="400" /></a>&nbsp;
<a href="https://github.com/pipecat-ai/pipecat/blob/main/examples/foundational/12-describe-video.py"><img src="https://github.com/pipecat-ai/pipecat/blob/main/examples/foundational/assets/moondream.png" width="400" /></a>
</p>
## 🧩 Available services
| Category | Services |
| ------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/api-reference/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/api-reference/server/services/stt/aws), [Azure](https://docs.pipecat.ai/api-reference/server/services/stt/azure), [Cartesia](https://docs.pipecat.ai/api-reference/server/services/stt/cartesia), [Deepgram](https://docs.pipecat.ai/api-reference/server/services/stt/deepgram), [ElevenLabs](https://docs.pipecat.ai/api-reference/server/services/stt/elevenlabs), [Fal Wizper](https://docs.pipecat.ai/api-reference/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/api-reference/server/services/stt/gladia), [Google](https://docs.pipecat.ai/api-reference/server/services/stt/google), [Gradium](https://docs.pipecat.ai/api-reference/server/services/stt/gradium), [Groq (Whisper)](https://docs.pipecat.ai/api-reference/server/services/stt/groq), [Mistral](https://docs.pipecat.ai/api-reference/server/services/stt/mistral), [NVIDIA](https://docs.pipecat.ai/api-reference/server/services/stt/nvidia), [OpenAI (Whisper)](https://docs.pipecat.ai/api-reference/server/services/stt/openai), [Sarvam](https://docs.pipecat.ai/api-reference/server/services/stt/sarvam), [Soniox](https://docs.pipecat.ai/api-reference/server/services/stt/soniox), [Speechmatics](https://docs.pipecat.ai/api-reference/server/services/stt/speechmatics), [Whisper](https://docs.pipecat.ai/api-reference/server/services/stt/whisper), [xAI](https://docs.pipecat.ai/api-reference/server/services/stt/xai) |
| LLMs | [Anthropic](https://docs.pipecat.ai/api-reference/server/services/llm/anthropic), [AWS](https://docs.pipecat.ai/api-reference/server/services/llm/aws), [Azure](https://docs.pipecat.ai/api-reference/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/api-reference/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/api-reference/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/api-reference/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/api-reference/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/api-reference/server/services/llm/grok), [Groq](https://docs.pipecat.ai/api-reference/server/services/llm/groq), [Inception](https://docs.pipecat.ai/api-reference/server/services/llm/inception), [Mistral](https://docs.pipecat.ai/api-reference/server/services/llm/mistral), [Nebius](https://docs.pipecat.ai/api-reference/server/services/llm/nebius), [Novita](https://docs.pipecat.ai/api-reference/server/services/llm/novita), [NVIDIA NIM](https://docs.pipecat.ai/api-reference/server/services/llm/nvidia), [Ollama](https://docs.pipecat.ai/api-reference/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/api-reference/server/services/llm/openai), [OpenAI Responses](https://docs.pipecat.ai/api-reference/server/services/llm/openai-responses), [OpenRouter](https://docs.pipecat.ai/api-reference/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/api-reference/server/services/llm/perplexity), [Qwen](https://docs.pipecat.ai/api-reference/server/services/llm/qwen), [SambaNova](https://docs.pipecat.ai/api-reference/server/services/llm/sambanova), [Sarvam](https://docs.pipecat.ai/api-reference/server/services/llm/sarvam), [Together AI](https://docs.pipecat.ai/api-reference/server/services/llm/together) |
| Text-to-Speech | [Async](https://docs.pipecat.ai/api-reference/server/services/tts/asyncai), [AWS](https://docs.pipecat.ai/api-reference/server/services/tts/aws), [Azure](https://docs.pipecat.ai/api-reference/server/services/tts/azure), [Camb AI](https://docs.pipecat.ai/api-reference/server/services/tts/camb), [Cartesia](https://docs.pipecat.ai/api-reference/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/api-reference/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/api-reference/server/services/tts/elevenlabs), [Fish](https://docs.pipecat.ai/api-reference/server/services/tts/fish), [Google](https://docs.pipecat.ai/api-reference/server/services/tts/google), [Gradium](https://docs.pipecat.ai/api-reference/server/services/tts/gradium), [Groq](https://docs.pipecat.ai/api-reference/server/services/tts/groq), [Hume](https://docs.pipecat.ai/api-reference/server/services/tts/hume), [Inworld](https://docs.pipecat.ai/api-reference/server/services/tts/inworld), [Kokoro](https://docs.pipecat.ai/api-reference/server/services/tts/kokoro), [LMNT](https://docs.pipecat.ai/api-reference/server/services/tts/lmnt), [MiniMax](https://docs.pipecat.ai/api-reference/server/services/tts/minimax), [Mistral](https://docs.pipecat.ai/api-reference/server/services/tts/mistral), [Neuphonic](https://docs.pipecat.ai/api-reference/server/services/tts/neuphonic), [NVIDIA](https://docs.pipecat.ai/api-reference/server/services/tts/nvidia), [OpenAI](https://docs.pipecat.ai/api-reference/server/services/tts/openai), [Piper](https://docs.pipecat.ai/api-reference/server/services/tts/piper), [Resemble](https://docs.pipecat.ai/api-reference/server/services/tts/resemble), [Rime](https://docs.pipecat.ai/api-reference/server/services/tts/rime), [Sarvam](https://docs.pipecat.ai/api-reference/server/services/tts/sarvam), [Smallest](https://docs.pipecat.ai/api-reference/server/services/tts/smallest), [Soniox](https://docs.pipecat.ai/api-reference/server/services/tts/soniox), [Speechmatics](https://docs.pipecat.ai/api-reference/server/services/tts/speechmatics), [xAI](https://docs.pipecat.ai/api-reference/server/services/tts/xai), [XTTS](https://docs.pipecat.ai/api-reference/server/services/tts/xtts) |
| Speech-to-Speech | [AWS Nova Sonic](https://docs.pipecat.ai/api-reference/server/services/s2s/aws), [Gemini Multimodal Live](https://docs.pipecat.ai/api-reference/server/services/s2s/gemini), [Grok Voice Agent](https://docs.pipecat.ai/api-reference/server/services/s2s/grok), [OpenAI Realtime](https://docs.pipecat.ai/api-reference/server/services/s2s/openai), [Ultravox](https://docs.pipecat.ai/api-reference/server/services/s2s/ultravox), |
| Transport | [Daily (WebRTC)](https://docs.pipecat.ai/api-reference/server/services/transport/daily), [FastAPI Websocket](https://docs.pipecat.ai/api-reference/server/services/transport/fastapi-websocket), [LiveKit (WebRTC)](https://docs.pipecat.ai/api-reference/server/services/transport/livekit), [SmallWebRTCTransport](https://docs.pipecat.ai/api-reference/server/services/transport/small-webrtc), [Vonage (WebRTC)](https://docs.pipecat.ai/api-reference/server/services/transport/vonage), [WebSocket Server](https://docs.pipecat.ai/api-reference/server/services/transport/websocket-server), [WhatsApp](https://docs.pipecat.ai/api-reference/server/services/transport/whatsapp), Local |
| Serializers | [Exotel](https://docs.pipecat.ai/api-reference/server/services/serializers/exotel), [Genesys](https://docs.pipecat.ai/api-reference/server/services/serializers/genesys), [Plivo](https://docs.pipecat.ai/api-reference/server/services/serializers/plivo), [Twilio](https://docs.pipecat.ai/api-reference/server/services/serializers/twilio), [Telnyx](https://docs.pipecat.ai/api-reference/server/services/serializers/telnyx), [Vonage](https://docs.pipecat.ai/api-reference/server/services/serializers/vonage) |
| Video | [HeyGen](https://docs.pipecat.ai/api-reference/server/services/video/heygen), [LemonSlice](https://docs.pipecat.ai/api-reference/server/services/transport/lemonslice), [Tavus](https://docs.pipecat.ai/api-reference/server/services/video/tavus), [Simli](https://docs.pipecat.ai/api-reference/server/services/video/simli) |
| Memory | [mem0](https://docs.pipecat.ai/api-reference/server/services/memory/mem0) |
| Vision & Image | [fal](https://docs.pipecat.ai/api-reference/server/services/image-generation/fal), [Google Imagen](https://docs.pipecat.ai/api-reference/server/services/image-generation/google-imagen), [Moondream](https://docs.pipecat.ai/api-reference/server/services/vision/moondream) |
| Audio Processing | [Silero VAD](https://docs.pipecat.ai/api-reference/server/utilities/audio/silero-vad-analyzer), [Krisp Viva](https://docs.pipecat.ai/guides/features/krisp-viva), [Koala](https://docs.pipecat.ai/api-reference/server/utilities/audio/koala-filter), [ai-coustics](https://docs.pipecat.ai/api-reference/server/utilities/audio/aic-filter), [RNNoise](https://docs.pipecat.ai/api-reference/server/utilities/audio/rnnoise-filter) |
| Analytics & Metrics | [OpenTelemetry](https://docs.pipecat.ai/api-reference/server/utilities/opentelemetry), [Sentry](https://docs.pipecat.ai/api-reference/server/services/analytics/sentry) |
| Community | [Browse community integrations →](https://docs.pipecat.ai/api-reference/server/services/community-integrations) |
| Category | Services |
| ------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/server/services/stt/aws), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Cartesia](https://docs.pipecat.ai/server/services/stt/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/stt/elevenlabs), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Gradium](https://docs.pipecat.ai/server/services/stt/gradium), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [NVIDIA Riva](https://docs.pipecat.ai/server/services/stt/riva), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [SambaNova (Whisper)](https://docs.pipecat.ai/server/services/stt/sambanova), [Sarvam](https://docs.pipecat.ai/server/services/stt/sarvam), [Soniox](https://docs.pipecat.ai/server/services/stt/soniox), [Speechmatics](https://docs.pipecat.ai/server/services/stt/speechmatics), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) |
| LLMs | [Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [AWS](https://docs.pipecat.ai/server/services/llm/aws), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [Mistral](https://docs.pipecat.ai/server/services/llm/mistral), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [OpenRouter](https://docs.pipecat.ai/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/server/services/llm/perplexity), [Qwen](https://docs.pipecat.ai/server/services/llm/qwen), [SambaNova](https://docs.pipecat.ai/server/services/llm/sambanova) [Together AI](https://docs.pipecat.ai/server/services/llm/together) |
| Text-to-Speech | [Async](https://docs.pipecat.ai/server/services/tts/asyncai), [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Camb AI](https://docs.pipecat.ai/server/services/tts/camb), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [Gradium](https://docs.pipecat.ai/server/services/tts/gradium), [Groq](https://docs.pipecat.ai/server/services/tts/groq), [Hume](https://docs.pipecat.ai/server/services/tts/hume), [Inworld](https://docs.pipecat.ai/server/services/tts/inworld), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [MiniMax](https://docs.pipecat.ai/server/services/tts/minimax), [Neuphonic](https://docs.pipecat.ai/server/services/tts/neuphonic), [NVIDIA Riva](https://docs.pipecat.ai/server/services/tts/riva), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [Piper](https://docs.pipecat.ai/server/services/tts/piper), [Resemble](https://docs.pipecat.ai/server/services/tts/resemble), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [Sarvam](https://docs.pipecat.ai/server/services/tts/sarvam), [Speechmatics](https://docs.pipecat.ai/server/services/tts/speechmatics), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) |
| Speech-to-Speech | [AWS Nova Sonic](https://docs.pipecat.ai/server/services/s2s/aws), [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [Grok Voice Agent](https://docs.pipecat.ai/server/services/s2s/grok), [OpenAI Realtime](https://docs.pipecat.ai/server/services/s2s/openai), [Ultravox](https://docs.pipecat.ai/server/services/s2s/ultravox), |
| Transport | [Daily (WebRTC)](https://docs.pipecat.ai/server/services/transport/daily), [FastAPI Websocket](https://docs.pipecat.ai/server/services/transport/fastapi-websocket), [SmallWebRTCTransport](https://docs.pipecat.ai/server/services/transport/small-webrtc), [WebSocket Server](https://docs.pipecat.ai/server/services/transport/websocket-server), Local |
| Serializers | [Exotel](https://docs.pipecat.ai/server/utilities/serializers/exotel), [Plivo](https://docs.pipecat.ai/server/utilities/serializers/plivo), [Twilio](https://docs.pipecat.ai/server/utilities/serializers/twilio), [Telnyx](https://docs.pipecat.ai/server/utilities/serializers/telnyx), [Vonage](https://docs.pipecat.ai/server/utilities/serializers/vonage) |
| Video | [HeyGen](https://docs.pipecat.ai/server/services/video/heygen), [LemonSlice](https://docs.pipecat.ai/server/services/video/lemonslice), [Tavus](https://docs.pipecat.ai/server/services/video/tavus), [Simli](https://docs.pipecat.ai/server/services/video/simli) |
| Memory | [mem0](https://docs.pipecat.ai/server/services/memory/mem0) |
| Vision & Image | [fal](https://docs.pipecat.ai/server/services/image-generation/fal), [Google Imagen](https://docs.pipecat.ai/server/services/image-generation/google-imagen), [Moondream](https://docs.pipecat.ai/server/services/vision/moondream) |
| Audio Processing | [Silero VAD](https://docs.pipecat.ai/server/utilities/audio/silero-vad-analyzer), [Krisp](https://docs.pipecat.ai/server/utilities/audio/krisp-filter), [Koala](https://docs.pipecat.ai/server/utilities/audio/koala-filter), [ai-coustics](https://docs.pipecat.ai/server/utilities/audio/aic-filter) |
| Analytics & Metrics | [OpenTelemetry](https://docs.pipecat.ai/server/utilities/opentelemetry), [Sentry](https://docs.pipecat.ai/server/services/analytics/sentry) |
📚 [View full services documentation →](https://docs.pipecat.ai/api-reference/server/services/supported-services)
📚 [View full services documentation →](https://docs.pipecat.ai/server/services/supported-services)
## ⚡ Getting started
@@ -146,15 +137,15 @@ You can get started with Pipecat running on your local machine, then move your a
## 🧪 Code examples
- [Foundational](https://github.com/pipecat-ai/pipecat/tree/main/examples) — small snippets that build on each other, introducing one or two concepts at a time
- [Foundational](https://github.com/pipecat-ai/pipecat/tree/main/examples/foundational) — small snippets that build on each other, introducing one or two concepts at a time
- [Example apps](https://github.com/pipecat-ai/pipecat-examples) — complete applications that you can use as starting points for development
## 🛠️ Contributing to the framework
### Prerequisites
**Minimum Python Version:** 3.11
**Recommended Python Version:** >= 3.12
**Minimum Python Version:** 3.10
**Recommended Python Version:** 3.12
### Setup Steps
@@ -170,6 +161,7 @@ You can get started with Pipecat running on your local machine, then move your a
```bash
uv sync --group dev --all-extras \
--no-extra gstreamer \
--no-extra krisp \
--no-extra local \
```

View File

@@ -0,0 +1 @@
- Changed tool result JSON serialization to use `ensure_ascii=False`, preserving UTF-8 characters instead of escaping them. This reduces context size and token usage for non-English languages.

View File

@@ -0,0 +1 @@
- `OpenAIRealtimeSTTService`'s `noise_reduction` parameter is now part of `OpenAIRealtimeSTTSettings`, making it runtime-updatable via `STTUpdateSettingsFrame`. The direct `noise_reduction` init argument is deprecated as of 0.0.106.

View File

@@ -0,0 +1 @@
- Updated `sarvamai` dependency from `0.1.26a2` (alpha) to `0.1.26` (stable release).

1
changelog/4000.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed an issue where the default model for `OpenAILLMService` and `AzureLLMService` was mistakenly reverted to `gpt-4o`. The defaults are now restored to `gpt-4.1`.

View File

@@ -0,0 +1 @@
- `SimliVideoService` now extends `AIService` instead of `FrameProcessor`, aligning it with the HeyGen and Tavus video services. It supports `SimliVideoService.Settings(...)` for configuration and uses `start()`/`stop()`/`cancel()` lifecycle methods. Existing constructor usage (`api_key`, `face_id`, etc.) remains unchanged.

View File

@@ -0,0 +1 @@
- `SimliVideoService.InputParams` is deprecated. Use the direct constructor parameters `max_session_length`, `max_idle_time`, and `enable_logging` instead.

1
changelog/4004.added.md Normal file
View File

@@ -0,0 +1 @@
- Added optional `service` field to `ServiceUpdateSettingsFrame` (and its subclasses `LLMUpdateSettingsFrame`, `TTSUpdateSettingsFrame`, `STTUpdateSettingsFrame`) to target a specific service instance. When `service` is set, only the matching service applies the settings; others forward the frame unchanged. This enables updating a single service when multiple services of the same type exist in the pipeline.

1
changelog/4005.added.md Normal file
View File

@@ -0,0 +1 @@
- Added `sip_provider` and `room_geo` parameters to `configure()` in the Daily runner. These convenience parameters let callers specify a SIP provider name and geographic region directly without manually constructing `DailyRoomProperties` and `DailyRoomSipParams`.

1
changelog/4006.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed a race condition where `EndTaskFrame` could cause the pipeline to shut down before in-flight frames (e.g. LLM function call responses) finished processing. `EndTaskFrame` and `StopTaskFrame` now flow through the pipeline as `ControlFrame`s, ensuring all pending work is flushed before shutdown begins. `CancelTaskFrame` and `InterruptionTaskFrame` remain immediate (`SystemFrame`).

View File

@@ -0,0 +1 @@
- Fixed `TTSService` potentially canceling in-flight audio during shutdown. The stop sequence now waits for all queued audio contexts to finish processing before canceling the stop frame task.

1
changelog/4007.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed `ParallelPipeline` dropping or misordering frames during lifecycle synchronization. Buffered frames are now flushed in the correct order relative to synchronization frames (`StartFrame` goes first, `EndFrame`/`CancelFrame` go after), and frames added to the buffer during flush are also drained.

1
changelog/4009.added.md Normal file
View File

@@ -0,0 +1 @@
- Added `PerplexityLLMAdapter` that automatically transforms conversation messages to satisfy Perplexity's stricter API constraints (strict role alternation, no non-initial system messages, last message must be user/tool). Previously, certain conversation histories could cause Perplexity API errors that didn't occur with OpenAI (`PerplexityLLMService` subclasses `OpenAILLMService` since Perplexity uses an OpenAI-compatible API).

View File

@@ -0,0 +1 @@
- Deprecated `LocalSmartTurnAnalyzerV2` and `LocalCoreMLSmartTurnAnalyzer`. Use `LocalSmartTurnAnalyzerV3` instead. Instantiating these analyzers will now emit a `DeprecationWarning`.

View File

@@ -0,0 +1 @@
- Update `pipecat-ai-small-webrtc-prebuilt` to `2.4.0`.

1
changelog/4024.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed `Language` enum values (e.g. `Language.ES`) not being converted to service-specific codes when passed via `settings=Service.Settings(language=Language.ES)` at init time. This caused API errors (e.g. 400 from Rime) because the raw enum was sent instead of the expected language code (e.g. `"spa"`). Runtime updates via `UpdateSettingsFrame` were unaffected. The fix centralizes conversion in the base `TTSService` and `STTService` classes so all services handle this consistently.

1
changelog/4026.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed `DeepgramSTTService` ignoring the `base_url` scheme when using `ws://` or `http://`. Previously these were silently overwritten with `wss://` / `https://`, breaking air-gapped or private deployments that don't use TLS. All scheme choices (`wss://`, `https://`, `ws://`, `http://`, or bare hostname) are now respected.

View File

@@ -0,0 +1 @@
- Bumped PyJWT minimum version from 2.10.1 to 2.12.0 in the `livekit` extra to address CVE-2026-32597 (GHSA-752w-5fwx-jx9f), where PyJWT <= 2.11.0 accepted unknown `crit` header extensions.

1
changelog/4037.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed `LLMSwitcher.register_function()` and `register_direct_function()` not accepting or forwarding the `timeout_secs` parameter.

1
changelog/4046.fixed.md Normal file
View File

@@ -0,0 +1 @@
Fixed `SonioxSTTService` and `OpenAIRealtimeSTTService` crash when language parameters contain plain strings instead of `Language` enum values.

1
changelog/4047.added.md Normal file
View File

@@ -0,0 +1 @@
- Added DTMF input event support to the Daily transport. Incoming DTMF tones are now received via Daily's `on_dtmf_event` callback and pushed into the pipeline as `InputDTMFFrame`, enabling bots to react to keypad presses from phone callers.

View File

@@ -0,0 +1 @@
- Updated `daily-python` dependency to 0.25.0.

View File

@@ -0,0 +1 @@
- Added `enable_dialout` parameter to `configure()` in `pipecat.runner.daily` to support dial-out rooms. Also narrowed misleading `Optional` type hints and deduplicated token expiry calculation.

View File

@@ -1 +0,0 @@
- Added `VonageVideoConnectorTransport`, a new transport integration for real-time Vonage WebRTC sessions using the Vonage Video Connector library.

1
changelog/4057.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed premature user turn stops caused by late transcriptions arriving between turns. A stale transcript from the previous turn could persist into the next turn and trigger a stop before the current turn's real transcript arrived. Stop strategies are now reset at both turn start and turn stop to prevent state from leaking across turn boundaries.

1
changelog/4058.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed raw language strings like `"de-DE"` silently failing when passed to TTS/STT services (e.g. ElevenLabs producing no audio). Raw strings now go through the same `Language` enum resolution as enum values, so regional codes like `"de-DE"` are properly converted to service-expected formats like `"de"`. Unrecognized strings log a warning instead of failing silently.

1
changelog/4063.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed Deepgram STT list-type settings (`keyterm`, `keywords`, `search`, `redact`, `replace`) being stringified instead of passed as lists to the SDK, which caused them to be sent as literal strings (e.g. `"['pipecat']"`) in the WebSocket query params.

View File

@@ -1 +0,0 @@
- Fixed Azure TTS last word being missed by observers and RTVI UI. The completion signal was racing with word timestamp processing, causing the final word's `TTSTextFrame` to arrive after `TTSStoppedFrame`. Completion is now routed through the word boundary queue to ensure all words are processed before signaling stream end.

View File

@@ -1 +0,0 @@
- Fixed `BaseOutputTransport` reordering frames that share the same presentation timestamp. Frames with equal PTS values are now emitted in insertion order, preventing subtle audio/text sequencing bugs when multiple frames arrive at the same time.

View File

@@ -1 +0,0 @@
- Fixed Cartesia word timestamps leaking SSML tag text (e.g. `<spell>`, `<emotion>`, `<break>`) into word entries. Tags are now stripped before processing, so word-to-text attribution remains accurate when SSML markup is present in the TTS input.

View File

@@ -1 +0,0 @@
- Fixed `TTSTextFrame` entries losing their original text structure when word timestamps are enabled. Each `TTSTextFrame` now carries a `raw_text` field containing the corresponding span of the original LLM-produced text (including pattern delimiters such as `<card>4111 1111 1111 1111</card>`), so the assistant context receives properly-tagged content rather than the cleaned words returned by the TTS provider. Also handles words that straddle two sentence boundaries by splitting them and attributing each part to its correct source frame.

View File

@@ -1 +0,0 @@
- Fixed skipped TTS frames (e.g. code blocks filtered via `skip_aggregator_types`) being emitted to the assistant context immediately instead of waiting for preceding spoken frames to finish. They now hold their position in the frame sequence and are flushed only after all earlier spoken sentences are complete, keeping context ordering correct.

View File

@@ -1 +0,0 @@
- Added `InceptionLLMService` for Inception's Mercury 2 diffusion reasoning model, with support for `reasoning_effort` and `realtime` settings.

View File

@@ -1 +0,0 @@
- Added `GET /status` endpoint to the development runner that reports which transports the running instance accepts (all by default, or the single transport passed via `-t`).

View File

@@ -1 +0,0 @@
- Added plain WebSocket transport support to the development runner. Bots can now accept connections from non-telephony WebSocket clients (e.g., browser apps using protobuf framing) via the `/ws-client` endpoint alongside other transports.

View File

@@ -1 +0,0 @@
- ⚠️ The development runner now supports all transports (WebRTC, Daily, telephony, plain WebSocket) simultaneously from a single server. The `/start` endpoint accepts a `"transport"` field to select the transport per-request; omitting `-t` at startup enables all transports instead of defaulting to WebRTC. The Daily browser-redirect route moved from `GET /` to `GET /daily`.

View File

@@ -1 +0,0 @@
- Fixed `ElevenLabsSTTService` crashing when `language` was passed as `None`. When `language` is not set, the service now lets ElevenLabs auto-detect the audio language.

View File

@@ -1 +0,0 @@
- Fixed websocket STT connection setup failures so services clear stale websocket state and emit non-fatal error frames, allowing `ServiceSwitcher` failover to keep agents running.

View File

@@ -1 +0,0 @@
- Added `max_endpoint_delay_ms` to `SonioxSTTService.Settings`, controlling the maximum delay (500-3000 ms) before endpoint detection finalizes a turn.

View File

@@ -1 +0,0 @@
- `SonioxSTTService` now applies settings updates (e.g. via `STTUpdateSettingsFrame`) using a graceful reconnect instead of a hard disconnect/reconnect, preserving the service's reconnect retry behavior.

View File

@@ -1 +0,0 @@
- Removed the unsupported Georgian (`Language.KA`) language mapping from `SonioxSTTService`.

View File

@@ -1 +0,0 @@
- Updated the default p99 TTFS latency values for Smallest AI, Mistral, and XAI STT so turn stop timing uses measured values instead of the conservative fallback.

View File

@@ -1 +0,0 @@
- Updated the development runner startup banner to show the prebuilt client URL once and list enabled or disabled transports with install hints.

View File

@@ -1 +0,0 @@
- Fixed the development runner so missing optional transport dependencies disable only their related routes instead of failing startup in all-transport mode.

View File

@@ -1 +0,0 @@
- Fixed a race in `ElevenLabsTTSService` where the periodic keepalive could be sent for a new turn's context before that context's `voice_settings` initialization message, causing ElevenLabs to close the WebSocket with a 1008 policy violation (`voice_settings field must be provided in the first message ...`). The keepalive now only targets a context once its context-init has been sent.

View File

@@ -1 +0,0 @@
- Bumped `pipecat-ai-prebuilt` to 1.0.1 in the `runner` extra, updating the prebuilt client UI served by the development runner.

View File

@@ -5,7 +5,7 @@
{% for text, values in sections[section][category].items() %}
{{ text }}
(PR {{ values|join(', ') }})
(PR {{ values|join(', ') }})
{% endfor %}
{% endfor %}

View File

@@ -1,60 +1,108 @@
# Pipecat API Documentation
# Pipecat Documentation
This directory contains the source files for auto-generating Pipecat's API reference documentation.
This directory contains the source files for auto-generating Pipecat's server API reference documentation.
## Setup
1. Install documentation dependencies:
```bash
pip install -r requirements.txt
```
2. Make the build scripts executable:
```bash
chmod +x build-docs.sh rtd-test.py
```
## Building Documentation
From this directory:
From this directory, you can build the documentation in several ways:
### Local Build
```bash
# Build docs (warnings shown but don't fail the build)
cd docs/api && uv run ./build-docs.sh
# Using the build script (automatically opens docs when done)
./build-docs.sh
# Build with strict mode (warnings treated as errors)
cd docs/api && uv run ./build-docs.sh --strict
# Or directly with sphinx-build
sphinx-build -b html . _build/html -W --keep-going
```
The build script will:
### ReadTheDocs Test Build
1. Install documentation dependencies via `uv sync --group docs`
2. Clean previous build output
3. Run `sphinx-build` to generate HTML documentation
4. Open the result in your browser (macOS)
To test the documentation build process exactly as it would run on ReadTheDocs:
```bash
./rtd-test.py
```
This script:
- Creates a fresh virtual environment
- Installs all dependencies as specified in requirements files
- Handles conflicting dependencies (like grpcio versions for Riva)
- Builds the documentation in an isolated environment
- Provides detailed logging of the build process
Use this script to verify your documentation will build correctly on ReadTheDocs before pushing changes.
## Viewing Documentation
The built documentation will be available at `_build/html/index.html`. To open:
```bash
# On MacOS
open _build/html/index.html
# On Linux
xdg-open _build/html/index.html
# On Windows
start _build/html/index.html
```
## Directory Structure
```
.
├── api/ # Auto-generated API documentation (created during build)
├── _build/ # Built documentation output
├── conf.py # Sphinx configuration (mock imports, extensions, etc.)
├── api/ # Auto-generated API documentation
├── _build/ # Built documentation
├── _static/ # Static files (images, css, etc.)
├── conf.py # Sphinx configuration
├── index.rst # Main documentation entry point
├── requirements-base.txt # Base documentation dependencies
├── requirements-riva.txt # Riva-specific dependencies
├── build-docs.sh # Local build script
└── rtd-test.sh # ReadTheDocs test build script (uses pip, not uv)
└── rtd-test.py # ReadTheDocs test build script
```
## How It Works
## Notes
- `conf.py` runs `sphinx-apidoc` during Sphinx's `setup()` phase to generate `.rst` files from Python source
- Sphinx autodoc imports each module to extract docstrings
- Modules with unavailable dependencies are listed in `autodoc_mock_imports` in `conf.py`
- Napoleon extension converts Google-style docstrings to reStructuredText
- Documentation is auto-generated from Python docstrings
- Service modules are automatically detected and included
- The build process matches our ReadTheDocs configuration
- Warnings are treated as errors (-W flag) to maintain consistency
- The --keep-going flag ensures all errors are reported
- Dependencies are split into multiple requirements files to handle version conflicts
## Troubleshooting
**Module not appearing in docs:**
If you encounter missing service modules:
1. Check the build output for `autodoc: failed to import` warnings
2. If the module has an unresolvable import dependency, add it to `autodoc_mock_imports` in `conf.py`
3. Verify the module is importable: `uv run python -c "import pipecat.module.name"`
1. Verify the service is installed with its extras: `pip install pipecat-ai[service-name]`
2. Check the build logs for import errors
3. Ensure the service module is properly initialized in the package
4. Run `./rtd-test.py` to test in an isolated environment matching ReadTheDocs
**Duplicate object warnings:**
For dependency conflicts:
These come from re-export modules or Sphinx discovering the same class through multiple import paths. Usually cosmetic.
1. Check the requirements files for version specifications
2. Use `rtd-test.py` to verify dependency resolution
3. Consider adding service-specific requirements files if needed
**Docstring formatting warnings:**
For more information:
Docstrings use reStructuredText, not Markdown. Common issues:
- Use `Example::` with indented code blocks, not `` ```python ``
- Ensure blank lines between directive content and subsequent sections
- Use `Parameters:` (not `Attributes:`) for dataclass field documentation to avoid duplicate entries
- [ReadTheDocs Configuration](.readthedocs.yaml)
- [Sphinx Documentation](https://www.sphinx-doc.org/)

View File

@@ -1,16 +1,8 @@
#!/bin/bash
# Usage: ./build-docs.sh [--strict]
# --strict: Treat warnings as errors (default: warnings only)
SPHINX_OPTS=""
if [ "$1" = "--strict" ]; then
SPHINX_OPTS="-W --keep-going"
fi
# Build docs using uv
echo "Installing dependencies with uv..."
uv sync --group docs --all-extras --no-extra gstreamer --no-extra local_smart_turn --no-extra moondream --no-extra mlx-whisper
uv sync --group docs --all-extras --no-extra krisp --no-extra gstreamer --no-extra local_smart_turn --no-extra moondream --no-extra riva --no-extra mlx-whisper
# Check if sphinx-build is available
if ! uv run sphinx-build --version &> /dev/null; then
@@ -22,7 +14,8 @@ fi
rm -rf _build
echo "Building documentation..."
uv run sphinx-build -b html -d _build/doctrees . _build/html $SPHINX_OPTS
# Build docs matching ReadTheDocs configuration
uv run sphinx-build -b html -d _build/doctrees . _build/html -W --keep-going
if [ $? -eq 0 ]; then
echo "Documentation built successfully!"

View File

@@ -4,19 +4,6 @@ import sys
from datetime import datetime
from pathlib import Path
# Fix Pydantic v2 + Sphinx autodoc incompatibility: ConfigDict(extra="allow") fails
# during Sphinx's import because __pydantic_extra__ annotation on BaseModel resolves to
# `Dict[str, Any] | None` whose get_origin() is Union, not dict. Patch the check to
# accept Union-wrapped dict types (i.e., Optional[Dict[str, Any]]).
import pydantic._internal._generate_schema as _pydantic_gs
_ORIG_DICT_TYPES = _pydantic_gs.DICT_TYPES
# Expand the accepted types to include Union (Optional[Dict[str, Any]])
import types
import typing
_pydantic_gs.DICT_TYPES = [*_ORIG_DICT_TYPES, typing.Union, types.UnionType]
# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("sphinx-build")
@@ -61,6 +48,8 @@ autodoc_default_options = {
# Mock imports for optional dependencies
autodoc_mock_imports = [
# Krisp - has build issues on some platforms
"pipecat_ai_krisp",
"krisp",
"krisp_audio",
# System-specific GUI libraries
"_tkinter",
@@ -89,6 +78,16 @@ autodoc_mock_imports = [
"einops",
"intel_extension_for_pytorch",
"huggingface_hub",
# riva dependencies
"riva",
"riva.client",
"riva.client.Auth",
"riva.client.ASRService",
"riva.client.StreamingRecognitionConfig",
"riva.client.RecognitionConfig",
"riva.client.AudioEncoding",
"riva.client.proto.riva_tts_pb2",
"riva.client.SpeechSynthesisService",
# MLX dependencies (Apple Silicon specific)
"mlx",
"mlx_whisper", # Note: might need underscore format too
@@ -99,6 +98,7 @@ autodoc_mock_imports = [
"cartesia",
"camb",
"sarvamai",
"openpipe",
"openai.types.beta.realtime",
"langchain_core",
"langchain_core.messages",
@@ -110,8 +110,6 @@ autodoc_mock_imports = [
"fastapi.middleware",
"fastapi.responses",
"uvicorn",
# Deepgram dependencies
"deepgram",
]
# HTML output settings
@@ -138,8 +136,6 @@ def import_core_modules():
"pipecat.runner",
"pipecat.serializers",
"pipecat.transcriptions",
"pipecat.turns",
"pipecat.extensions",
"pipecat.utils",
]
@@ -184,6 +180,7 @@ def setup(app):
logger.info(f"Source directory: {source_dir}")
excludes = [
str(project_root / "src/pipecat/pipeline/to_be_updated"),
str(project_root / "src/pipecat/examples"),
str(project_root / "src/pipecat/tests"),
"**/test_*.py",

View File

@@ -32,5 +32,4 @@ Quick Links
Services <api/pipecat.services>
Transcriptions <api/pipecat.transcriptions>
Transports <api/pipecat.transports>
Turns <api/pipecat.turns>
Utils <api/pipecat.utils>

View File

@@ -1,5 +1,5 @@
# AI-COUSTICS
AIC_LICENSE_KEY=...
AICOUSTICS_LICENSE_KEY=...
# Anthropic
ANTHROPIC_API_KEY=...
@@ -80,6 +80,9 @@ GOOGLE_TEST_CREDENTIALS=...
# Gradium
GRAPDIUM_API_KEY=...
# Grok
GROK_API_KEY=...
# Groq
GROQ_API_KEY=...
@@ -91,9 +94,6 @@ HEYGEN_LIVE_AVATAR_API_KEY=...
HUME_API_KEY=...
HUME_VOICE_ID=...
# Inception
INCEPTION_API_KEY=...
# Inworld
INWORLD_API_KEY=...
@@ -124,25 +124,18 @@ MINIMAX_GROUP_ID=...
# Mistral
MISTRAL_API_KEY=...
# Nebius
NEBIUS_API_KEY=...
# Neuphonic
NEUPHONIC_API_KEY=...
# Novita
NOVITA_API_KEY=...
# NVIDIA
NVIDIA_API_KEY=...
# For a full example of how to deploy to SageMaker, see:
# https://github.com/pipecat-ai/pipecat-examples/tree/main/nvidia_sagemaker_example/deployment/aws-sagemaker-nvidia
SAGEMAKER_ASR_ENDPOINT_NAME=...
SAGEMAKER_MAGPIE_ENDPOINT_NAME=...
# OpenAI
OPENAI_API_KEY=...
# OpenPipe
OPENPIPE_API_KEY=...
# OpenRouter
OPENROUTER_API_KEY=...
@@ -183,9 +176,6 @@ SENTRY_DSN=...
SIMLI_API_KEY=...
SIMLI_FACE_ID=...
# Smallest
SMALLEST_API_KEY=...
# Smart turn
LOCAL_SMART_TURN_MODEL_PATH=...
FAL_SMART_TURN_API_KEY=...
@@ -214,22 +204,8 @@ TWILIO_AUTH_TOKEN=...
# Ultravox Realtime
ULTRAVOX_API_KEY=...
# Vonage
VONAGE_APPLICATION_ID=...
VONAGE_SESSION_ID=...
VONAGE_TOKEN=...
# WhatsApp
WHATSAPP_TOKEN=...
WHATSAPP_WEBHOOK_VERIFICATION_TOKEN=...
WHATSAPP_PHONE_NUMBER_ID=...
WHATSAPP_APP_SECRET=...
# xAI / Grok
XAI_API_KEY=...
# PIPECAT_SCTP_MAX_CHUNK_SIZE controls the maximum SCTP DATA-chunk payload
# size (bytes) used by aiortc's data channel. The default is 1100.
# All the details here:
# https://docs.pipecat.ai/api-reference/server/services/transport/small-webrtc#pipecat_sctp_max_chunk_size
#PIPECAT_SCTP_MAX_CHUNK_SIZE=1100

View File

@@ -1,150 +1,31 @@
# Pipecat Examples
This directory contains examples showing how to build voice and multimodal agents with Pipecat.
This directory contains examples to help you learn how to build with Pipecat.
## Setup
## Getting Started
1. Follow the [README](https://github.com/pipecat-ai/pipecat/blob/main/README.md#%EF%B8%8F-contributing-to-the-framework) steps to get your local environment configured.
New to Pipecat? Start here:
> **Run from root directory**: Make sure you are running the steps from the root directory.
- **[Quickstart](quickstart/)** - Get your first voice AI bot running in 5 minutes _(coming soon)_
- **[Client/Server Web](client-server-web/)** - Learn to build web applications with Pipecat's client SDKs _(coming soon)_
- **[Phone Bot with Twilio](phone-bot-twilio/)** - Connect your bot to a phone number _(coming soon)_
> **Using local audio?**: The `LocalAudioTransport` requires a system dependency for `portaudio`. Install the dependency to use the transport.
## Foundational Examples
2. Copy the [`env.example`](../env.example) file and add API keys for services you plan to use:
Single-file examples that introduce core Pipecat concepts one at a time. These examples:
```bash
cp env.example .env
# Edit .env with your API keys
```
- Build on each other progressively
- Focus on specific features or integrations
- Are used for testing with every Pipecat release
3. Run any example:
See the **[Foundational Examples README](foundational/)** for the complete list.
```bash
uv run python getting-started/01-say-one-thing.py
```
## More Advanced Examples
4. Open the web interface at http://localhost:7860/client/ and click "Connect"
Ready to explore complex use cases? Visit **[pipecat-examples](https://github.com/pipecat-ai/pipecat-examples)** for:
## Running examples with other transports
Most examples support running with other transports, like Twilio or Daily.
### Daily
You need to create a Daily account at https://dashboard.daily.co/u/signup. Once signed up, you can create your own room from the dashboard and set the environment variables `DAILY_ROOM_URL` and `DAILY_API_KEY`. Alternatively, you can let the example create a room for you (still needs `DAILY_API_KEY` environment variable). Then, start any example with `-t daily`:
```bash
uv run getting-started/06-voice-agent.py -t daily
```
### Twilio
It is also possible to run the example through a Twilio phone number. You will need to setup a few things:
1. Install and run [ngrok](https://ngrok.com/download).
```bash
ngrok http 7860
```
2. Configure your Twilio phone number. One way is to setup a TwiML app and set the request URL to the ngrok URL from step (1). Then, set your phone number to use the new TwiML app.
Then, run the example with:
```bash
uv run getting-started/06-voice-agent.py -t twilio -x NGROK_HOST_NAME
```
## Directory Structure
### [`getting-started/`](./getting-started/)
Progressive introduction to Pipecat, from minimal TTS to a full voice agent with function calling.
### [`voice/`](./voice/)
Full STT + LLM + TTS voice agent pipelines showcasing different speech service providers (Deepgram, ElevenLabs, Cartesia, etc.)
### [`function-calling/`](./function-calling/)
Function calling with different LLM providers (OpenAI, Anthropic, Google, etc.)
### [`transcription/`](./transcription/)
Speech-to-text examples with various STT providers.
### [`vision/`](./vision/)
Image description and vision capabilities with different multimodal LLMs.
### [`realtime/`](./realtime/)
Realtime and multimodal live APIs (OpenAI Realtime, Gemini Live, AWS Nova Sonic, Ultravox, Grok).
### [`persistent-context/`](./persistent-context/)
Maintaining conversation context across sessions with different providers.
### [`context-summarization/`](./context-summarization/)
Summarizing conversation context to manage token limits.
### [`update-settings/`](./update-settings/)
Changing service settings at runtime, organized by service type:
- **[`stt/`](./update-settings/stt/)** — Speech-to-text settings
- **[`tts/`](./update-settings/tts/)** — Text-to-speech settings
- **[`llm/`](./update-settings/llm/)** — LLM settings
### [`turn-management/`](./turn-management/)
Turn detection, interruption handling, and user input management.
### [`thinking-and-mcp/`](./thinking-and-mcp/)
LLM thinking/reasoning modes and MCP (Model Context Protocol) tool server integration.
### [`transports/`](./transports/)
Transport layer examples (WebRTC, Daily, LiveKit).
### [`video-avatar/`](./video-avatar/)
Video avatar integrations (Tavus, HeyGen, Simli, LemonSlice).
### [`video-processing/`](./video-processing/)
Video processing, mirroring, GStreamer, and custom video tracks.
### [`audio/`](./audio/)
Audio recording, background sounds, and sound effects.
### [`observability/`](./observability/)
Pipeline monitoring: observers, heartbeats, and Sentry metrics.
### [`rag/`](./rag/)
Retrieval-augmented generation, grounding, and long-term memory (Mem0, Gemini).
### [`features/`](./features/)
Miscellaneous features: wake phrases, live translation, service switching, voice switching, and more.
## Advanced Usage
### Customizing Network Settings
```bash
uv run python <example-name> --host 0.0.0.0 --port 8080
```
### Troubleshooting
- **No audio/video**: Check browser permissions for microphone and camera
- **Connection errors**: Verify API keys in `.env` file
- **Port conflicts**: Use `--port` to change the port
For more examples, visit the [pipecat-examples repository](https://github.com/pipecat-ai/pipecat-examples).
- Production-ready applications
- Multi-platform client implementations
- Telephony integrations
- Multimodal and creative applications
- Deployment and monitoring examples

View File

@@ -1,232 +0,0 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Manual validation harness for the ``add_tool_change_messages`` feature.
When tools change mid-conversation, LLMs can produce a few different
flavors of tool-call-related hallucination:
- **Forward hallucination** — calling a tool that has been removed.
- **Negative hallucination** — refusing to call a tool that has been
re-added (because recent context is full of "I can't" responses).
- **Hallucinated output when tools are unavailable** — making up an
answer rather than declining gracefully, or producing JSON that
*looks* like a tool call but is actually just an assistant text
response.
The ``add_tool_change_messages`` feature mitigates these by appending a
developer-role message to the conversation whenever ``LLMSetToolsFrame``
changes the set of advertised tools, so the LLM stays in sync with what's
actually available.
This harness exercises all of those flavors by flipping the advertised
tool set on a turn counter:
Phase 0 (turns 14): weather tool ACTIVE — confirm baseline.
Phase 1 (turns 58): tool REMOVED — keep asking for weather.
Phase 2 (turn 9+): tool RE-ADDED — does the LLM call it again?
Set ``ADD_TOOL_CHANGE_MESSAGES=0`` to disable the mitigation and see the
unmitigated behavior. The default is ON so a fresh run shows the feature
working.
Defaults to Llama 3.1 8B Instruct via a locally-running Ollama —
anecdotally one of the more hallucination-prone of the easily accessible
models. Pull the model once with ``ollama pull llama3.1:8b`` and make
sure ``ollama serve`` is running. Swap the LLM service to validate other
providers.
Run with::
uv run examples/features/features-add-tool-change-messages.py
ADD_TOOL_CHANGE_MESSAGES=0 uv run examples/features/features-add-tool-change-messages.py
"""
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame, LLMSetToolsFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import NOT_GIVEN, LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.ollama.llm import OLLamaLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# Default ON so a fresh run shows the feature working. Set to "0" to A/B
# against the unmitigated behavior.
ADD_TOOL_CHANGE_MESSAGES = os.environ.get("ADD_TOOL_CHANGE_MESSAGES", "1") == "1"
async def fetch_weather_from_api(params: FunctionCallParams):
await params.result_callback({"conditions": "nice", "temperature": "75"})
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
weather_tools = ToolsSchema(standard_tools=[weather_function])
transport_params = {
"daily": lambda: DailyParams(audio_in_enabled=True, audio_out_enabled=True),
"twilio": lambda: FastAPIWebsocketParams(audio_in_enabled=True, audio_out_enabled=True),
"webrtc": lambda: TransportParams(audio_in_enabled=True, audio_out_enabled=True),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(
f"Starting add_tool_change_messages demo bot "
f"(ADD_TOOL_CHANGE_MESSAGES={ADD_TOOL_CHANGE_MESSAGES})"
)
stt = DeepgramSTTService(api_key=os.environ["DEEPGRAM_API_KEY"])
tts = CartesiaTTSService(
api_key=os.environ["CARTESIA_API_KEY"],
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
llm = OLLamaLLMService(
settings=OLLamaLLMService.Settings(
# Llama 3.1 8B Instruct is anecdotally one of the more
# hallucination-prone of the easily accessible models — exactly
# what we want for this validation harness. Pull it with
# ``ollama pull llama3.1:8b`` and make sure ``ollama serve``
# is running.
model="llama3.1:8b",
system_instruction=(
"You are a helpful assistant in a voice conversation. Your responses "
"will be spoken aloud, so avoid emojis, bullet points, or other "
"formatting that can't be spoken. Respond briefly and naturally. "
"If the user asks for the current weather, use the `get_current_weather` "
"function if it's available. IMPORTANT: if you do not have access to the function, "
"say something along the lines of 'Sorry, I can't check the weather right now.'."
),
),
)
llm.register_function("get_current_weather", fetch_weather_from_api)
context = LLMContext(tools=weather_tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
add_tool_change_messages=ADD_TOOL_CHANGE_MESSAGES,
)
pipeline = Pipeline(
[
transport.input(),
stt,
user_aggregator,
llm,
tts,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(enable_metrics=True, enable_usage_metrics=True),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
# Phase controller: roughly 4 turns per phase.
user_turn_count = 0
REMOVE_AT_TURN = 5 # tool gone for turn N onward
READD_AT_TURN = 9 # tool back for turn N onward
@user_aggregator.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(aggregator, strategy, message):
nonlocal user_turn_count
user_turn_count += 1
logger.info(f"=== User turn {user_turn_count} complete ===")
if user_turn_count == REMOVE_AT_TURN - 1:
logger.info(
"=== Phase 1: weather tool REMOVED. Keep asking about the weather "
"to exercise hallucination scenarios. ==="
)
await task.queue_frame(LLMSetToolsFrame(tools=NOT_GIVEN))
elif user_turn_count == READD_AT_TURN - 1:
logger.info(
"=== Phase 2: weather tool RE-ADDED. Ask for the weather again — "
"does the LLM call it, or keep refusing? (THIS IS THE TEST.) ==="
)
await task.queue_frame(LLMSetToolsFrame(tools=weather_tools))
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info("Client connected")
logger.info(
"=== Phase 0: weather tool ACTIVE. Ask for the weather a few times "
"to confirm it's working. ==="
)
context.add_message(
{
"role": "developer",
"content": (
"Please introduce yourself briefly to the user, then invite them "
"to ask about the weather."
),
}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info("Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -1,327 +0,0 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Example demonstrating ``PipelineTask(app_resources=...)``.
``app_resources`` is an application-defined bag of anything your
application code may want to share across a session: database handles,
HTTP clients, feature flags, per-user state, observability clients,
in-memory caches — whatever fits your app. Pipecat passes it through
untouched and exposes it as ``task.app_resources``, so any code with a
handle on the task can read or mutate it.
Two of the convenience aliases exercised below:
- Tool handlers read it from ``FunctionCallParams.app_resources``.
- Custom ``FrameProcessor`` subclasses read it from
``self.pipeline_task.app_resources``.
This example uses two small loggers as stand-ins for that "shared thing":
``ToolCallLogger`` (written from tool handlers) and
``TranscriptionLogger`` (written from a custom ``FrameProcessor`` that
sits in the pipeline). A real app might just as easily pass a Postgres
pool, a Redis client, a Stripe SDK instance, or any combination thereof.
The mechanics shown here — construct once, hand to the task, read it
from each site, inspect it after the session — are the same regardless
of what you put in.
We bundle resources in a typed ``AppResources`` dataclass and cast back
to it at each read site. Pipecat doesn't care what type you pass (a
plain dict works too), but a typed container gives you autocomplete and
refactor safety instead of dict-by-string-key lookups.
"""
import json
import os
from collections.abc import Mapping
from dataclasses import dataclass
from datetime import UTC, datetime
from typing import Any, cast
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import Frame, LLMRunFrame, TranscriptionFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai.responses.llm import OpenAIResponsesLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
class ToolCallLogger:
"""Stand-in shared resource — swap for whatever your app actually needs."""
def __init__(self):
"""Initialize the logger with an empty list of recorded calls."""
self._calls: list[dict[str, Any]] = []
def log_tool_call(self, function_name: str, arguments: Mapping[str, Any]) -> None:
"""Record a tool call invocation.
Args:
function_name: The name of the tool being invoked.
arguments: The arguments passed to the tool.
"""
entry = {
"timestamp": datetime.now(UTC).isoformat(),
"function_name": function_name,
"arguments": dict(arguments),
}
self._calls.append(entry)
logger.info(f"[ToolCallLogger] {function_name} called with {dict(arguments)}")
def dump(self) -> str:
"""Return all recorded tool calls as a JSON string."""
return json.dumps(self._calls, indent=2)
class TranscriptionLogger:
"""Records final user transcriptions — written from a custom FrameProcessor."""
def __init__(self):
"""Initialize the logger with an empty list of recorded transcriptions."""
self._entries: list[dict[str, Any]] = []
def log_transcription(self, text: str) -> None:
"""Record a transcription.
Args:
text: The transcribed user utterance.
"""
entry = {
"timestamp": datetime.now(UTC).isoformat(),
"text": text,
}
self._entries.append(entry)
logger.info(f"[TranscriptionLogger] {text!r}")
def dump(self) -> str:
"""Return all recorded transcriptions as a JSON string."""
return json.dumps(self._entries, indent=2)
@dataclass
class AppResources:
"""Typed container for everything the app shares across this session.
Add fields here as the app grows (e.g. ``db: AsyncConnection``,
``http: httpx.AsyncClient``). Read sites ``cast()`` to this type to
get autocomplete and refactor safety:
- In tools: ``cast(AppResources, params.app_resources)``.
- In custom processors: ``cast(AppResources, self.pipeline_task.app_resources)``.
"""
tool_call_logger: ToolCallLogger
transcription_logger: TranscriptionLogger
async def fetch_weather_from_api(params: FunctionCallParams):
resources = cast(AppResources, params.app_resources)
resources.tool_call_logger.log_tool_call(params.function_name, params.arguments)
await params.result_callback({"conditions": "nice", "temperature": "75"})
async def fetch_restaurant_recommendation(params: FunctionCallParams):
resources = cast(AppResources, params.app_resources)
resources.tool_call_logger.log_tool_call(params.function_name, params.arguments)
await params.result_callback({"name": "The Golden Dragon"})
class TranscriptionLoggingProcessor(FrameProcessor):
"""Logs each final user transcription into the shared app resources.
Demonstrates the second read site for ``app_resources``: any custom
``FrameProcessor`` can reach the same bag every tool handler sees by
going through ``self.pipeline_task.app_resources``. ``pipeline_task``
is ``None`` until the task sets the processor up, so we guard against
that case.
"""
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Forward all frames; log final user transcriptions on the way through."""
await super().process_frame(frame, direction)
if isinstance(frame, TranscriptionFrame) and self.pipeline_task is not None:
resources = cast(AppResources, self.pipeline_task.app_resources)
resources.transcription_logger.log_transcription(frame.text)
await self.push_frame(frame, direction)
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.environ["DEEPGRAM_API_KEY"])
tts = CartesiaTTSService(
api_key=os.environ["CARTESIA_API_KEY"],
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
llm = OpenAIResponsesLLMService(
api_key=os.environ["OPENAI_API_KEY"],
settings=OpenAIResponsesLLMService.Settings(
system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way.",
),
)
# You can also register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.
llm.register_function("get_current_weather", fetch_weather_from_api)
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
@llm.event_handler("on_connection_error")
async def on_connection_error(service, error):
logger.error(f"LLM connection error: {error}")
@llm.event_handler("on_function_calls_started")
async def on_function_calls_started(service, function_calls):
# Avoid appending this filler message to the LLM context — it would
# alter the conversation history and prevent
# OpenAIResponsesLLMService's previous_response_id optimization from
# matching, forcing a full context resend.
await tts.queue_frame(TTSSpeakFrame("Let me check on that.", append_to_context=False))
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
restaurant_function = FunctionSchema(
name="get_restaurant_recommendation",
description="Get a restaurant recommendation",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
},
required=["location"],
)
tools = ToolsSchema(standard_tools=[weather_function, restaurant_function])
context = LLMContext(tools=tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(),
stt,
TranscriptionLoggingProcessor(),
user_aggregator,
llm,
tts,
transport.output(),
assistant_aggregator,
]
)
# Keep local handles so we can read collected state after the session
# ends; Pipecat never copies or clears the object.
tool_call_logger = ToolCallLogger()
transcription_logger = TranscriptionLogger()
resources = AppResources(
tool_call_logger=tool_call_logger,
transcription_logger=transcription_logger,
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
app_resources=resources,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
# The session has ended; read whatever state the handlers built up.
logger.info(f"Tool calls logged during session:\n{tool_call_logger.dump()}")
logger.info(f"Transcriptions logged during session:\n{transcription_logger.dump()}")
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,71 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from pipecat.frames.frames import EndFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.piper.tts import PiperHttpTTSService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(audio_out_enabled=True),
"twilio": lambda: FastAPIWebsocketParams(audio_out_enabled=True),
"webrtc": lambda: TransportParams(audio_out_enabled=True),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
# Create an HTTP session
async with aiohttp.ClientSession() as session:
tts = PiperHttpTTSService(
base_url=os.getenv("PIPER_BASE_URL"),
aiohttp_session=session,
sample_rate=24000,
)
task = PipelineTask(
Pipeline([tts, transport.output()]),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
# Register an event handler so we can play the audio when the client joins
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
await task.queue_frames([TTSSpeakFrame(f"Hello there!"), EndFrame()])
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,72 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from pipecat.frames.frames import EndFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.rime.tts import RimeHttpTTSService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(audio_out_enabled=True),
"twilio": lambda: FastAPIWebsocketParams(audio_out_enabled=True),
"webrtc": lambda: TransportParams(audio_out_enabled=True),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
# Create an HTTP session
async with aiohttp.ClientSession() as session:
tts = RimeHttpTTSService(
api_key=os.getenv("RIME_API_KEY", ""),
aiohttp_session=session,
settings=RimeHttpTTSService.Settings(
voice="rex",
),
)
task = PipelineTask(
Pipeline([tts, transport.output()]),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
# Register an event handler so we can play the audio when the client joins
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
await task.queue_frames([TTSSpeakFrame(f"Hello there!"), EndFrame()])
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -36,7 +36,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
tts = CartesiaTTSService(
api_key=os.environ["CARTESIA_API_KEY"],
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),

View File

@@ -28,7 +28,7 @@ async def main():
transport = LocalAudioTransport(LocalAudioTransportParams(audio_out_enabled=True))
tts = CartesiaTTSService(
api_key=os.environ["CARTESIA_API_KEY"],
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),

View File

@@ -0,0 +1,64 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
from dotenv import load_dotenv
from loguru import logger
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.runner.livekit import configure
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.transports.livekit.transport import LiveKitParams, LiveKitTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
(url, token, room_name) = await configure()
transport = LiveKitTransport(
url=url,
token=token,
room_name=room_name,
params=LiveKitParams(audio_out_enabled=True),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
runner = PipelineRunner()
task = PipelineTask(Pipeline([tts, transport.output()]))
# Register an event handler so we can play the audio when the
# participant joins.
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant_id):
await asyncio.sleep(1)
await task.queue_frame(
TTSSpeakFrame(
"Hello there! How are you doing today? Would you like to talk about the weather?"
)
)
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,64 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.frames.frames import EndFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.nvidia.tts import NvidiaTTSService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(audio_out_enabled=True),
"twilio": lambda: FastAPIWebsocketParams(audio_out_enabled=True),
"webrtc": lambda: TransportParams(audio_out_enabled=True),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
tts = NvidiaTTSService(api_key=os.getenv("NVIDIA_API_KEY"))
task = PipelineTask(
Pipeline([tts, transport.output()]),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
# Register an event handler so we can play the audio when the client joins
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
await task.queue_frames([TTSSpeakFrame(f"Hello there!"), EndFrame()])
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -38,14 +38,14 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
tts = CartesiaTTSService(
api_key=os.environ["CARTESIA_API_KEY"],
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
llm = OpenAILLMService(
api_key=os.environ["OPENAI_API_KEY"],
api_key=os.getenv("OPENAI_API_KEY"),
settings=OpenAILLMService.Settings(
system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way.",
),
@@ -60,7 +60,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
context = LLMContext()
context.add_message({"role": "developer", "content": "Say hello to the world."})
context.add_message({"role": "user", "content": "Say hello to the world."})
await task.queue_frames([LLMContextFrame(context), EndFrame()])
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)

View File

@@ -0,0 +1,84 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from pipecat.frames.frames import TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.fal.image import FalImageGenService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
load_dotenv(override=True)
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
video_out_enabled=True,
video_out_width=1024,
video_out_height=1024,
),
"webrtc": lambda: TransportParams(
video_out_enabled=True,
video_out_width=1024,
video_out_height=1024,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
# Create an HTTP session
async with aiohttp.ClientSession() as session:
imagegen = FalImageGenService(
settings=FalImageGenService.Settings(
image_size="square_hd",
),
aiohttp_session=session,
key=os.getenv("FAL_KEY"),
)
task = PipelineTask(
Pipeline([imagegen, transport.output()]),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
# Register an event handler so we can play the audio when the client joins
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
await task.queue_frame(TextFrame("a cat in the style of picasso"))
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -42,7 +42,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
imagegen = GoogleImageGenService(
api_key=os.environ["GOOGLE_API_KEY"],
api_key=os.getenv("GOOGLE_API_KEY"),
)
task = PipelineTask(

View File

@@ -8,6 +8,7 @@ import argparse
import asyncio
import os
from contextlib import asynccontextmanager
from typing import Dict
import uvicorn
from dotenv import load_dotenv
@@ -38,7 +39,7 @@ load_dotenv(override=True)
app = FastAPI()
# Store connections by pc_id
pcs_map: dict[str, SmallWebRTCConnection] = {}
pcs_map: Dict[str, SmallWebRTCConnection] = {}
ice_servers = [
IceServer(
@@ -62,17 +63,17 @@ async def run_example(webrtc_connection: SmallWebRTCConnection):
),
)
stt = DeepgramSTTService(api_key=os.environ["DEEPGRAM_API_KEY"])
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.environ["CARTESIA_API_KEY"],
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
llm = OpenAILLMService(
api_key=os.environ["OPENAI_API_KEY"],
api_key=os.getenv("OPENAI_API_KEY"),
settings=OpenAILLMService.Settings(
system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way.",
),
@@ -108,9 +109,7 @@ async def run_example(webrtc_connection: SmallWebRTCConnection):
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
context.add_message({"role": "user", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")

View File

@@ -49,14 +49,14 @@ async def main():
)
tts = CartesiaTTSService(
api_key=os.environ["CARTESIA_API_KEY"],
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
llm = OpenAILLMService(
api_key=os.environ["OPENAI_API_KEY"],
api_key=os.getenv("OPENAI_API_KEY"),
settings=OpenAILLMService.Settings(
system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way.",
),
@@ -92,7 +92,7 @@ async def main():
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
{"role": "user", "content": "Please introduce yourself to the user."}
)
await task.queue_frames([LLMRunFrame()])

View File

@@ -53,17 +53,17 @@ async def main():
),
)
stt = DeepgramSTTService(api_key=os.environ["DEEPGRAM_API_KEY"])
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
llm = OpenAILLMService(
api_key=os.environ["OPENAI_API_KEY"],
api_key=os.getenv("OPENAI_API_KEY"),
settings=OpenAILLMService.Settings(
system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way.",
),
)
tts = CartesiaTTSService(
api_key=os.environ["CARTESIA_API_KEY"],
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),

View File

@@ -16,12 +16,11 @@ from pipecat.frames.frames import (
Frame,
LLMContextFrame,
LLMFullResponseStartFrame,
OutputImageRawFrame,
TextFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.sync_parallel_pipeline import FrameOrder, SyncParallelPipeline
from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.sentence import SentenceAggregator
@@ -31,7 +30,6 @@ from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaHttpTTSService
from pipecat.services.fal.image import FalImageGenService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.tts_service import TextAggregationMode
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
@@ -46,18 +44,6 @@ class MonthFrame(DataFrame):
return f"{self.name}(month: {self.month})"
class MarkImageForPlaybackSync(FrameProcessor):
"""Marks output image frames to be synchronized with audio playback."""
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, OutputImageRawFrame):
frame.sync_with_audio = True
await self.push_frame(frame, direction)
class MonthPrepender(FrameProcessor):
def __init__(self):
super().__init__()
@@ -108,17 +94,13 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# Create an HTTP session for API calls
async with aiohttp.ClientSession() as session:
llm = OpenAILLMService(api_key=os.environ["OPENAI_API_KEY"])
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
tts = CartesiaHttpTTSService(
api_key=os.environ["CARTESIA_API_KEY"],
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaHttpTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
# No need to aggregate by sentences (the default), as we already know we're getting full sentences
# (Otherwise the service will unnecessarily wait for follow-up input to confirm the sentence is complete,
# which, sadly, actually breaks the synchronization mechanism)
text_aggregation_mode=TextAggregationMode.TOKEN,
)
imagegen = FalImageGenService(
@@ -137,26 +119,17 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# that, each pipeline runs concurrently and `SyncParallelPipeline` will
# wait for the input frame to be processed.
#
# We use `FrameOrder.PIPELINE` so that each synchronized batch of output
# frames is pushed in the order the pipelines are listed: image first,
# then audio. This ensures the transport receives the image before the
# audio frames it should accompany.
#
# Note that `SyncParallelPipeline` requires the last processor in each
# of the pipelines to be synchronous. In this case, we use
# `FalImageGenService` and `CartesiaHttpTTSService` which make HTTP
# `CartesiaHttpTTSService` and `FalImageGenService` which make HTTP
# requests and wait for the response.
pipeline = Pipeline(
[
llm, # LLM
sentence_aggregator, # Aggregates LLM output into full sentences
SyncParallelPipeline( # Run pipelines in parallel aggregating the result
[
imagegen, # Generate image
MarkImageForPlaybackSync(), # Mark image as needing sync w/audio during playback
],
[month_prepender, tts], # Create "Month: sentence" and output audio
frame_order=FrameOrder.PIPELINE,
[imagegen], # Generate image
),
transport.output(), # Transport output
]

View File

@@ -0,0 +1,202 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import tkinter as tk
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from pipecat.frames.frames import (
Frame,
LLMContextFrame,
OutputAudioRawFrame,
TextFrame,
TTSAudioRawFrame,
URLImageRawFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.sentence import SentenceAggregator
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.cartesia.tts import CartesiaHttpTTSService
from pipecat.services.fal.image import FalImageGenService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.local.tk import TkLocalTransport, TkTransportParams
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
tk_root = tk.Tk()
tk_root.title("Calendar")
runner = PipelineRunner()
async def get_month_data(month):
messages = [
{
"role": "user",
"content": f"Describe a nature photograph suitable for use in a calendar, for the month of {month}. Include only the image description with no preamble. Limit the description to one sentence, please.",
}
]
class ImageDescription(FrameProcessor):
def __init__(self):
super().__init__()
self.text = ""
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TextFrame):
self.text = frame.text
await self.push_frame(frame, direction)
class AudioGrabber(FrameProcessor):
def __init__(self):
super().__init__()
self.audio = bytearray()
self.frame = None
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TTSAudioRawFrame):
self.audio.extend(frame.audio)
self.frame = OutputAudioRawFrame(
bytes(self.audio), frame.sample_rate, frame.num_channels
)
await self.push_frame(frame, direction)
class ImageGrabber(FrameProcessor):
def __init__(self):
super().__init__()
self.frame = None
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, URLImageRawFrame):
self.frame = frame
await self.push_frame(frame, direction)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
tts = CartesiaHttpTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaHttpTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
imagegen = FalImageGenService(
settings=FalImageGenService.Settings(
image_size="square_hd",
),
aiohttp_session=session,
key=os.getenv("FAL_KEY"),
)
sentence_aggregator = SentenceAggregator()
description = ImageDescription()
audio_grabber = AudioGrabber()
image_grabber = ImageGrabber()
# With `SyncParallelPipeline` we synchronize audio and images by
# pushing them basically in order (e.g. I1 A1 A1 A1 I2 A2 A2 A2 A2
# I3 A3). To do that, each pipeline runs concurrently and
# `SyncParallelPipeline` will wait for the input frame to be
# processed.
#
# Note that `SyncParallelPipeline` requires the last processor in
# each of the pipelines to be synchronous. In this case, we use
# `CartesiaHttpTTSService` and `FalImageGenService` which make HTTP
# requests and wait for the response.
pipeline = Pipeline(
[
llm, # LLM
sentence_aggregator, # Aggregates LLM output into full sentences
description, # Store sentence
SyncParallelPipeline(
[tts, audio_grabber], # Generate and store audio for the given sentence
[imagegen, image_grabber], # Generate and storeimage for the given sentence
),
]
)
task = PipelineTask(pipeline)
await task.queue_frame(LLMContextFrame(LLMContext(messages)))
await task.stop_when_done()
await runner.run(task)
return {
"month": month,
"text": description.text,
"image": image_grabber.frame,
"audio": audio_grabber.frame,
}
transport = TkLocalTransport(
tk_root,
TkTransportParams(
audio_out_enabled=True,
video_out_enabled=True,
video_out_width=1024,
video_out_height=1024,
),
)
pipeline = Pipeline([transport.output()])
task = PipelineTask(pipeline)
# We only specify a few months as we create tasks all at once and we
# might get rate limited otherwise.
months: list[str] = [
"January",
"February",
]
# We create one task per month. This will be executed concurrently.
month_tasks = [asyncio.create_task(get_month_data(month)) for month in months]
# Now we wait for each month task in the order they're completed. The
# benefit is we'll have as little delay as possible before the first
# month, and likely no delay between months, but the months won't
# display in order.
async def show_images(month_tasks):
for month_data_task in asyncio.as_completed(month_tasks):
data = await month_data_task
await task.queue_frames([data["image"], data["audio"]])
await runner.stop_when_done()
async def run_tk():
while not task.has_finished():
tk_root.update()
tk_root.update_idletasks()
await asyncio.sleep(0.1)
await asyncio.gather(runner.run(task), show_images(month_tasks), run_tk())
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,153 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import Frame, LLMRunFrame, MetricsFrame
from pipecat.metrics.metrics import (
LLMUsageMetricsData,
ProcessingMetricsData,
TTFBMetricsData,
TTSUsageMetricsData,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
class MetricsLogger(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, MetricsFrame):
for d in frame.data:
if isinstance(d, TTFBMetricsData):
print(f"!!! MetricsFrame: {frame}, ttfb: {d.value}")
elif isinstance(d, ProcessingMetricsData):
print(f"!!! MetricsFrame: {frame}, processing: {d.value}")
elif isinstance(d, LLMUsageMetricsData):
tokens = d.value
print(
f"!!! MetricsFrame: {frame}, tokens: {tokens.prompt_tokens}, characters: {tokens.completion_tokens}"
)
elif isinstance(d, TTSUsageMetricsData):
print(f"!!! MetricsFrame: {frame}, characters: {d.value}")
await self.push_frame(frame, direction)
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
settings=OpenAILLMService.Settings(
system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way.",
),
)
ml = MetricsLogger()
context = LLMContext()
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(),
stt,
user_aggregator,
llm,
tts,
ml,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
context.add_message({"role": "user", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -96,17 +96,17 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.environ["DEEPGRAM_API_KEY"])
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.environ["CARTESIA_API_KEY"],
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
llm = OpenAILLMService(
api_key=os.environ["OPENAI_API_KEY"],
api_key=os.getenv("OPENAI_API_KEY"),
settings=OpenAILLMService.Settings(
system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way.",
),
@@ -119,8 +119,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
)
image_sync_aggregator = ImageSyncAggregator(
os.path.join(os.path.dirname(__file__), "..", "assets", "speaking.png"),
os.path.join(os.path.dirname(__file__), "..", "assets", "waiting.png"),
os.path.join(os.path.dirname(__file__), "assets", "speaking.png"),
os.path.join(os.path.dirname(__file__), "assets", "waiting.png"),
)
pipeline = Pipeline(

View File

@@ -54,10 +54,10 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
async with aiohttp.ClientSession() as session:
stt = CartesiaSTTService(api_key=os.environ["CARTESIA_API_KEY"])
stt = CartesiaSTTService(api_key=os.getenv("CARTESIA_API_KEY"))
tts = CartesiaHttpTTSService(
api_key=os.environ["CARTESIA_API_KEY"],
api_key=os.getenv("CARTESIA_API_KEY"),
aiohttp_session=session,
settings=CartesiaHttpTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
@@ -65,7 +65,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
)
llm = OpenAILLMService(
api_key=os.environ["OPENAI_API_KEY"],
api_key=os.getenv("OPENAI_API_KEY"),
settings=OpenAILLMService.Settings(
system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way.",
),
@@ -103,7 +103,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Client connected")
# Kick off the conversation.
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
{"role": "user", "content": "Please introduce yourself to the user."}
)
await task.queue_frames([LLMRunFrame()])

View File

@@ -51,17 +51,17 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.environ["DEEPGRAM_API_KEY"])
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.environ["CARTESIA_API_KEY"],
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
llm = OpenAILLMService(
api_key=os.environ["OPENAI_API_KEY"],
api_key=os.getenv("OPENAI_API_KEY"),
settings=OpenAILLMService.Settings(
system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way.",
),
@@ -98,9 +98,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
context.add_message({"role": "user", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")

View File

@@ -91,7 +91,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
async with aiohttp.ClientSession() as session:
stt = SpeechmaticsSTTService(
api_key=os.environ["SPEECHMATICS_API_KEY"],
api_key=os.getenv("SPEECHMATICS_API_KEY"),
settings=SpeechmaticsSTTService.Settings(
language=Language.EN,
turn_detection_mode=SpeechmaticsSTTService.TurnDetectionMode.ADAPTIVE,
@@ -102,7 +102,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
)
tts = SpeechmaticsTTSService(
api_key=os.environ["SPEECHMATICS_API_KEY"],
api_key=os.getenv("SPEECHMATICS_API_KEY"),
settings=SpeechmaticsTTSService.Settings(
voice="sarah",
),
@@ -110,7 +110,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
)
llm = OpenAILLMService(
api_key=os.environ["OPENAI_API_KEY"],
api_key=os.getenv("OPENAI_API_KEY"),
settings=OpenAILLMService.Settings(
temperature=0.75,
system_instruction="You are a helpful British assistant called Sarah in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Always include punctuation in your responses. Give very short replies - do not give longer replies unless strictly necessary. Respond to what the user said in a concise, funny, creative and helpful way. Use `<Sn/>` tags to identify different speakers - do not use tags in your replies. Do not respond to speakers within `<PASSIVE/>` tags unless explicitly asked to.",
@@ -148,7 +148,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
context.add_message({"role": "developer", "content": "Say a short hello to the user."})
context.add_message({"role": "user", "content": "Say a short hello to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")

View File

@@ -74,7 +74,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
async with aiohttp.ClientSession() as session:
stt = SpeechmaticsSTTService(
api_key=os.environ["SPEECHMATICS_API_KEY"],
api_key=os.getenv("SPEECHMATICS_API_KEY"),
settings=SpeechmaticsSTTService.Settings(
language=Language.EN,
speaker_active_format="<{speaker_id}>{text}</{speaker_id}>",
@@ -82,7 +82,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
)
tts = SpeechmaticsTTSService(
api_key=os.environ["SPEECHMATICS_API_KEY"],
api_key=os.getenv("SPEECHMATICS_API_KEY"),
settings=SpeechmaticsTTSService.Settings(
voice="sarah",
),
@@ -90,7 +90,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
)
llm = OpenAILLMService(
api_key=os.environ["OPENAI_API_KEY"],
api_key=os.getenv("OPENAI_API_KEY"),
settings=OpenAILLMService.Settings(
temperature=0.75,
system_instruction="You are a helpful British assistant called Sarah in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Always include punctuation in your responses. Give very short replies - do not give longer replies unless strictly necessary. Respond to what the user said in a concise, funny, creative and helpful way. Use `<Sn/>` tags to identify different speakers - do not use tags in your replies. Do not respond to speakers within `<PASSIVE/>` tags unless explicitly asked to.",
@@ -128,7 +128,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
context.add_message({"role": "developer", "content": "Say a short hello to the user."})
context.add_message({"role": "user", "content": "Say a short hello to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")

View File

@@ -8,15 +8,15 @@
import os
from dotenv import load_dotenv
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_openai import ChatOpenAI
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.frames.frames import LLMMessagesUpdateFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -67,10 +67,10 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.environ["DEEPGRAM_API_KEY"])
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.environ["CARTESIA_API_KEY"],
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
@@ -129,10 +129,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# An `LLMContextFrame` will be picked up by the LangchainProcessor using
# only the content of the last message to inject it in the prompt defined
# above. So no role is required here.
context.add_message(
{"role": "developer", "content": "Please briefly introduce yourself to the user."}
)
await task.queue_frames([LLMRunFrame()])
messages = [({"content": "Please briefly introduce yourself to the user."})]
await task.queue_frames([LLMMessagesUpdateFrame(messages, run_llm=True)])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):

Some files were not shown because too many files have changed in this diff Show More