Compare commits

..

1 Commits

Author SHA1 Message Date
Chad Bailey
f1b093d9f1 translation use case working 2024-04-02 15:12:47 +00:00
809 changed files with 7100 additions and 143631 deletions

View File

@@ -1,30 +0,0 @@
# flyctl launch added from .gitignore
**/.vscode
**/env
**/__pycache__
**/*~
**/venv
#*#
# Distribution / packaging
**/.Python
**/build
**/develop-eggs
**/dist
**/downloads
**/eggs
**/.eggs
**/lib
**/lib64
**/parts
**/sdist
**/var
**/wheels
**/share/python-wheels
**/*.egg-info
**/.installed.cfg
**/*.egg
**/MANIFEST
**/.DS_Store
**/.env
fly.toml

View File

@@ -1,87 +0,0 @@
name: Bug report
description: Report a bug or unexpected behavior
type: Bug
body:
- type: markdown
attributes:
value: |
## Bug Report
Thank you for taking the time to fill out this bug report.
- type: markdown
attributes:
value: |
### Environment
- type: input
id: pipecat-version
attributes:
label: pipecat version
description: Which version are you using?
placeholder: e.g., 0.0.63
validations:
required: true
- type: input
id: python-version
attributes:
label: Python version
description: Which Python version are you using?
placeholder: e.g., 3.12.8
validations:
required: true
- type: input
id: os
attributes:
label: Operating System
description: Which OS are you using?
placeholder: e.g., Ubuntu 24.04, Windows 11, macOS 12.5
validations:
required: true
- type: textarea
id: description
attributes:
label: Issue description
description: Provide a clear description of the issue.
validations:
required: true
- type: textarea
id: repro
attributes:
label: Reproduction steps
description: List the steps to reproduce the issue.
placeholder: |
1. Do this...
2. Then do that...
3. Observe the error...
validations:
required: true
- type: textarea
id: expected
attributes:
label: Expected behavior
description: What did you expect to happen?
validations:
required: true
- type: textarea
id: actual
attributes:
label: Actual behavior
description: What actually happened?
validations:
required: true
- type: textarea
id: logs
attributes:
label: Logs
description: If applicable, include any relevant logs or error messages
render: shell
validations:
required: false

View File

@@ -1,67 +0,0 @@
name: Question
description: Ask a question or get help
type: Question
body:
- type: markdown
attributes:
value: |
## Question
Use this form to ask a question about pipecat.
- type: markdown
attributes:
value: |
### Environment (if applicable)
- type: input
id: pipecat-version
attributes:
label: pipecat version
description: Which version are you using? (if applicable)
placeholder: e.g., 0.0.63
validations:
required: false
- type: input
id: python-version
attributes:
label: Python version
description: Which Python version are you using? (if applicable)
placeholder: e.g., 3.12.8
validations:
required: false
- type: input
id: os
attributes:
label: Operating System
description: Which OS are you using? (if applicable)
placeholder: e.g., Ubuntu 24.04, Windows 11, macOS 12.5
validations:
required: false
- type: textarea
id: question
attributes:
label: Question
description: Provide your question in detail here.
validations:
required: true
- type: textarea
id: tried
attributes:
label: What I've tried
description: Describe what you've already tried or research you've done.
placeholder: I've looked at the documentation and tried...
validations:
required: false
- type: textarea
id: context
attributes:
label: Context
description: Any additional context or information that might help others understand your question better.
validations:
required: false

View File

@@ -1,52 +0,0 @@
name: Feature request
description: Suggest an enhancement or new feature
type: Enhancement
body:
- type: markdown
attributes:
value: |
## Feature Request
Thank you for suggesting an enhancement to pipecat.
- type: textarea
id: problem
attributes:
label: Problem Statement
description: A clear description of the problem this feature would solve.
placeholder: I'm always frustrated when...
validations:
required: true
- type: textarea
id: solution
attributes:
label: Proposed Solution
description: A clear and concise description of what you want to happen.
validations:
required: true
- type: textarea
id: alternatives
attributes:
label: Alternative Solutions
description: Any alternative solutions or features you've considered.
validations:
required: false
- type: textarea
id: context
attributes:
label: Additional Context
description: Add any other context, mockups, or screenshots about the feature request here.
placeholder: You can drag and drop images here to include them.
validations:
required: false
- type: checkboxes
id: contribution
attributes:
label: Would you be willing to help implement this feature?
options:
- label: Yes, I'd like to contribute
- label: No, I'm just suggesting

View File

@@ -1,82 +0,0 @@
name: Service Issue
description: An issue with a third-party service
type: Service Issue
body:
- type: markdown
attributes:
value: |
## Service Issue
Use this form to report an issue with a third-party service integration.
- type: input
id: pipecat-version
attributes:
label: pipecat version
description: Which version are you using?
placeholder: e.g., 0.0.63
validations:
required: true
- type: input
id: service-name
attributes:
label: Service Name
description: Which third-party service is having issues?
placeholder: e.g., OpenAI, ElevenLabs, Anthropic
validations:
required: true
- type: input
id: service-version
attributes:
label: Service or model version
description: Which version of the service API or model are you using?
placeholder: e.g., v1, gpt-4.1
validations:
required: false
- type: textarea
id: description
attributes:
label: Issue Description
description: Provide a clear description of the service issue.
validations:
required: true
- type: textarea
id: reproduction
attributes:
label: Reproduction Steps
description: Provide steps to reproduce the issue.
placeholder: |
1. Configure service X
2. Call method Y
3. See error Z
validations:
required: true
- type: textarea
id: expected
attributes:
label: Expected Behavior
description: What did you expect to happen?
validations:
required: true
- type: textarea
id: actual
attributes:
label: Actual Behavior
description: What actually happened?
validations:
required: true
- type: textarea
id: logs
attributes:
label: Error Logs
description: If available, include any error messages or logs.
render: shell
validations:
required: false

View File

@@ -1,56 +0,0 @@
name: New Service
description: Request to support a new third-party service
type: New Service
body:
- type: markdown
attributes:
value: |
## New Service Request
Use this form to request support for a new third-party service in pipecat.
- type: input
id: service-name
attributes:
label: Service Name
description: What is the name of the third-party service?
placeholder: e.g., NewAPI, SomeService
validations:
required: true
- type: input
id: service-website
attributes:
label: Service Website
description: Link to the service's website or documentation
placeholder: e.g., https://newapi.com
validations:
required: true
- type: textarea
id: service-description
attributes:
label: Service Description
description: Briefly describe what this service does and how it works.
validations:
required: true
- type: textarea
id: api-info
attributes:
label: API Information
description: If available, provide details about the service's API.
placeholder: |
- API documentation link
- Authentication method
- Key endpoints you'd like supported
validations:
required: false
- type: checkboxes
id: contribution
attributes:
label: Would you be willing to help implement this service?
options:
- label: Yes, I'd like to contribute
- label: No, I'm just suggesting

View File

@@ -1,74 +0,0 @@
name: Dependency Issue
description: An issue with a Pipecat dependency (not a third-party service)
type: Dependency Issue
body:
- type: markdown
attributes:
value: |
## Dependency Issue
Use this form to report an issue with a Pipecat dependency.
- type: input
id: pipecat-version
attributes:
label: pipecat version
description: Which version are you using?
placeholder: e.g., 0.0.63
validations:
required: true
- type: input
id: dependency-name
attributes:
label: Dependency Name
description: Which Pipecat dependency is causing the issue?
placeholder: e.g., openai, anthropic, fastapi
validations:
required: true
- type: input
id: dependency-version
attributes:
label: Dependency Version
description: Which version of the dependency are you using?
placeholder: e.g., 1.2.3
validations:
required: true
- type: textarea
id: description
attributes:
label: Issue Description
description: Provide a clear description of the dependency issue.
validations:
required: true
- type: textarea
id: impact
attributes:
label: Impact
description: How is this dependency issue affecting your usage of pipecat?
validations:
required: true
- type: textarea
id: reproduction
attributes:
label: Reproduction Steps
description: If applicable, provide steps to reproduce the issue.
placeholder: |
1. Install dependency X
2. Run command Y
3. See error Z
validations:
required: false
- type: textarea
id: logs
attributes:
label: Error Logs
description: If applicable, include any relevant error messages or logs.
render: shell
validations:
required: false

View File

@@ -1,70 +0,0 @@
name: Troubleshooting
description: Help with a specific use case
type: Troubleshooting
body:
- type: markdown
attributes:
value: |
## Troubleshooting Request
Use this form to get help with a specific use case or implementation.
- type: input
id: pipecat-version
attributes:
label: pipecat version
description: Which version are you using?
placeholder: e.g., 0.0.63
validations:
required: true
- type: input
id: python-version
attributes:
label: Python version
description: Which version of Python are you using?
placeholder: e.g., 3.12.8
validations:
required: true
- type: input
id: os
attributes:
label: Operating System
description: Which OS are you using?
placeholder: e.g., Ubuntu 24.04, Windows 11, macOS 12.5
validations:
required: true
- type: textarea
id: use-case
attributes:
label: Use Case Description
description: Describe what you're trying to accomplish with pipecat.
validations:
required: true
- type: textarea
id: current-approach
attributes:
label: Current Approach
description: What have you tried so far? Include code snippets if relevant.
render: python
validations:
required: true
- type: textarea
id: errors
attributes:
label: Errors or Unexpected Behavior
description: Describe any errors or unexpected behavior you're encountering.
validations:
required: true
- type: textarea
id: additional-context
attributes:
label: Additional Context
description: Any other information that might help us understand your situation.
validations:
required: false

View File

@@ -1 +0,0 @@
blank_issues_enabled: false

View File

@@ -1 +0,0 @@
#### Please describe the changes in your PR. If it is addressing an issue, please reference that as well.

View File

@@ -1,40 +0,0 @@
name: build
on:
workflow_dispatch:
push:
branches:
- main
pull_request:
branches:
- "**"
paths-ignore:
- "docs/**"
concurrency:
group: build-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true
jobs:
build:
name: "Build and Install"
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install uv
uses: astral-sh/setup-uv@v3
with:
version: "latest"
- name: Set up Python
run: uv python install 3.10
- name: Install development dependencies
run: uv sync --group dev
- name: Build project
run: uv build
- name: Install project in editable mode
run: uv pip install --editable .

View File

@@ -1,47 +0,0 @@
name: coverage
on:
workflow_dispatch:
push:
branches:
- main
pull_request:
branches:
- "**"
paths-ignore:
- "docs/**"
jobs:
coverage:
name: "Coverage"
runs-on: ubuntu-latest
steps:
- name: Checkout repo
uses: actions/checkout@v4
- name: Install uv
uses: astral-sh/setup-uv@v3
with:
version: "latest"
- name: Set up Python
run: uv python install 3.12
- name: Install system packages
run: |
sudo apt-get install -y portaudio19-dev
- name: Install dependencies
run: |
uv sync --group dev --extra anthropic --extra aws --extra google --extra langchain
- name: Run tests with coverage
run: |
uv run coverage run
uv run coverage xml
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v5
with:
token: ${{ secrets.CODECOV_TOKEN }}
slug: pipecat-ai/pipecat

View File

@@ -1,43 +0,0 @@
name: format
on:
workflow_dispatch:
push:
branches:
- main
pull_request:
branches:
- "**"
paths-ignore:
- "docs/**"
concurrency:
group: build-format-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true
jobs:
ruff-format:
name: "Code quality checks"
runs-on: ubuntu-latest
steps:
- name: Checkout repo
uses: actions/checkout@v4
- name: Install uv
uses: astral-sh/setup-uv@v3
with:
version: "latest"
- name: Set up Python
run: uv python install 3.10
- name: Install development dependencies
run: uv sync --group dev
- name: Ruff formatter
id: ruff-format
run: uv run ruff format --diff
- name: Ruff linter (all rules)
id: ruff-check
run: uv run ruff check

44
.github/workflows/lint.yaml vendored Normal file
View File

@@ -0,0 +1,44 @@
name: lint
on:
workflow_dispatch:
push:
branches:
- main
pull_request:
branches:
- "**"
paths-ignore:
- "docs/**"
concurrency:
group: build-lint-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true
jobs:
autopep8:
name: "Formatting lints"
runs-on: ubuntu-latest
steps:
- name: Checkout repo
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Setup virtual environment
run: |
python -m venv .venv
- name: Install basic Python dependencies
run: |
source .venv/bin/activate
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: autopep8
id: autopep8
run: |
source .venv/bin/activate
autopep8 --max-line-length 100 --exit-code -r -d --exclude "*_pb2.py" -a -a src/
- name: Fail if autopep8 requires changes
if: steps.autopep8.outputs.exit-code == 2
run: exit 1

View File

@@ -1,78 +0,0 @@
name: publish
on:
workflow_dispatch:
inputs:
gitref:
type: string
description: 'what git tag to build (e.g. v0.0.74)'
required: true
jobs:
build:
name: 'Build and upload wheels'
runs-on: ubuntu-latest
steps:
- name: Checkout repo
uses: actions/checkout@v4
with:
ref: ${{ github.event.inputs.gitref }}
- name: Install uv
uses: astral-sh/setup-uv@v3
with:
version: 'latest'
- name: Set up Python
run: uv python install 3.12
- name: Install development dependencies
run: uv sync --group dev
- name: Build project
run: uv build
- name: Upload wheels
uses: actions/upload-artifact@v4
with:
name: wheels
path: ./dist
publish-to-pypi:
name: 'Publish to PyPI'
runs-on: ubuntu-latest
needs: [build]
environment:
name: pypi
url: https://pypi.org/p/pipecat-ai
permissions:
id-token: write
steps:
- name: Download wheels
uses: actions/download-artifact@v4
with:
name: wheels
path: ./dist
- name: Publish to PyPI
uses: pypa/gh-action-pypi-publish@release/v1
with:
verbose: true
print-hash: true
publish-to-test-pypi:
name: 'Publish to Test PyPI'
runs-on: ubuntu-latest
needs: [build]
environment:
name: testpypi
url: https://test.pypi.org/p/pipecat-ai
permissions:
id-token: write
steps:
- name: Download wheels
uses: actions/download-artifact@v4
with:
name: wheels
path: ./dist
- name: Publish to Test PyPI
uses: pypa/gh-action-pypi-publish@release/v1
with:
verbose: true
print-hash: true
repository-url: https://test.pypi.org/legacy/

View File

@@ -1,51 +0,0 @@
name: publish-test
on: workflow_dispatch
jobs:
build:
name: 'Build and upload wheels'
runs-on: ubuntu-latest
steps:
- name: Checkout repo
uses: actions/checkout@v4
with:
fetch-tags: true
fetch-depth: 100
- name: Install uv
uses: astral-sh/setup-uv@v3
with:
version: 'latest'
- name: Set up Python
run: uv python install 3.12
- name: Install development dependencies
run: uv sync --group dev
- name: Build project
run: uv build
- name: Upload wheels
uses: actions/upload-artifact@v4
with:
name: wheels
path: ./dist
publish-to-test-pypi:
name: 'Publish to Test PyPI'
runs-on: ubuntu-latest
needs: [build]
environment:
name: testpypi
url: https://test.pypi.org/p/pipecat-ai
permissions:
id-token: write
steps:
- name: Download wheels
uses: actions/download-artifact@v4
with:
name: wheels
path: ./dist
- name: Publish to Test PyPI
uses: pypa/gh-action-pypi-publish@release/v1
with:
verbose: true
print-hash: true
repository-url: https://test.pypi.org/legacy/

View File

@@ -1,61 +0,0 @@
name: Python Compatibility Test
on:
push:
branches: [main, develop]
paths: ['pyproject.toml']
pull_request:
branches: [main, develop]
paths: ['pyproject.toml']
jobs:
test-compatibility:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
python-version: ['3.10.18', '3.11.13', '3.12.11', '3.13.5']
name: Python ${{ matrix.python-version }}
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Install system dependencies
run: |
sudo apt-get update
sudo apt-get install -y \
portaudio19-dev \
libcairo2-dev \
libgirepository1.0-dev \
pkg-config
- name: Install uv
uses: astral-sh/setup-uv@v4
with:
version: 'latest'
- name: Set up Python ${{ matrix.python-version }}
run: |
uv python install ${{ matrix.python-version }}
uv python pin ${{ matrix.python-version }}
- name: Test uv sync with all extras (Python < 3.13)
if: "!startsWith(matrix.python-version, '3.13.')"
run: |
uv sync --group dev --all-extras --no-extra krisp
- name: Test uv sync without PyTorch extras (Python 3.13+)
if: startsWith(matrix.python-version, '3.13.')
run: |
uv sync --group dev --all-extras \
--no-extra krisp \
--no-extra ultravox \
--no-extra local-smart-turn \
--no-extra moondream \
--no-extra mlx-whisper
- name: Verify installation
run: |
uv run python --version
uv run python -c "import pipecat; print('✅ Pipecat imports successfully')"

View File

@@ -1,51 +0,0 @@
name: Sync Quickstart to pipecat-quickstart repo
on:
push:
branches: [main]
paths:
- 'examples/quickstart/**'
workflow_dispatch: # Manual trigger
jobs:
sync-quickstart:
runs-on: ubuntu-latest
steps:
- name: Checkout main repo
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Checkout quickstart repo
uses: actions/checkout@v4
with:
repository: pipecat-ai/pipecat-quickstart
token: ${{ secrets.QUICKSTART_SYNC_TOKEN }}
path: quickstart-repo
- name: Sync files (excluding uv.lock and README.md)
run: |
# Copy all files except uv.lock and README.md
find examples/quickstart -type f \
-not -name "README.md" \
-not -name "uv.lock" \
-exec cp {} quickstart-repo/ \;
- name: Commit and push changes
run: |
cd quickstart-repo
git config user.name "GitHub Action"
git config user.email "action@github.com"
git add .
# Only commit if there are changes
if ! git diff --staged --quiet; then
git commit -m "Sync from pipecat main repo
Updated files from examples/quickstart/
Commit: ${{ github.sha }}
"
git push
else
echo "No changes to sync"
fi

View File

@@ -1,4 +1,4 @@
name: tests
name: test
on:
workflow_dispatch:
@@ -20,25 +20,39 @@ jobs:
name: "Unit and Integration Tests"
runs-on: ubuntu-latest
steps:
- name: Checkout repo
uses: actions/checkout@v4
- name: Install uv
uses: astral-sh/setup-uv@v3
with:
version: "latest"
- uses: actions/checkout@v4
- name: Set up Python
run: uv python install 3.12
id: setup_python
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Cache virtual environment
uses: actions/cache@v3
with:
# TODO: we are hashing requirements.txt but that doesn't contain all
# our dependencies pinned.
key: venv-${{ runner.os }}-${{ steps.setup_python.outputs.python-version}}-${{ hashFiles('requirements.txt') }}
path: .venv
- name: Install system packages
run: sudo apt-get install -y portaudio19-dev
- name: Setup virtual environment
run: |
sudo apt-get install -y portaudio19-dev
- name: Install dependencies
python -m venv .venv
- name: Install basic Python dependencies
run: |
uv sync --group dev --extra anthropic --extra aws --extra google --extra langchain
source .venv/bin/activate
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Build project
run: |
source .venv/bin/activate
python -m build
- name: Install project and other Python dependencies
run: |
source .venv/bin/activate
pip install --editable .
- name: Test with pytest
run: |
uv run pytest
source .venv/bin/activate
pip install pytest
pytest --doctest-modules --ignore-glob="*to_be_updated*" src tests

28
.gitignore vendored
View File

@@ -3,11 +3,9 @@ env/
__pycache__/
*~
venv
.venv
/.idea
#*#
# Distribution / Packaging
# Distribution / packaging
.Python
build/
develop-eggs/
@@ -28,27 +26,3 @@ share/python-wheels/
MANIFEST
.DS_Store
.env
fly.toml
# Examples
examples/**/node_modules/
examples/**/.expo/
examples/**/dist/
examples/**/npm-debug.*
examples/**/*.jks
examples/**/*.p8
examples/**/*.p12
examples/**/*.key
examples/**/*.mobileprovision
examples/**/*.orig.*
examples/**/web-build/
# macOS
.DS_Store
# Documentation
docs/api/_build/
docs/api/api
# uv
.python-version

View File

@@ -1,8 +0,0 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.12.1
hooks:
- id: ruff
language_version: python3
args: [--fix]
- id: ruff-format

View File

@@ -1,28 +0,0 @@
version: 2
build:
os: ubuntu-22.04
tools:
python: '3.12'
apt_packages:
- portaudio19-dev
- python3-dev
- libasound2-dev
jobs:
post_install:
- pip install uv
- UV_PROJECT_ENVIRONMENT=$READTHEDOCS_VIRTUALENV_PATH uv sync --group docs --all-extras --no-extra krisp --no-extra gstreamer --no-extra ultravox --no-extra local_smart_turn --no-extra moondream --no-extra riva --no-extra mlx-whisper
sphinx:
configuration: docs/api/conf.py
fail_on_warning: false
search:
ranking:
api/*: 5
getting-started/*: 4
guides/*: 3
submodules:
include: all
recursive: true

File diff suppressed because it is too large Load Diff

View File

@@ -1,62 +0,0 @@
# Changelog
All notable changes to the **&lt;project name&gt;** SDK will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
Please make sure to add your changes to the appropriate categories:
## [Unreleased]
### Added
<!-- for new functionality -->
- n/a
### Changed
<!-- for changed functionality -->
- n/a
### Deprecated
<!-- for soon-to-be removed functionality -->
- n/a
### Removed
<!-- for removed functionality -->
- n/a
### Fixed
<!-- for fixed bugs -->
- n/a
### Performance
<!-- for performance-relevant changes -->
- n/a
### Security
<!-- for security-relevant changes -->
- n/a
### Other
<!-- for everything else -->
- n/a
## [0.1.0] - YYYY-MM-DD
Initial release.

View File

@@ -1,336 +0,0 @@
# Community Integrations Guide
Pipecat welcomes community-maintained integrations! As our ecosystem grows, we've established a process for any developer to create and maintain their own service integrations while ensuring discoverability for the Pipecat community.
## Overview
**What we support:** Community-maintained integrations that live in separate repositories and are maintained by their authors.
**What we don't do:** The Pipecat team does not code review, test, or maintain community integrations. We provide guidance and list approved integrations for discoverability.
**Why this approach:** This allows the community to move quickly while keeping the Pipecat core team focused on maintaining the framework itself.
## Submitting your Integration
To be listed as an official community integration, follow these steps:
### Step 1: Build Your Integration
Create your integration following the patterns and examples shown in the "Integration Patterns and Examples" section below.
### Step 2: Set Up Your Repository
Your repository must contain these components:
- **Source code** - Complete implementation following Pipecat patterns
- **Foundational example** - Single file example showing basic usage (see [Pipecat examples](https://github.com/pipecat-ai/pipecat/tree/main/examples/foundational))
- **README.md** - Must include:
- Introduction and explanation of your integration
- Installation instructions
- Usage instructions with Pipecat Pipeline
- How to run your example
- Pipecat version compatibility (e.g., "Tested with Pipecat v0.0.86")
- Company attribution: If you work for the company providing the service, please mention this in your README. This helps build confidence that the integration will be actively maintained.
- **LICENSE** - Permissive license (BSD-2 like Pipecat, or equivalent open source terms)
- **Code documentation** - Source code with docstrings (we recommend following [Pipecat's docstring conventions](https://github.com/pipecat-ai/pipecat/blob/main/CONTRIBUTING.md#docstring-conventions))
- **Changelog** - Maintain a changelog for version updates
### Step 3: Join Discord
Join our Discord: https://discord.gg/pipecat
### Step 4: Submit for Listing
Submit a pull request to add your integration to our [Community Integrations documentation page](https://docs.pipecat.ai/server/services/community-integrations).
**To submit:**
1. Fork the [Pipecat docs repository](https://github.com/pipecat-ai/docs)
2. Edit the file `server/services/community-integrations.mdx`
3. Add your integration to the appropriate service category table with:
- Service name
- Link to your repository
- Maintainer GitHub username(s)
4. Include a link to your demo video (approx 30-60 seconds) in your PR description showing:
- Core functionality of your integration
- Handling of an interruption (if applicable to service type)
5. Submit your pull request
Once your PR is submitted, post in the `#community-integrations` Discord channel to let us know.
## Integration Patterns and Examples
### STT (Speech-to-Text) Services
#### Websocket-based Services
**Base class:** `STTService`
**Examples:**
- [DeepgramSTTService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/deepgram/stt.py)
- [SpeechmaticsSTTService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/speechmatics/stt.py)
#### File-based Services
**Base class:** `SegmentedSTTService`
**Examples:**
- [RivaSTTService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/riva/stt.py)
- [FalSTTService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/fal/stt.py)
#### Key requirements:
- STT services should push `InterimTranscriptionFrames` and `TranscriptionFrames`
- If confidence values are available, filter for values >50% confidence
### LLM (Large Language Model) Services
#### OpenAI-Compatible Services
**Base class:** `OpenAILLMService`
**Examples:**
- [AzureLLMService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/azure/llm.py)
- [GrokLLMService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/grok/llm.py) - Shows overriding the base class where needed
#### Non-OpenAI Compatible Services
**Requires:** Full implementation
**Examples:**
- [AnthropicLLMService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/anthropic/llm.py)
- [GoogleLLMService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/google/llm.py)
#### Key requirements:
- **Frame sequence:** Output must follow this frame sequence pattern:
- `LLMFullResponseStartFrame` - Signals the start of an LLM response
- `LLMTextFrame` - Contains LLM content, typically streamed as tokens
- `LLMFullResponseEndFrame` - Signals the end of an LLM response
- **Context aggregation:** Implement context aggregation to collect user and assistant content:
- Aggregators come in pairs with a `user()` instance and `assistant()` instance
- Context must adhere to the `LLMContext` universal format
- Aggregators should handle adding messages, function calls, and images to the context
### TTS (Text-to-Speech) Services
#### AudioContextWordTTSService
**Use for:** Websocket-based services supporting word/timestamp alignment
**Example:**
- [CartesiaTTSService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/cartesia/tts.py)
#### InterruptibleTTSService
**Use for:** Websocket-based services without word/timestamp alignment, requiring disconnection on interruption
**Example:**
- [SarvamTTSService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/sarvam/tts.py)
#### WordTTSService
**Use for:** HTTP-based services supporting word/timestamp alignment
**Example:**
- [ElevenLabsHttpTTSService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/elevenlabs/tts.py)
#### TTSService
**Use for:** HTTP-based services without word/timestamp alignment
**Example:**
- [GoogleHttpTTSService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/google/tts.py)
#### Key requirements:
- For websocket services, use asyncio WebSocket implementation (required for v13+ support)
- Handle idle service timeouts with keepalives
- TTSServices push both audio (`TTSRawAudioFrame`) and text (`TTSTextFrame`) frames
### Telephony Serializers
Pipecat supports telephony provider integration using websocket connections to exchange MediaStreams. These services use a FrameSerializer to serialize and deserialize inputs from the FastAPIWebsocketTransport.
**Examples:**
- [Twilio](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/serializers/twilio.py)
- [Telnyx](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/serializers/telnyx.py)
#### Key requirements:
- Include hang-up functionality using the provider's native API, ideally using `aiohttp`
- Support DTMF (dual-tone multi-frequency) events if the provider supports them:
- Deserialize DTMF events from the provider's protocol to `InputDTMFFrame`
- Use `KeypadEntry` enum for valid keypad entries (0-9, \*, #, A-D)
- Handle invalid DTMF digits gracefully by returning `None`
### Image Generation Services
**Base class:** `ImageGenService`
**Examples:**
- [FalImageGenService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/fal/image.py)
- [GoogleImageGenService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/google/image.py)
#### Key requirements:
- Must implement `run_image_gen` method returning an `AsyncGenerator`
### Vision Services
Vision services process images and provide analysis such as descriptions, object detection, or visual question answering.
**Base class:** `VisionService`
**Example:**
- [MoondreamVisionService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/moondream/vision.py)
#### Key requirements:
- Must implement `run_vision` method that takes an `LLMContext` and returns an `AsyncGenerator[Frame, None]`
- The method processes the latest image in the context and yields frames with analysis results
- Typically yields `TextFrame` objects containing descriptions or answers
## Implementation Guidelines
### Naming Conventions
- **STT:** `VendorSTTService`
- **LLM:** `VendorLLMService`
- **TTS:**
- Websocket: `VendorTTSService`
- HTTP: `VendorHttpTTSService`
- **Image:** `VendorImageGenService`
- **Vision:** `VendorVisionService`
- **Telephony:** `VendorFrameSerializer`
### Metrics Support
Enable metrics in your service:
```python
def can_generate_metrics(self) -> bool:
"""Check if this service can generate processing metrics.
Returns:
True, as this service supports metrics.
"""
return True
```
### Dynamic Settings Updates
STT, LLM, and TTS services support `ServiceUpdateSettingsFrame` for dynamic configuration changes. The base STTService has an `_update_settings()` method that handles settings, and the private `_settings` `Dict` is used to store settings and provide access to the subclass.
```python
async def set_language(self, language: Language):
"""Set the recognition language and reconnect.
Args:
language: The language to use for speech recognition.
"""
logger.info(f"Switching STT language to: [{language}]")
self._settings["language"] = language
await self._disconnect()
await self._connect()
```
Note that, in this example, Deepgram requires the websocket connection be disconnected and reconnected to reinitialize the service with the new value. Consider if your service requires reconnection.
### Sample Rate Handling
Sample rates are set via PipelineParams and passed to each frame processor at initialization. The pattern is to _not_ set the sample rate value in the constructor of a given service. Instead, use the `start()` method to initialize sample rates from the frame:
```python
async def start(self, frame: StartFrame):
"""Start the service."""
await super().start(frame)
self._settings["output_format"]["sample_rate"] = self.sample_rate
await self._connect()
```
Note that `self.sample_rate` is a `@property` set in the TTSService base class, which provides access to the private sample rate value obtained from the StartFrame.
### Tracing Decorators
Use Pipecat's tracing decorators:
- **STT:** `@traced_stt` - decorate a function that handles `transcript`, `is_final`, `language` as args
- **LLM:** `@traced_llm` - decorate the `_process_context()` method
- **TTS:** `@traced_tts` - decorate the `run_tts()` method
## Best Practices
### Packaging and Distribution
- Use [uv](https://docs.astral.sh/uv/) for packaging (encouraged)
- Consider releasing to PyPI for easier installation
- Follow semantic versioning principles
- Maintain a changelog
### HTTP Communication
For REST-based communication, use aiohttp. Pipecat includes this as a required dependency, so using it prevents adding an additional dependency to your integration.
### Error Handling
- Wrap API calls in appropriate try/catch blocks
- Handle rate limits and network failures gracefully
- Provide meaningful error messages
- When errors occur, raise exceptions AND push `ErrorFrame`s to notify the pipeline:
```python
from pipecat.frames.frames import ErrorFrame
try:
# Your API call
result = await self._make_api_call()
except Exception as e:
# Push error frame to pipeline
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
# Raise or handle as appropriate
raise
```
### Testing
- Your foundational example serves as a valuable integration-level test
- Unit tests are nice to have. As the Pipecat teams provides better guidance, we will encourage unit testing more
## Disclaimer
Community integrations are community-maintained and not officially supported by the Pipecat team. Users should evaluate these integrations independently. The Pipecat team reserves the right to remove listings that become unmaintained or problematic.
## Staying Up to Date
Pipecat evolves rapidly to support the latest AI technologies and patterns. While we strive to minimize breaking changes, they do occur as the framework matures.
**We strongly recommend:**
- Join our Discord at https://discord.gg/pipecat and monitor the `#announcements` channel for release notifications
- Follow our changelog: https://github.com/pipecat-ai/pipecat/blob/main/CHANGELOG.md
- Test your integration against new Pipecat releases promptly
- Update your README with the last tested Pipecat version
This helps ensure your integration remains compatible and your users have clear expectations about version support.
## Questions?
Join our Discord community at https://discord.gg/pipecat and post in the `#community-integrations` channel for guidance and support.
For additional questions, you can also reach out to us at pipecat-ai@daily.co.

View File

@@ -1,341 +0,0 @@
## Contributing to Pipecat
**Want to add a new service integration?**
We encourage community-maintained integrations! Please see our [Community Integration Guide](COMMUNITY_INTEGRATIONS.md) for the process and requirements.
**Want to contribute to Pipecat core?**
We welcome contributions of all kinds! Your help is appreciated. Follow these steps to get involved:
1. **Fork this repository**: Start by forking the Pipecat Documentation repository to your GitHub account.
2. **Clone the repository**: Clone your forked repository to your local machine.
```bash
git clone https://github.com/your-username/pipecat
```
3. **Create a branch**: For your contribution, create a new branch.
```bash
git checkout -b your-branch-name
```
4. **Make your changes**: Edit or add files as necessary.
5. **Test your changes**: Ensure that your changes look correct and follow the style set in the codebase.
6. **Commit your changes**: Once you're satisfied with your changes, commit them with a meaningful message.
```bash
git commit -m "Description of your changes"
```
7. **Push your changes**: Push your branch to your forked repository.
```bash
git push origin your-branch-name
```
8. **Submit a Pull Request (PR)**: Open a PR from your forked repository to the main branch of this repo.
> Important: Describe the changes you've made clearly!
Our maintainers will review your PR, and once everything is good, your contributions will be merged!
## Dependency Management
This project uses [uv](https://docs.astral.sh/uv/) for dependency management. The `uv.lock` file is committed to ensure reproducible builds.
### Adding or Updating Dependencies
1. Edit `pyproject.toml` to add/update dependencies
2. Run `uv lock` to update the lockfile with new dependency resolution
3. Run `uv sync` to install the updated dependencies locally
4. Always commit both files together:
```bash
git add pyproject.toml uv.lock
git commit -m "feat: add new dependency for feature X"
```
**Important:** Never manually edit `uv.lock`. It's auto-generated by `uv lock`.
## Code Style and Documentation
### Python Code Style
We use Ruff for code linting and formatting. Please ensure your code passes all linting checks before submitting a PR.
### Docstring Conventions
We follow Google-style docstrings with these specific conventions:
**Regular Classes:**
- Class docstring describes the class purpose and key functionality
- `__init__` method has its own docstring with complete `Args:` section documenting all parameters
- All public methods must have docstrings with `Args:` and `Returns:` sections as appropriate
**Dataclasses:**
- Class docstring describes the purpose and documents all fields in a `Parameters:` section
- No `__init__` docstring (auto-generated)
**Properties:**
- Must have docstrings with `Returns:` section
**Abstract Methods:**
- Must have docstrings explaining what subclasses should implement
**`__init__.py` Files:**
- **Skip docstrings** for pure import/re-export modules
- **Add brief docstrings** for top-level packages or those with initialization logic
**Enums:**
- Class docstring describes the enumeration purpose
- Use `Parameters:` section to document each enum value and its meaning
- No `__init__` docstring (Enums don't have custom constructors)
**Code Examples in Docstrings:**
- Use `Examples:` as a section header for multiple examples
- Use descriptive text followed by double colons (`::`) for each example
- **Always include a blank line after the `::"`**
- Indent all code consistently within each block
- Separate multiple examples with blank lines for readability
**Lists and Bullets in Docstrings:**
- Use dashes (`-`) for bullet points, not asterisks (`*`)
- **Add a blank line before bullet lists** when they follow a colon
- Use section headers like "Supported features:" or "Behavior:" before lists
- For complex nested information, consider using paragraph format instead
**Deprecations:**
- Use `warnings.warn()` in code for runtime deprecation warnings
- Add `.. deprecated::` directive in docstrings for documentation visibility
- Include version information and describe current status
- Describe parameters in present tense, use directive to indicate deprecation status
#### Examples:
```python
# Regular class
class MyService(BaseService):
"""Description of what the service does.
Provides detailed explanation of the service's functionality,
key features, and usage patterns.
Supported features:
- Feature one with detailed explanation
- Feature two with additional context
- Feature three for advanced use cases
"""
def __init__(self, param1: str, old_param: str = None, **kwargs):
"""Initialize the service.
Args:
param1: Description of param1.
old_param: Controls legacy behavior.
.. deprecated:: 1.2.0
This parameter no longer has any effect and will be removed in version 2.0.
**kwargs: Additional arguments passed to parent.
"""
if old_param is not None:
import warnings
warnings.warn(
"Parameter 'old_param' is deprecated and will be removed in version 2.0.",
DeprecationWarning,
)
super().__init__(**kwargs)
@property
def sample_rate(self) -> int:
"""Get the current sample rate.
Returns:
The sample rate in Hz.
"""
return self._sample_rate
async def process_data(self, data: str) -> bool:
"""Process the provided data.
Args:
data: The data to process.
Returns:
True if processing succeeded.
"""
pass
# Dataclass with code examples
@dataclass
class MessageFrame:
"""Frame containing messages in OpenAI format.
Supports both simple and content list message formats.
Example::
[
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi there!"}
]
Parameters:
messages: List of messages in OpenAI format.
"""
messages: List[dict]
# Enum class
class Status(Enum):
"""Status codes for processing operations.
Parameters:
PENDING: Operation is queued but not started.
RUNNING: Operation is currently in progress.
COMPLETED: Operation finished successfully.
FAILED: Operation encountered an error.
"""
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
```
# Contributor Covenant Code of Conduct
## Our Pledge
We as members, contributors, and leaders pledge to make participation in our
community a harassment-free experience for everyone, regardless of age, body
size, visible or invisible disability, ethnicity, sex characteristics, gender
identity and expression, level of experience, education, socio-economic status,
nationality, personal appearance, race, caste, color, religion, or sexual
identity and orientation.
We pledge to act and interact in ways that contribute to an open, welcoming,
diverse, inclusive, and healthy community.
## Our Standards
Examples of behavior that contributes to a positive environment for our
community include:
- Demonstrating empathy and kindness toward other people
- Being respectful of differing opinions, viewpoints, and experiences
- Giving and gracefully accepting constructive feedback
- Accepting responsibility and apologizing to those affected by our mistakes,
and learning from the experience
- Focusing on what is best not just for us as individuals, but for the overall
community
Examples of unacceptable behavior include:
- The use of sexualized language or imagery, and sexual attention or advances of
any kind
- Trolling, insulting or derogatory comments, and personal or political attacks
- Public or private harassment
- Publishing others' private information, such as a physical or email address,
without their explicit permission
- Other conduct which could reasonably be considered inappropriate in a
professional setting
## Enforcement Responsibilities
Community leaders are responsible for clarifying and enforcing our standards of
acceptable behavior and will take appropriate and fair corrective action in
response to any behavior that they deem inappropriate, threatening, offensive,
or harmful.
Community leaders have the right and responsibility to remove, edit, or reject
comments, commits, code, wiki edits, issues, and other contributions that are
not aligned to this Code of Conduct, and will communicate reasons for moderation
decisions when appropriate.
## Scope
This Code of Conduct applies within all community spaces, and also applies when
an individual is officially representing the community in public spaces.
Examples of representing our community include using an official email address,
posting via an official social media account, or acting as an appointed
representative at an online or offline event.
## Enforcement
Instances of abusive, harassing, or otherwise unacceptable behavior may be
reported to the community leaders responsible for enforcement at pipecat-ai@daily.co.
All complaints will be reviewed and investigated promptly and fairly.
All community leaders are obligated to respect the privacy and security of the
reporter of any incident.
## Enforcement Guidelines
Community leaders will follow these Community Impact Guidelines in determining
the consequences for any action they deem in violation of this Code of Conduct:
### 1. Correction
**Community Impact**: Use of inappropriate language or other behavior deemed
unprofessional or unwelcome in the community.
**Consequence**: A private, written warning from community leaders, providing
clarity around the nature of the violation and an explanation of why the
behavior was inappropriate. A public apology may be requested.
### 2. Warning
**Community Impact**: A violation through a single incident or series of
actions.
**Consequence**: A warning with consequences for continued behavior. No
interaction with the people involved, including unsolicited interaction with
those enforcing the Code of Conduct, for a specified period of time. This
includes avoiding interactions in community spaces as well as external channels
like social media. Violating these terms may lead to a temporary or permanent
ban.
### 3. Temporary Ban
**Community Impact**: A serious violation of community standards, including
sustained inappropriate behavior.
**Consequence**: A temporary ban from any sort of interaction or public
communication with the community for a specified period of time. No public or
private interaction with the people involved, including unsolicited interaction
with those enforcing the Code of Conduct, is allowed during this period.
Violating these terms may lead to a permanent ban.
### 4. Permanent Ban
**Community Impact**: Demonstrating a pattern of violation of community
standards, including sustained inappropriate behavior, harassment of an
individual, or aggression toward or disparagement of classes of individuals.
**Consequence**: A permanent ban from any sort of public interaction within the
community.
## Attribution
This Code of Conduct is adapted from the [Contributor Covenant][homepage],
version 2.1, available at
[https://www.contributor-covenant.org/version/2/1/code_of_conduct.html][v2.1].
Community Impact Guidelines were inspired by
[Mozilla's code of conduct enforcement ladder][Mozilla CoC].
For answers to common questions about this code of conduct, see the FAQ at
[https://www.contributor-covenant.org/faq][FAQ]. Translations are available at
[https://www.contributor-covenant.org/translations][translations].
[homepage]: https://www.contributor-covenant.org
[v2.1]: https://www.contributor-covenant.org/version/2/1/code_of_conduct.html
[Mozilla CoC]: https://github.com/mozilla/diversity
[FAQ]: https://www.contributor-covenant.org/faq
[translations]: https://www.contributor-covenant.org/translations

View File

@@ -1,6 +1,6 @@
BSD 2-Clause License
Copyright (c) 20242025, Daily
Copyright (c) 2024, Daily
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

View File

@@ -1,4 +0,0 @@
prune docs
prune examples
prune scripts
prune tests

263
README.md
View File

@@ -1,201 +1,158 @@
<h1><div align="center">
<img alt="pipecat" width="300px" height="auto" src="https://raw.githubusercontent.com/pipecat-ai/pipecat/main/pipecat.png">
</div></h1>
# dailyai — an open source framework for real-time, multi-modal, conversational AI applications
[![PyPI](https://img.shields.io/pypi/v/pipecat-ai)](https://pypi.org/project/pipecat-ai) ![Tests](https://github.com/pipecat-ai/pipecat/actions/workflows/tests.yaml/badge.svg) [![codecov](https://codecov.io/gh/pipecat-ai/pipecat/graph/badge.svg?token=LNVUIVO4Y9)](https://codecov.io/gh/pipecat-ai/pipecat) [![Docs](https://img.shields.io/badge/Documentation-blue)](https://docs.pipecat.ai) [![Discord](https://img.shields.io/discord/1239284677165056021)](https://discord.gg/pipecat) [![Ask DeepWiki](https://deepwiki.com/badge.svg)](https://deepwiki.com/pipecat-ai/pipecat)
[![](https://getmanta.ai/api/badges?text=Manta%20Graph&link=manta)](https://getmanta.ai/pipecat)
Build things like this:
# 🎙️ Pipecat: Real-Time Voice & Multimodal AI Agents
[![AI-powered voice patient intake for healthcare](https://img.youtube.com/vi/lDevgsp9vn0/0.jpg)](https://www.youtube.com/watch?v=lDevgsp9vn0)
**Pipecat** is an open-source Python framework for building real-time voice and multimodal conversational agents. Orchestrate audio and video, AI services, different transports, and conversation pipelines effortlessly—so you can focus on what makes your agent unique.
> Want to dive right in? Try the [quickstart](https://docs.pipecat.ai/getting-started/quickstart).
## 🚀 What You Can Build
- **Voice Assistants** natural, streaming conversations with AI
- **AI Companions** coaches, meeting assistants, characters
- **Multimodal Interfaces** voice, video, images, and more
- **Interactive Storytelling** creative tools with generative media
- **Business Agents** customer intake, support bots, guided flows
- **Complex Dialog Systems** design logic with structured conversations
**`dailyai` started as a toolkit for implementing generative AI voice bots.** Things like personal coaches, meeting assistants, story-telling toys for kids, customer support bots, and snarky social companions.
## 🧠 Why Pipecat?
- **Voice-first**: Integrates speech recognition, text-to-speech, and conversation handling
- **Pluggable**: Supports many AI services and tools
- **Composable Pipelines**: Build complex behavior from modular components
- **Real-Time**: Ultra-low latency interaction with different transports (e.g. WebSockets or WebRTC)
In 2023 a *lot* of us got excited about the possibility of having open-ended conversations with LLMs. It became clear pretty quickly that we were all solving the same [low-level problems](https://www.daily.co/blog/how-to-talk-to-an-llm-with-your-voice/):
- low-latency, reliable audio transport
- echo cancellation
- phrase endpointing (knowing when the bot should respond to human speech)
- interruptibility
- writing clean code to stream data through "pipelines" of speech-to-text, LLM inference, and text-to-speech models
## 🌐 Pipecat Ecosystem
As our applications expanded to include additional things like image generation, function calling, and vision models, we started to think about what a complete framework for these kinds of apps could look like.
### 📱 Client SDKs
Today, `dailyai` is:
Building client applications? You can connect to Pipecat from any platform using our official SDKs:
1. a set of code building blocks for interacting with generative AI services and creating low-latency, interruptible data pipelines that use multiple services
2. transport services that moves audio, video, and events across the Internet
3. implementations of specific generative AI services
<a href="https://docs.pipecat.ai/client/js/introduction">JavaScript</a> | <a href="https://docs.pipecat.ai/client/react/introduction">React</a> | <a href="https://docs.pipecat.ai/client/react-native/introduction">React Native</a> |
<a href="https://docs.pipecat.ai/client/ios/introduction">Swift</a> | <a href="https://docs.pipecat.ai/client/android/introduction">Kotlin</a> | <a href="https://docs.pipecat.ai/client/c++/introduction">C++</a> | <a href="https://github.com/pipecat-ai/pipecat-esp32">ESP32</a>
Currently implemented services:
- Speech-to-text
- Deepgram
- Whisper
- LLMs
- Azure
- OpenAI
- Image generation
- Azure
- Fal
- OpenAI
- Text-to-speech
- Azure
- Deepgram
- ElevenLabs
- Transport
- Daily
- Local (in progress, intended as a quick start example service)
### 🧭 Structured conversations
If you'd like to [implement a service]((https://github.com/daily-co/daily-ai-sdk/tree/main/src/dailyai/services)), we welcome PRs! Our goal is to support lots of services in all of the above categories, plus new categories (like real-time video) as they emerge.
Looking to build structured conversations? Check out [Pipecat Flows](https://github.com/pipecat-ai/pipecat-flows) for managing complex conversational states and transitions.
## Getting started
### 🪄 Beautiful UIs
Today, the easiest way to get started with `dailyai` is to use [Daily](https://www.daily.co/) as your transport service. This toolkit started life as an internal SDK at Daily and millions of minutes of AI conversation have been served using it and its earlier prototype incarnations. (The [transport base class](https://github.com/daily-co/daily-ai-sdk/blob/main/src/dailyai/services/base_transport_service.py) is easy to extend, though, so feel free to submit PRs if you'd like to implement another transport service.)
Want to build beautiful and engaging experiences? Checkout the [Voice UI Kit](https://github.com/pipecat-ai/voice-ui-kit), a collection of components, hooks and templates for building voice AI applications quickly.
```
# install the module
pip install dailyai
### 🛠️ Create and deploy projects
# set up an .env file with API keys
cp dot-env.template .env
```
Create a new project in under a minute with the [Pipecat CLI](https://github.com/pipecat-ai/pipecat-cli). Then use the CLI to monitor and deploy your agent to production.
## Code examples
### 🔍 Debugging
There are two directories of examples:
Looking for help debugging your pipeline and processors? Check out [Whisker](https://github.com/pipecat-ai/whisker), a real-time Pipecat debugger.
- [foundational](https://github.com/daily-co/daily-ai-sdk/tree/main/examples/foundational) — demos that build on each other, introducing one or two concepts at a time
- [starter apps](https://github.com/daily-co/daily-ai-sdk/tree/main/examples/starter-apps) — complete applications that you can use as starting points for development
### 🖥️ Terminal
To run the example below you need to sign up for a [free Daily account](https://dashboard.daily.co/u/signup) and create a Daily room (so you can hear the LLM talking). After that, join the room's URL directly from a browser tab and run:
Love terminal applications? Check out [Tail](https://github.com/pipecat-ai/tail), a terminal dashboard for Pipecat.
```
python examples/foundational/02-llm-say-one-thing.py
```
### 📺️ Pipecat TV Channel
## Hacking on the framework itself
Catch new features, interviews, and how-tos on our [Pipecat TV](https://www.youtube.com/playlist?list=PLzU2zoMTQIHjqC3v4q2XVSR3hGSzwKFwH) channel.
_Note that you may need to set up a virtual environment before following the instructions below. For instance, you might need to run the following from the root of the repo:_
## 🎬 See it in action
```
python3 -m venv env
source env/bin/activate
```
<p float="left">
<a href="https://github.com/pipecat-ai/pipecat-examples/tree/main/simple-chatbot"><img src="https://raw.githubusercontent.com/pipecat-ai/pipecat-examples/main/simple-chatbot/image.png" width="400" /></a>&nbsp;
<a href="https://github.com/pipecat-ai/pipecat-examples/tree/main/storytelling-chatbot"><img src="https://raw.githubusercontent.com/pipecat-ai/pipecat-examples/main/storytelling-chatbot/image.png" width="400" /></a>
<br/>
<a href="https://github.com/pipecat-ai/pipecat-examples/tree/main/translation-chatbot"><img src="https://raw.githubusercontent.com/pipecat-ai/pipecat-examples/main/translation-chatbot/image.png" width="400" /></a>&nbsp;
<a href="https://github.com/pipecat-ai/pipecat/blob/main/examples/foundational/12-describe-video.py"><img src="https://github.com/pipecat-ai/pipecat/blob/main/examples/foundational/assets/moondream.png" width="400" /></a>
</p>
From the root of this repo, run the following:
## 🧩 Available services
```
pip install -r requirements.txt
python -m build
```
| Category | Services |
| ------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/server/services/stt/aws), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Cartesia](https://docs.pipecat.ai/server/services/stt/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/stt/elevenlabs), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [NVIDIA Riva](https://docs.pipecat.ai/server/services/stt/riva), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [SambaNova (Whisper)](https://docs.pipecat.ai/server/services/stt/sambanova), [Sarvam](https://docs.pipecat.ai/server/services/stt/sarvam), [Soniox](https://docs.pipecat.ai/server/services/stt/soniox), [Speechmatics](https://docs.pipecat.ai/server/services/stt/speechmatics), [Ultravox](https://docs.pipecat.ai/server/services/stt/ultravox), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) |
| LLMs | [Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [AWS](https://docs.pipecat.ai/server/services/llm/aws), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [Mistral](https://docs.pipecat.ai/server/services/llm/mistral), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [OpenRouter](https://docs.pipecat.ai/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/server/services/llm/perplexity), [Qwen](https://docs.pipecat.ai/server/services/llm/qwen), [SambaNova](https://docs.pipecat.ai/server/services/llm/sambanova) [Together AI](https://docs.pipecat.ai/server/services/llm/together) |
| Text-to-Speech | [Async](https://docs.pipecat.ai/server/services/tts/asyncai), [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [Groq](https://docs.pipecat.ai/server/services/tts/groq), [Hume](https://docs.pipecat.ai/server/services/tts/hume), [Inworld](https://docs.pipecat.ai/server/services/tts/inworld), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [MiniMax](https://docs.pipecat.ai/server/services/tts/minimax), [Neuphonic](https://docs.pipecat.ai/server/services/tts/neuphonic), [NVIDIA Riva](https://docs.pipecat.ai/server/services/tts/riva), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [Piper](https://docs.pipecat.ai/server/services/tts/piper), [PlayHT](https://docs.pipecat.ai/server/services/tts/playht), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [Sarvam](https://docs.pipecat.ai/server/services/tts/sarvam), [Speechmatics](https://docs.pipecat.ai/server/services/tts/speechmatics), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) |
| Speech-to-Speech | [AWS Nova Sonic](https://docs.pipecat.ai/server/services/s2s/aws), [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [OpenAI Realtime](https://docs.pipecat.ai/server/services/s2s/openai) |
| Transport | [Daily (WebRTC)](https://docs.pipecat.ai/server/services/transport/daily), [FastAPI Websocket](https://docs.pipecat.ai/server/services/transport/fastapi-websocket), [SmallWebRTCTransport](https://docs.pipecat.ai/server/services/transport/small-webrtc), [WebSocket Server](https://docs.pipecat.ai/server/services/transport/websocket-server), Local |
| Serializers | [Plivo](https://docs.pipecat.ai/server/utilities/serializers/plivo), [Twilio](https://docs.pipecat.ai/server/utilities/serializers/twilio), [Telnyx](https://docs.pipecat.ai/server/utilities/serializers/telnyx) |
| Video | [HeyGen](https://docs.pipecat.ai/server/services/video/heygen), [Tavus](https://docs.pipecat.ai/server/services/video/tavus), [Simli](https://docs.pipecat.ai/server/services/video/simli) |
| Memory | [mem0](https://docs.pipecat.ai/server/services/memory/mem0) |
| Vision & Image | [fal](https://docs.pipecat.ai/server/services/image-generation/fal), [Google Imagen](https://docs.pipecat.ai/server/services/image-generation/fal), [Moondream](https://docs.pipecat.ai/server/services/vision/moondream) |
| Audio Processing | [Silero VAD](https://docs.pipecat.ai/server/utilities/audio/silero-vad-analyzer), [Krisp](https://docs.pipecat.ai/server/utilities/audio/krisp-filter), [Koala](https://docs.pipecat.ai/server/utilities/audio/koala-filter), [ai-coustics](https://docs.pipecat.ai/server/utilities/audio/aic-filter) |
| Analytics & Metrics | [OpenTelemetry](https://docs.pipecat.ai/server/utilities/opentelemetry), [Sentry](https://docs.pipecat.ai/server/services/analytics/sentry) |
This builds the package. To use the package locally (eg to run sample files), run
📚 [View full services documentation →](https://docs.pipecat.ai/server/services/supported-services)
```
pip install --editable .
```
## ⚡ Getting started
If you want to use this package from another directory, you can run:
You can get started with Pipecat running on your local machine, then move your agent processes to the cloud when you're ready.
1. Install uv
```bash
curl -LsSf https://astral.sh/uv/install.sh | sh
```
> **Need help?** Refer to the [uv install documentation](https://docs.astral.sh/uv/getting-started/installation/).
2. Install the module
```bash
# For new projects
uv init my-pipecat-app
cd my-pipecat-app
uv add pipecat-ai
# Or for existing projects
uv add pipecat-ai
```
3. Set up your environment
```bash
cp env.example .env
```
4. To keep things lightweight, only the core framework is included by default. If you need support for third-party AI services, you can add the necessary dependencies with:
```bash
uv add "pipecat-ai[option,...]"
```
> **Using pip?** You can still use `pip install pipecat-ai` and `pip install "pipecat-ai[option,...]"` to get set up.
## 🧪 Code examples
- [Foundational](https://github.com/pipecat-ai/pipecat/tree/main/examples/foundational) — small snippets that build on each other, introducing one or two concepts at a time
- [Example apps](https://github.com/pipecat-ai/pipecat-examples) — complete applications that you can use as starting points for development
## 🛠️ Contributing to the framework
### Prerequisites
**Minimum Python Version:** 3.10
**Recommended Python Version:** 3.12
### Setup Steps
1. Clone the repository and navigate to it:
```bash
git clone https://github.com/pipecat-ai/pipecat.git
cd pipecat
```
2. Install development and testing dependencies:
```bash
uv sync --group dev --all-extras \
--no-extra gstreamer \
--no-extra krisp \
--no-extra local \
--no-extra ultravox # (ultravox not fully supported on macOS)
```
3. Install the git pre-commit hooks:
```bash
uv run pre-commit install
```
> **Note**: Some extras (local, gstreamer) require system dependencies. See documentation if you encounter build errors.
```
pip install path_to_this_repo
```
### Running tests
To run all tests, from the root directory:
To run tests you need to install `pytest`:
```bash
uv run pytest
```
pip install pytest
```
Run a specific test suite:
Then, from the root directory, run:
```bash
uv run pytest tests/test_name.py
```
pytest --doctest-modules --ignore-glob="*to_be_updated*" src tests
```
## 🤝 Contributing
## Setting up your editor
We welcome contributions from the community! Whether you're fixing bugs, improving documentation, or adding new features, here's how you can help:
This project uses strict [PEP 8](https://peps.python.org/pep-0008/) formatting.
- **Found a bug?** Open an [issue](https://github.com/pipecat-ai/pipecat/issues)
- **Have a feature idea?** Start a [discussion](https://discord.gg/pipecat)
- **Want to contribute code?** Check our [CONTRIBUTING.md](CONTRIBUTING.md) guide
- **Documentation improvements?** [Docs](https://github.com/pipecat-ai/docs) PRs are always welcome
### Emacs
Before submitting a pull request, please check existing issues and PRs to avoid duplicates.
You can use [use-package](https://github.com/jwiegley/use-package) to install [py-autopep8](https://codeberg.org/ideasman42/emacs-py-autopep8) package and configure `autopep8` arguments:
We aim to review all contributions promptly and provide constructive feedback to help get your changes merged.
```elisp
(use-package py-autopep8
:ensure t
:defer t
:hook ((python-mode . py-autopep8-mode))
:config
(setq py-autopep8-options '("-a" "-a", "--max-line-length=100")))
```
## 🛟 Getting help
`autopep8` was installed in the `venv` environment described before, so you should be able to use [pyvenv-auto](https://github.com/ryotaro612/pyvenv-auto) to automatically load that environment inside Emacs.
➡️ [Join our Discord](https://discord.gg/pipecat)
```elisp
(use-package pyvenv-auto
:ensure t
:defer t
:hook ((python-mode . pyvenv-auto-run)))
➡️ [Read the docs](https://docs.pipecat.ai)
```
➡️ [Reach us on X](https://x.com/pipecat_ai)
### Visual Studio Code
Install the
[autopep8](https://marketplace.visualstudio.com/items?itemName=ms-python.autopep8) extension. Then edit the user settings (_Ctrl-Shift-P_ `Open User Settings (JSON)`) and set it as the default Python formatter, enable formatting on save and configure `autopep8` arguments:
```json
"[python]": {
"editor.defaultFormatter": "ms-python.autopep8",
"editor.formatOnSave": true
},
"autopep8.args": [
"-a",
"-a",
"--max-line-length=100"
],
```

View File

@@ -1,5 +0,0 @@
# Security Policy
## Reporting a Vulnerability
Please email `disclosures@daily.co`.

View File

@@ -1,11 +0,0 @@
coverage:
range: 50..90 # coverage lower than 50 is red, higher than 90 green, between color code
status:
project:
default:
target: auto # auto % coverage target
threshold: 5% # allow for 5% reduction of coverage without failing
# do not run coverage on patch nor changes
patch: false

17
docs/README.md Normal file
View File

@@ -0,0 +1,17 @@
# Daily AI SDK Docs
## [Architecture Overview](architecture.md)
Learn about the thinking behind the SDK's design.
## [A Frame's Progress](frame-progress.md)
See how a Frame is processed through a Transport, a Pipeline, and a series of Frame Processors.
## [Example Code](examples/)
The repo includes several example apps in the `examples` directory. The docs explain how they work.
## [API Reference](api/)
Complete documentation of the available classes and methods in the SDK.

View File

@@ -1,103 +0,0 @@
# TurnAwareTranscriptProcessor Example
## Overview
The `TurnAwareTranscriptProcessor` combines user and assistant transcript tracking with turn boundary detection. It correctly handles interruptions by only capturing what was actually spoken.
## Basic Usage
```python
from pipecat.processors.transcript_processor import TurnAwareTranscriptProcessor
# Create the processor
turn_processor = TurnAwareTranscriptProcessor()
# Register event handlers
@turn_processor.event_handler("on_turn_started")
async def handle_turn_started(processor, turn_number):
print(f"Turn {turn_number} started")
@turn_processor.event_handler("on_turn_ended")
async def handle_turn_ended(processor, turn_number, user_text, assistant_text, was_interrupted):
print(f"\nTurn {turn_number} ended:")
print(f" User said: {user_text}")
print(f" Assistant said: {assistant_text}")
print(f" Was interrupted: {was_interrupted}")
@turn_processor.event_handler("on_transcript_update")
async def handle_transcript_update(processor, frame):
for msg in frame.messages:
print(f"[{msg.role}]: {msg.content}")
# Add to pipeline
pipeline = Pipeline([
transport.input(),
stt,
turn_processor, # Process transcripts and track turns
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
])
```
## Features
1. **Turn Boundary Detection**: Automatically detects when turns start and end based on user and bot speaking patterns
2. **Interruption Handling**: Correctly captures only what was actually spoken when interruptions occur
3. **Real-time Transcripts**: Emits transcript messages for both user and assistant speech
4. **Turn Events**: Provides start/end events with accumulated transcripts for each turn
## Events
### on_turn_started
Emitted when a new turn begins (user starts speaking).
**Handler signature**: `async def handler(processor, turn_number)`
### on_turn_ended
Emitted when a turn ends with accumulated transcripts.
**Handler signature**: `async def handler(processor, turn_number, user_transcript, assistant_transcript, was_interrupted)`
### on_transcript_update
Inherited from `BaseTranscriptProcessor`, emitted for individual transcript messages.
**Handler signature**: `async def handler(processor, frame)`
## Turn Logic
- Turns start when the user begins speaking (`UserStartedSpeakingFrame`)
- Turns end when:
- The user starts speaking again (previous turn ends, new turn starts)
- The bot is interrupted (`InterruptionFrame`)
- The pipeline ends (`EndFrame`/`CancelFrame`)
## Integration with OpenTelemetry
You can use turn events to enrich OpenTelemetry spans:
```python
from pipecat.utils.tracing.turn_trace_observer import TurnTraceObserver
turn_tracker = TurnTrackingObserver()
turn_tracer = TurnTraceObserver(turn_tracker)
turn_processor = TurnAwareTranscriptProcessor()
@turn_processor.event_handler("on_turn_ended")
async def add_transcripts_to_span(processor, turn_number, user_text, assistant_text, interrupted):
# Get current span and add transcript data
from opentelemetry import trace
current_span = trace.get_current_span()
if current_span:
current_span.set_attribute("turn.user_text", user_text)
current_span.set_attribute("turn.assistant_text", assistant_text)
```
## Notes
- The processor handles async frame processing correctly by delaying turn end until frames are processed
- Works with word-level timestamps from TTS services like Cartesia
- Accumulates both user (`TranscriptionFrame`) and assistant (`TTSTextFrame`) speech
- Emits individual transcript messages in addition to turn-level aggregation

View File

@@ -1,20 +0,0 @@
# Minimal makefile for Sphinx documentation
#
# You can set these variables from the command line, and also
# from the environment for the first two.
SPHINXOPTS ?=
SPHINXBUILD ?= sphinx-build
SOURCEDIR = .
BUILDDIR = _build
# Put it first so that "make" without argument is like "make help".
help:
@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
.PHONY: help Makefile
# Catch-all target: route all unknown targets to Sphinx using the new
# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
%: Makefile
@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)

View File

@@ -1,109 +0,0 @@
# Pipecat Documentation
This directory contains the source files for auto-generating Pipecat's server API reference documentation.
## Setup
1. Install documentation dependencies:
```bash
pip install -r requirements.txt
```
2. Make the build scripts executable:
```bash
chmod +x build-docs.sh rtd-test.py
```
## Building Documentation
From this directory, you can build the documentation in several ways:
### Local Build
```bash
# Using the build script (automatically opens docs when done)
./build-docs.sh
# Or directly with sphinx-build
sphinx-build -b html . _build/html -W --keep-going
```
### ReadTheDocs Test Build
To test the documentation build process exactly as it would run on ReadTheDocs:
```bash
./rtd-test.py
```
This script:
- Creates a fresh virtual environment
- Installs all dependencies as specified in requirements files
- Handles conflicting dependencies (like grpcio versions for Riva and PlayHT)
- Builds the documentation in an isolated environment
- Provides detailed logging of the build process
Use this script to verify your documentation will build correctly on ReadTheDocs before pushing changes.
## Viewing Documentation
The built documentation will be available at `_build/html/index.html`. To open:
```bash
# On MacOS
open _build/html/index.html
# On Linux
xdg-open _build/html/index.html
# On Windows
start _build/html/index.html
```
## Directory Structure
```
.
├── api/ # Auto-generated API documentation
├── _build/ # Built documentation
├── _static/ # Static files (images, css, etc.)
├── conf.py # Sphinx configuration
├── index.rst # Main documentation entry point
├── requirements-base.txt # Base documentation dependencies
├── requirements-riva.txt # Riva-specific dependencies
├── requirements-playht.txt # PlayHT-specific dependencies
├── build-docs.sh # Local build script
└── rtd-test.py # ReadTheDocs test build script
```
## Notes
- Documentation is auto-generated from Python docstrings
- Service modules are automatically detected and included
- The build process matches our ReadTheDocs configuration
- Warnings are treated as errors (-W flag) to maintain consistency
- The --keep-going flag ensures all errors are reported
- Dependencies are split into multiple requirements files to handle version conflicts
## Troubleshooting
If you encounter missing service modules:
1. Verify the service is installed with its extras: `pip install pipecat-ai[service-name]`
2. Check the build logs for import errors
3. Ensure the service module is properly initialized in the package
4. Run `./rtd-test.py` to test in an isolated environment matching ReadTheDocs
For dependency conflicts:
1. Check the requirements files for version specifications
2. Use `rtd-test.py` to verify dependency resolution
3. Consider adding service-specific requirements files if needed
For more information:
- [ReadTheDocs Configuration](.readthedocs.yaml)
- [Sphinx Documentation](https://www.sphinx-doc.org/)

View File

@@ -1,27 +0,0 @@
#!/bin/bash
# Build docs using uv
echo "Installing dependencies with uv..."
uv sync --group docs --all-extras --no-extra krisp --no-extra gstreamer --no-extra ultravox --no-extra local_smart_turn --no-extra moondream --no-extra riva --no-extra mlx-whisper
# Check if sphinx-build is available
if ! uv run sphinx-build --version &> /dev/null; then
echo "Error: sphinx-build is not available" >&2
exit 1
fi
# Clean previous build
rm -rf _build
echo "Building documentation..."
# Build docs matching ReadTheDocs configuration
uv run sphinx-build -b html -d _build/doctrees . _build/html -W --keep-going
if [ $? -eq 0 ]; then
echo "Documentation built successfully!"
# Open docs (MacOS)
open _build/html/index.html
else
echo "Documentation build failed!" >&2
exit 1
fi

View File

@@ -1,211 +0,0 @@
import logging
import os
import sys
from datetime import datetime
from pathlib import Path
# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("sphinx-build")
# Add source directory to path
docs_dir = Path(__file__).parent
project_root = docs_dir.parent.parent
sys.path.insert(0, str(project_root / "src"))
# Project information
project = "pipecat-ai"
current_year = datetime.now().year
copyright = f"2024-{current_year}, Daily" if current_year > 2024 else "2024, Daily"
author = "Daily"
# General configuration
extensions = [
"sphinx.ext.autodoc",
"sphinx.ext.napoleon",
"sphinx.ext.viewcode",
"sphinx.ext.intersphinx",
]
suppress_warnings = [
"autodoc.mocked_object",
"toc.not_included",
]
# Napoleon settings
napoleon_google_docstring = True
napoleon_include_init_with_doc = True
# AutoDoc settings
autodoc_default_options = {
"members": True,
"member-order": "bysource",
"undoc-members": False,
"exclude-members": "__weakref__,model_config",
"show-inheritance": True,
}
# Mock imports for optional dependencies
autodoc_mock_imports = [
# Krisp - has build issues on some platforms
"pipecat_ai_krisp",
"krisp",
"krisp_audio",
# System-specific GUI libraries
"_tkinter",
"tkinter",
# Platform-specific audio libraries (if needed)
"gi",
"gi.require_version",
"gi.repository",
# OpenCV - sometimes has import issues during docs build
"cv2",
# Heavy ML packages excluded from ReadTheDocs
# ultravox dependencies
"vllm",
"vllm.engine.arg_utils",
# local-smart-turn dependencies
"coremltools",
"coremltools.models",
"coremltools.models.MLModel",
"torch",
"torch.nn",
"torch.nn.functional",
"torchaudio",
# moondream dependencies
"transformers",
"transformers.AutoTokenizer",
"transformers.AutoFeatureExtractor",
"AutoFeatureExtractor",
"timm",
"einops",
"intel_extension_for_pytorch",
"huggingface_hub",
# riva dependencies
"riva",
"riva.client",
"riva.client.Auth",
"riva.client.ASRService",
"riva.client.StreamingRecognitionConfig",
"riva.client.RecognitionConfig",
"riva.client.AudioEncoding",
"riva.client.proto.riva_tts_pb2",
"riva.client.SpeechSynthesisService",
# MLX dependencies (Apple Silicon specific)
"mlx",
"mlx_whisper", # Note: might need underscore format too
]
# HTML output settings
html_theme = "sphinx_rtd_theme"
html_static_path = ["_static"] if os.path.exists("_static") else []
autodoc_typehints = "signature" # Show type hints in the signature only, not in the docstring
html_show_sphinx = False
def import_core_modules():
"""Import core pipecat modules for autodoc to discover."""
core_modules = [
"pipecat",
"pipecat.frames",
"pipecat.pipeline",
"pipecat.processors",
"pipecat.services",
"pipecat.transports",
"pipecat.audio",
"pipecat.adapters",
"pipecat.clocks",
"pipecat.metrics",
"pipecat.observers",
"pipecat.runner",
"pipecat.serializers",
"pipecat.sync",
"pipecat.transcriptions",
"pipecat.utils",
]
for module_name in core_modules:
try:
__import__(module_name)
logger.info(f"Successfully imported {module_name}")
except ImportError as e:
logger.warning(f"Failed to import {module_name}: {e}")
def clean_title(title: str) -> str:
"""Automatically clean module titles."""
# Remove everything after space (like 'module', 'processor', etc.)
title = title.split(" ")[0]
# Get the last part of the dot-separated path
parts = title.split(".")
title = parts[-1]
return title
def setup(app):
"""Generate API documentation during Sphinx build."""
from sphinx.ext.apidoc import main
docs_dir = Path(__file__).parent
project_root = docs_dir.parent.parent
output_dir = str(docs_dir / "api")
source_dir = str(project_root / "src" / "pipecat")
# Clean existing files
if Path(output_dir).exists():
import shutil
shutil.rmtree(output_dir)
logger.info(f"Cleaned existing documentation in {output_dir}")
logger.info(f"Generating API documentation...")
logger.info(f"Output directory: {output_dir}")
logger.info(f"Source directory: {source_dir}")
excludes = [
str(project_root / "src/pipecat/pipeline/to_be_updated"),
str(project_root / "src/pipecat/examples"),
str(project_root / "src/pipecat/tests"),
"**/test_*.py",
"**/tests/*.py",
]
try:
main(
[
"-f", # Force overwriting
"-e", # Don't generate empty files
"-M", # Put module documentation before submodule documentation
"--no-toc", # Don't create a table of contents file
"--separate", # Put documentation for each module in its own page
"--module-first", # Module documentation before submodule documentation
"--implicit-namespaces", # Added: Handle implicit namespace packages
"-o",
output_dir,
source_dir,
]
+ excludes
)
logger.info("API documentation generated successfully!")
# Process generated RST files to update titles
for rst_file in Path(output_dir).glob("**/*.rst"): # Changed to recursive glob
content = rst_file.read_text()
lines = content.split("\n")
# Find and clean up the title
if lines and "=" in lines[1]: # Title is typically the first line
old_title = lines[0]
new_title = clean_title(old_title)
content = content.replace(old_title, new_title)
rst_file.write_text(content)
logger.info(f"Updated title: {old_title} -> {new_title}")
except Exception as e:
logger.error(f"Error generating API documentation: {e}", exc_info=True)
import_core_modules()

View File

@@ -1,36 +0,0 @@
Pipecat API Reference
=====================
Welcome to the Pipecat API reference.
Use the navigation on the left to browse modules, or search using the search box.
**New to Pipecat?** Check out the `main documentation <https://docs.pipecat.ai>`_ for tutorials, guides, and client SDK information.
Quick Links
-----------
* `GitHub Repository <https://github.com/pipecat-ai/pipecat>`_
* `Join our Community <https://discord.gg/pipecat>`_
.. toctree::
:maxdepth: 2
:caption: API Reference
:hidden:
Adapters <api/pipecat.adapters>
Audio <api/pipecat.audio>
Clocks <api/pipecat.clocks>
Extensions <api/pipecat.extensions>
Frames <api/pipecat.frames>
Metrics <api/pipecat.metrics>
Observers <api/pipecat.observers>
Pipeline <api/pipecat.pipeline>
Processors <api/pipecat.processors>
Runner <api/pipecat.runner>
Serializers <api/pipecat.serializers>
Services <api/pipecat.services>
Sync <api/pipecat.sync>
Transcriptions <api/pipecat.transcriptions>
Transports <api/pipecat.transports>
Utils <api/pipecat.utils>

View File

@@ -1,35 +0,0 @@
@ECHO OFF
pushd %~dp0
REM Command file for Sphinx documentation
if "%SPHINXBUILD%" == "" (
set SPHINXBUILD=sphinx-build
)
set SOURCEDIR=.
set BUILDDIR=_build
%SPHINXBUILD% >NUL 2>NUL
if errorlevel 9009 (
echo.
echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
echo.installed, then set the SPHINXBUILD environment variable to point
echo.to the full path of the 'sphinx-build' executable. Alternatively you
echo.may add the Sphinx directory to PATH.
echo.
echo.If you don't have Sphinx installed, grab it from
echo.https://www.sphinx-doc.org/
exit /b 1
)
if "%1" == "" goto help
%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
goto end
:help
%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
:end
popd

View File

@@ -1,38 +0,0 @@
#!/bin/bash
set -e
# Configuration
DOCS_DIR=$(pwd)
PROJECT_ROOT=$(cd ../../ && pwd)
TEST_DIR="/tmp/rtd-test-$(date +%Y%m%d_%H%M%S)"
echo "Creating test directory: $TEST_DIR"
mkdir -p "$TEST_DIR"
cd "$TEST_DIR"
# Create virtual environment
python -m venv venv
source venv/bin/activate
echo "Installing build dependencies..."
pip install --upgrade pip wheel setuptools
echo "Installing documentation dependencies..."
pip install -r "$DOCS_DIR/requirements.txt"
echo "Building documentation..."
cd "$DOCS_DIR"
sphinx-build -b html . "_build/html"
echo "Build complete. Check _build/html directory for output."
# Print summary
echo -e "\n=== Build Summary ==="
echo "Documentation: $DOCS_DIR/_build/html"
echo "Test environment: $TEST_DIR"
echo -e "\nTo view the documentation:"
echo "open $DOCS_DIR/_build/html/index.html"
# Print installed packages for verification
echo -e "\n=== Installed Packages ==="
pip freeze | grep -E "sphinx|pipecat"

17
docs/architecture.md Normal file
View File

@@ -0,0 +1,17 @@
# Daily AI SDK Architecture Guide
## Frames
Frames can represent discrete chunks of data, for instance a chunk of text, a chunk of audio, or an image. They can also be used to as control flow, for instance a frame that indicates that there is no more data available, or that a user started or stopped talking. They can also represent more complex data structures, such as a message array used for an LLM completion.
## FrameProcessors
Frame processors operate on frames. Every frame processor implements a `process_frame` method that consumes one frame and produces zero or more frames. Frame processors can do simple transforms, such as concatenating text fragments into sentences, or they can treat frames as input for an AI Service, and emit chat completions based on message arrays or transform text into audio or images.
## Pipelines
Pipelines are lists of frame processors that read from a source queue and send the processed frames to a sink queue. A very simple pipeline might chain an LLM frame processor to a text-to-speech frame processor, with a transport's send queue as its sync. Placing LLM message frames on the pipeline's source queue will cause the LLM's response to be spoken. See example #2 for an implementation of this.
## Transports
Transports provide a receive queue, which is input from "the outside world", and a sink queue, which is data that will be sent "to the outside world". The `LocalTransportService` does this with the local camera, mic, display and speaker. The `DailyTransportService` does this with a WebRTC session joined to a Daily.co room.

View File

@@ -0,0 +1,119 @@
# 01: Say One Thing
_video here - youtube?_
This example uses a text-to-speech (TTS) service to say one predefined sentence. But first, a quick overview of the general structure of these examples.
## Running the demos
All of the demos have something like this at the bottom of the file:
```python
if __name__ == "__main__":
(url, token) = configure()
asyncio.run(main(url, token))
```
### `configure()`
The `configure()` function comes from `examples/foundational/support/runner.py`, and it allows you to configure the examples from the command line directly, or using environment variables:
```bash
python 01-say-one-thing.py -u https://YOUR_DOMAIN.daily.co/YOUR_ROOM -k YOUR_API_KEY
# or
DAILY_ROOM_URL=https://YOUR_DOMAIN.daily.co/YOUR_ROOM DAILY_API_KEY=YOUR_API_KEY python 01-say-one-thing.py
# or set DAILY_ROOM_URL and DAILY_API_KEY in a .env file
python 01-say-one-thing.py
```
You'll need a Daily account to run these demos. You can sign up for free at [daily.co](https://daily.co). Once you've signed up you can create a room from the [Dashboard](https://dashboard.daily.co/rooms), and grab [your API key](https://dashboard.daily.co/developers) while you're there.
Some functionality (such as transcription) requires the bot to have owner privileges in the room. `runner.py` uses the Daily REST API to create a meeting token with owner privileges. You can learn more about meeting tokens in the [Daily docs](https://docs.daily.co/reference/rest-api/meeting-tokens).
### `asyncio.run()`
The AI SDK makes heavy use of Python's `asyncio` module. [This is a reasonable intro to the topic](https://builtin.com/data-science/asyncio) if you haven't worked with `asyncio` and coroutines before.
You can learn a bit more about the specifics of how the Daily AI SDK uses coroutines in the [Architecture Guide](../architecture.md).
## The `main()` function
All of the examples have a `main()` function with a similar structure:
- Configure the transport
- Configure the AI service(s) used in the demo
- Configure any event listeners
- Define a processing pipeline
- Run the example's coroutine(s)
### Configuring the transport
The first section of the `main()` function configures the transport object:
```python
meeting_duration_minutes = 5
transport = DailyTransportService(
room_url,
None,
"Say One Thing",
meeting_duration_minutes,
)
transport.mic_enabled = True
```
The [Architecture Guide](../architecture.md) explains the transport object in more detail. In this case, we're configuring a Daily transport object and enabling the virtual microphone, so our bot can play audio.
### Configuring the services
As described in the [Architecture Guide](../architecture.md), 'a 'Service' is a class that processes 'Frames' as part of a 'Pipeline'. In this demo app, we'll only need one service: a text-to-speech generator. We can create an instance of the `ElevenLabsTTSService` class with this line of code:
```python
tts = ElevenLabsTTSService(aiohttp_session=session, api_key=os.getenv("ELEVENLABS_API_KEY"), voice_id=os.getenv("ELEVENLABS_VOICE_ID"))
```
You'll need to make sure and set those environment variables somewhere. The easiest way to do that is to copy the `example.env` file in the repo and rename it to `.env`, and then add your credentials to that file. `runner.py` loads the `python-dotenv` module and initializes it, making the values in that file available in the environment.
### Configuring event listeners
This part isn't strictly necessary for an app like this. You could include the contents of the `on_participant_joined` function directly in the body of the `main()` function, and it would run as soon as you started the script from the command line.
Instead, we can use an event handler to wait to run that code until someone else joins the meeting. We'll define a function called `greet_user()`, and use the `@transport.event_handler("on_participant_joined")` decorator to tell the SDK that we want to run that function whenever a user joins the room.
```python
@transport.event_handler("on_participant_joined")
async def greet_user(transport, participant):
if participant["info"]["isLocal"]:
return
await tts.say(
"Hello there, " + participant["info"]["userName"] + "!",
transport.send_queue,
)
# wait for the output queue to be empty, then leave the meeting
await transport.stop_when_done()
```
### Defining a processing pipeline
In this example, we don't actually have much of a processing pipeline! In fact, we're doing the whole thing inside the `greet_user()` function already.
Pipelines usually look like a bunch of nested calls to the `run()` or `run_to_queue()` function from different Services. In this example, we're using the `say()` function from the TTS service. This is effectively a convenience wrapper around the `run_to_queue()` function, which we'll discuss more later. It's important to `await` this function to ensure that the speech frames are queued for playback before the next line of code, because of the `stop_when_done()` function being called immediately afterward.
The output of the `say()` function goes to the transport's `send_queue`. This queue is the all-important connection between the world of the Services pipeline that's generating frames asynchronously and the ordered playback of audio and visual media in the WebRTC call.
### Running the coroutines
In this example, we don't actually have any separate processing pipelines—everything happens as a result of an event from the transport. So we only need to run the transport's coroutine, and await its completion:
```python
await transport.run()
```
In future examples, we'll run more processes in parallel. For now, this script can run until the transport exits—which will happen based on calling `stop_when_done()` in the `greet_user()` function.
## Next Steps
Next, we'll start connecting multiple AI services together by building a service pipeline.
## [02 - LLM Say One Thing »](02-llm-say-one-thing.md)

5
docs/examples/README.md Normal file
View File

@@ -0,0 +1,5 @@
# Daily AI SDK Examples
The docs in this folder pair with the example apps located in `examples/foundational`. They are designed to serve as a quick references for building different kinds of AI apps. But the examples also build on one another, so it can be really helpful to walk through them in order.
To start, you can learn about the overall structure of the examples in [01 - Say One Thing](01-say-one-thing.md).

46
docs/frame-progress.md Normal file
View File

@@ -0,0 +1,46 @@
# A Frame's Progress
1. A user says “Hello, LLM” and the cloud transcription service delivers a transcription to the Transport.
![A transcript frame arrives](images/frame-progress-01.png)
2. The Transport places a Transcription frame in the Pipelines source queue.
![Frame in source queue](images/frame-progress-02.png)
3. The Pipeline passes the Transcription frame to the first Frame Processor in its list, the LLM User Message Aggregator.
![To UMA](images/frame-progress-03.png)
4. The LLM User Message Aggregator updates the LLM Context with a `{“user”: “Hello LLM”}` message.
![Update context](images/frame-progress-04.png)
5. The LLM User Message Aggregator yields an LLM Message Frame, containing the updated LLM Context. The Pipeline passes this frame to the LLM Frame Processor.
![Update context](images/frame-progress-05.png)
6. The LLM Frame Processor creates a streaming chat completion based on the LLM context and yields the first chunk of a response, Text Frame with the value “Hi, “. The Pipeline passes this frame to the TTS Frame Processor. The TTS Frame Processor aggregates this response but doesnt yield anything, yet, because its waiting for a full sentence.
![LLM yields Text](images/frame-progress-06.png)
7. The LLM Frame Processor yields another Text Frame with the value “there.”. The Pipeline passes this frame to the TTS Frame Processor.
![LLM yields more Text](images/frame-progress-07.png)
8. The TTS Frame Processor now has a full sentence, so it starts streaming audio based on “Hi, there.” It yields the first chunk of streaming audio as an Audio frame, which the Pipeline passes to the LLM Assistant Message Aggregator.
![TTS yields Audio](images/frame-progress-08.png)
9. The LLM Assistant Message Aggregator doesnt do anything with Audio frames, so it immediately yields the frame, unchanged. This is the convention for all Frame Processors: frames that the processor doesnt process should be immediately yielded.
![pass-through](images/frame-progress-09.png)
10. The Pipeline places the first Audio frame in its sink queue, which is being watched by the Transport. Since the frame is now in a queue, the Pipeline can continue processing other frames. Note that the source and sink queues form a sort of “boundary of concurrent processing” between a Pipeline and the outside world. In a Pipeline, Frames are processed sequentially; once a Frame is on a queue it can be processed in parallel with the frames being processed by the Pipeline. TODO: link to a more in-depth section about this.
![sink queue](images/frame-progress-10.png)
11. The TTS Frame Processor yields another Audio frame as the Transport transmits the first Audio frame.
![parallel audio](images/frame-progress-11.png)
12. As before, the LLM Assistant Message Aggregator immediately yields the Audio frame and the Pipeline places the Audio frame in the sink queue.
![sink queue 2](images/frame-progress-12.png)
13. The TTS Frame Processor has no more frames to yield. The LLM Frame Processor emits an LLM Response End Frame, which the Pipeline passes to the TTS Frame Processor.
![response end](images/frame-progress-13.png)
14. The TTS Frame Processor immediately yields the LLM Response End Frame, so the Pipeline passes it along to the LLM Assistant Message Aggregator. The LLM Assistant Message Aggregator updates the LLM Context with the full response from the LLM. TODO TODO: I realized I forgot that the TSS Frame Processor also yields the Text frames that the LLM emitted so that the LLM Assistant Message Aggregator could accumulate them, arrggh.
![response end](images/frame-progress-14.png)
15. The system is quiet, and waiting for the next message from the Transport.
![response end](images/frame-progress-15.png)

Binary file not shown.

After

Width:  |  Height:  |  Size: 98 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 91 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 92 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 92 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 98 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 94 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 94 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 95 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 94 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 96 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 110 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 102 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 111 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 117 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 98 KiB

5
dot-env.template Normal file
View File

@@ -0,0 +1,5 @@
OPENAI_API_KEY=...
ELEVENLABS_API_KEY=...
ELEVENLABS_VOICE_ID=...
DAILY_API_KEY=...
DAILY_SAMPLE_ROOM_URL=https://...

View File

@@ -1,194 +0,0 @@
# AI-COUSTICS
AICOUSTICS_LICENSE_KEY=...
# Anthropic
ANTHROPIC_API_KEY=...
# Assembly AI
ASSEMBLYAI_API_KEY=...
# Async
ASYNCAI_API_KEY=...
ASYNCAI_VOICE_ID=...
# AWS
AWS_SECRET_ACCESS_KEY=...
AWS_ACCESS_KEY_ID=...
AWS_REGION=...
# Azure
AZURE_SPEECH_REGION=...
AZURE_SPEECH_API_KEY=...
AZURE_CHATGPT_API_KEY=...
AZURE_CHATGPT_ENDPOINT=https://...
AZURE_CHATGPT_MODEL=...
AZURE_REALTIME_API_KEY=...
AZURE_REALTIME_BASE_URL=...
AZURE_DALLE_API_KEY=...
AZURE_DALLE_ENDPOINT=https://...
AZURE_DALLE_MODEL=...
# Cartesia
CARTESIA_API_KEY=...
CARTESIA_VOICE_ID=...
# Cerebras
CEREBRAS_API_KEY=...
# Daily
DAILY_API_KEY=...
DAILY_SAMPLE_ROOM_URL=https://...
# Deepgram
DEEPGRAM_API_KEY=...
SAGEMAKER_ENDPOINT_NAME=...
# DeepSeek
DEEPSEEK_API_KEY=...
# ElevenLabs
ELEVENLABS_API_KEY=...
ELEVENLABS_VOICE_ID=...
# Fal
FAL_KEY=...
# Fireworks
FIREWORKS_API_KEY=...
# Fish Audio
FISH_API_KEY=...
# Gladia
GLADIA_API_KEY=...
GLADIA_REGION=...
# Google
GOOGLE_API_KEY=...
GOOGLE_VERTEX_TEST_CREDENTIALS=...
GOOGLE_CLOUD_PROJECT_ID=...
GOOGLE_CLOUD_LOCATION=...
GOOGLE_TEST_CREDENTIALS=...
# Grok
GROK_API_KEY=...
# Groq
GROQ_API_KEY=...
# Heygen
HEYGEN_API_KEY=...
# Hume
HUME_API_KEY=...
HUME_VOICE_ID=...
# Inworld
INWORLD_API_KEY=...
# Krisp
KRISP_MODEL_PATH=...
# Krisp Viva
KRISP_VIVA_MODEL_PATH=...
# LiveKit
LIVEKIT_API_KEY=...
LIVEKIT_API_SECRET=...
# LMNT
LMNT_API_KEY=...
LMNT_VOICE_ID=...
# MiniMax
MINIMAX_API_KEY=...
MINIMAX_GROUP_ID=...
# Mistral
MISTRAL_API_KEY=...
# Neuphonic
NEUPHONIC_API_KEY=...
# NVIDIA
NVIDIA_API_KEY=...
# OpenAI
OPENAI_API_KEY=...
# OpenPipe
OPENPIPE_API_KEY=...
# OpenRouter
OPENROUTER_API_KEY=...
# Perplexity
PERPLEXITY_API_KEY=...
# Picovoice Koala
KOALA_ACCESS_KEY=...
# Piper
PIPER_BASE_URL=...
# PlayHT
PLAYHT_USER_ID=...
PLAYHT_API_KEY=...
# Plivo
PLIVO_AUTH_ID=...
PLIVO_AUTH_TOKEN=...
# Qwen
QWEN_API_KEY=...
# Rime
RIME_API_KEY=...
RIME_VOICE_ID=...
# SambaNova
SAMBANOVA_API_KEY=...
# Sarvam AI
SARVAM_API_KEY=...
# Sentry
SENTRY_DSN=...
# Simli
SIMLI_API_KEY=...
SIMLI_FACE_ID=...
# Smart turn
LOCAL_SMART_TURN_MODEL_PATH=...
FAL_SMART_TURN_API_KEY=...
# Soniox
SONIOX_API_KEY=...
# Speechmatics
SPEECHMATICS_API_KEY=...
# Tavus
TAVUS_API_KEY=...
TAVUS_REPLICA_ID=...
# Telnyx
TELNYX_API_KEY=...
TELNYX_ACCOUNT_SID=...
# Together.ai
TOGETHER_API_KEY=...
# Twilio
TWILIO_ACCOUNT_SID=...
TWILIO_AUTH_TOKEN=...
# WhatsApp
WHATSAPP_TOKEN=...
WHATSAPP_WEBHOOK_VERIFICATION_TOKEN=...
WHATSAPP_PHONE_NUMBER_ID=...
WHATSAPP_APP_SECRET=...

View File

@@ -1,31 +0,0 @@
# Pipecat Examples
This directory contains examples to help you learn how to build with Pipecat.
## Getting Started
New to Pipecat? Start here:
- **[Quickstart](quickstart/)** - Get your first voice AI bot running in 5 minutes _(coming soon)_
- **[Client/Server Web](client-server-web/)** - Learn to build web applications with Pipecat's client SDKs _(coming soon)_
- **[Phone Bot with Twilio](phone-bot-twilio/)** - Connect your bot to a phone number _(coming soon)_
## Foundational Examples
Single-file examples that introduce core Pipecat concepts one at a time. These examples:
- Build on each other progressively
- Focus on specific features or integrations
- Are used for testing with every Pipecat release
See the **[Foundational Examples README](foundational/)** for the complete list.
## More Advanced Examples
Ready to explore complex use cases? Visit **[pipecat-examples](https://github.com/pipecat-ai/pipecat-examples)** for:
- Production-ready applications
- Multi-platform client implementations
- Telephony integrations
- Multimodal and creative applications
- Deployment and monitoring examples

View File

@@ -1,70 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from pipecat.frames.frames import EndFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.piper.tts import PiperTTSService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(audio_out_enabled=True),
"twilio": lambda: FastAPIWebsocketParams(audio_out_enabled=True),
"webrtc": lambda: TransportParams(audio_out_enabled=True),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
# Create an HTTP session
async with aiohttp.ClientSession() as session:
tts = PiperTTSService(
base_url=os.getenv("PIPER_BASE_URL"), aiohttp_session=session, sample_rate=24000
)
task = PipelineTask(
Pipeline([tts, transport.output()]),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
# Register an event handler so we can play the audio when the client joins
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
await task.queue_frames([TTSSpeakFrame(f"Hello there!"), EndFrame()])
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -1,71 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from pipecat.frames.frames import EndFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.rime.tts import RimeHttpTTSService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(audio_out_enabled=True),
"twilio": lambda: FastAPIWebsocketParams(audio_out_enabled=True),
"webrtc": lambda: TransportParams(audio_out_enabled=True),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
# Create an HTTP session
async with aiohttp.ClientSession() as session:
tts = RimeHttpTTSService(
api_key=os.getenv("RIME_API_KEY", ""),
voice_id="rex",
aiohttp_session=session,
)
task = PipelineTask(
Pipeline([tts, transport.output()]),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
# Register an event handler so we can play the audio when the client joins
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
await task.queue_frames([TTSSpeakFrame(f"Hello there!"), EndFrame()])
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -1,68 +1,54 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import logging
import os
from dailyai.pipeline.frames import EndFrame, TextFrame
from dailyai.pipeline.pipeline import Pipeline
from dailyai.transports.daily_transport import DailyTransport
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from runner import configure
from dotenv import load_dotenv
from loguru import logger
from pipecat.frames.frames import EndFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(audio_out_enabled=True),
"twilio": lambda: FastAPIWebsocketParams(audio_out_enabled=True),
"webrtc": lambda: TransportParams(audio_out_enabled=True),
}
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
logger = logging.getLogger("dailyai")
logger.setLevel(logging.DEBUG)
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
async def main(room_url):
async with aiohttp.ClientSession() as session:
transport = DailyTransport(
room_url,
None,
"Say One Thing",
mic_enabled=True,
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
task = PipelineTask(
Pipeline([tts, transport.output()]),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
pipeline = Pipeline([tts])
# Register an event handler so we can play the audio when the client joins
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
await task.queue_frames([TTSSpeakFrame(f"Hello there!"), EndFrame()])
# Register an event handler so we can play the audio when the
# participant joins.
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, participant):
if participant["info"]["isLocal"]:
return
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
participant_name = participant["info"]["userName"] or ''
await pipeline.queue_frames([TextFrame("Hello there, " + participant_name + "!"), EndFrame()])
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
await transport.run(pipeline)
del tts
if __name__ == "__main__":
from pipecat.runner.run import main
main()
(url, token) = configure()
asyncio.run(main(url))

View File

@@ -1,49 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
from dotenv import load_dotenv
from loguru import logger
from pipecat.frames.frames import EndFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.transports.local.audio import LocalAudioTransport, LocalAudioTransportParams
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
transport = LocalAudioTransport(LocalAudioTransportParams(audio_out_enabled=True))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
pipeline = Pipeline([tts, transport.output()])
task = PipelineTask(pipeline)
async def say_something():
await asyncio.sleep(1)
await task.queue_frames([TTSSpeakFrame("Hello there, how is it going!"), EndFrame()])
runner = PipelineRunner(handle_sigint=False if sys.platform == "win32" else True)
await asyncio.gather(runner.run(task), say_something())
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,38 @@
import asyncio
import aiohttp
import logging
import os
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.transports.local_transport import LocalTransport
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
logger = logging.getLogger("dailyai")
logger.setLevel(logging.DEBUG)
async def main():
async with aiohttp.ClientSession() as session:
meeting_duration_minutes = 1
transport = LocalTransport(
duration_minutes=meeting_duration_minutes, mic_enabled=True
)
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
async def say_something():
await asyncio.sleep(1)
await tts.say(
"Hello there.",
transport.send_queue,
)
await transport.stop_when_done()
await asyncio.gather(transport.run(), say_something())
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,62 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
from dotenv import load_dotenv
from loguru import logger
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.runner.livekit import configure
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.transports.livekit.transport import LiveKitParams, LiveKitTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
(url, token, room_name) = await configure()
transport = LiveKitTransport(
url=url,
token=token,
room_name=room_name,
params=LiveKitParams(audio_out_enabled=True),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
runner = PipelineRunner()
task = PipelineTask(Pipeline([tts, transport.output()]))
# Register an event handler so we can play the audio when the
# participant joins.
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant_id):
await asyncio.sleep(1)
await task.queue_frame(
TTSSpeakFrame(
"Hello there! How are you doing today? Would you like to talk about the weather?"
)
)
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,65 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.frames.frames import EndFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.riva.tts import FastPitchTTSService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(audio_out_enabled=True),
"twilio": lambda: FastAPIWebsocketParams(audio_out_enabled=True),
"webrtc": lambda: TransportParams(audio_out_enabled=True),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
tts = FastPitchTTSService(api_key=os.getenv("NVIDIA_API_KEY"))
task = PipelineTask(
Pipeline([tts, transport.output()]),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
# Register an event handler so we can play the audio when the client joins
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
await task.queue_frames([TTSSpeakFrame(f"Hello there!"), EndFrame()])
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -1,79 +1,59 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import logging
import aiohttp
from dailyai.pipeline.frames import EndFrame, LLMMessagesFrame
from dailyai.pipeline.pipeline import Pipeline
from dailyai.transports.daily_transport import DailyTransport
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.services.open_ai_services import OpenAILLMService
from runner import configure
from dotenv import load_dotenv
from loguru import logger
from pipecat.frames.frames import EndFrame, LLMContextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(audio_out_enabled=True),
"twilio": lambda: FastAPIWebsocketParams(audio_out_enabled=True),
"webrtc": lambda: TransportParams(audio_out_enabled=True),
}
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
logger = logging.getLogger("dailyai")
logger.setLevel(logging.DEBUG)
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
async def main(room_url):
async with aiohttp.ClientSession() as session:
transport = DailyTransport(
room_url,
None,
"Say One Thing From an LLM",
mic_enabled=True,
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4-turbo-preview")
messages = [
{
"role": "system",
"content": "You are an LLM in a WebRTC session, and this is a 'hello world' demo. Say hello to the world.",
}
]
messages = [
{
"role": "system",
"content": "You are an LLM in a WebRTC session, and this is a 'hello world' demo. Say hello to the world.",
}]
task = PipelineTask(
Pipeline([llm, tts, transport.output()]),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
pipeline = Pipeline([llm, tts])
# Register an event handler so we can play the audio when the client joins
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
await task.queue_frames([LLMContextFrame(LLMContext(messages)), EndFrame()])
@transport.event_handler("on_first_other_participant_joined")
async def on_first_other_participant_joined(transport):
await pipeline.queue_frames([LLMMessagesFrame(messages), EndFrame()])
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
await transport.run(pipeline)
if __name__ == "__main__":
from pipecat.runner.run import main
main()
(url, token) = configure()
asyncio.run(main(url))

View File

@@ -1,83 +1,57 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import logging
import os
import aiohttp
from dailyai.pipeline.frames import TextFrame
from dailyai.pipeline.pipeline import Pipeline
from dailyai.transports.daily_transport import DailyTransport
from dailyai.services.fal_ai_services import FalImageGenService
from runner import configure
from dotenv import load_dotenv
from loguru import logger
from pipecat.frames.frames import TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.fal.image import FalImageGenService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
video_out_enabled=True,
video_out_width=1024,
video_out_height=1024,
),
"webrtc": lambda: TransportParams(
video_out_enabled=True,
video_out_width=1024,
video_out_height=1024,
),
}
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
logger = logging.getLogger("dailyai")
logger.setLevel(logging.DEBUG)
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
# Create an HTTP session
async def main(room_url):
async with aiohttp.ClientSession() as session:
transport = DailyTransport(
room_url,
None,
"Show a still frame image",
camera_enabled=True,
camera_width=1024,
camera_height=1024,
duration_minutes=1
)
imagegen = FalImageGenService(
params=FalImageGenService.InputParams(image_size="square_hd"),
image_size="square_hd",
aiohttp_session=session,
key=os.getenv("FAL_KEY"),
key_id=os.getenv("FAL_KEY_ID"),
key_secret=os.getenv("FAL_KEY_SECRET"),
)
task = PipelineTask(
Pipeline([imagegen, transport.output()]),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
pipeline = Pipeline([imagegen])
# Register an event handler so we can play the audio when the client joins
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
await task.queue_frame(TextFrame("a cat in the style of picasso"))
@transport.event_handler("on_first_other_participant_joined")
async def on_first_other_participant_joined(transport):
# Note that we do not put an EndFrame() item in the pipeline for this demo.
# This means that the bot will stay in the channel until it times out.
# An EndFrame() in the pipeline would cause the transport to shut
# down.
await pipeline.queue_frames(
[TextFrame("a cat in the style of picasso")]
)
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
await transport.run(pipeline)
if __name__ == "__main__":
from pipecat.runner.run import main
main()
(url, token) = configure()
asyncio.run(main(url))

View File

@@ -0,0 +1,55 @@
import asyncio
import aiohttp
import logging
import os
import tkinter as tk
from dailyai.pipeline.frames import TextFrame
from dailyai.services.fal_ai_services import FalImageGenService
from dailyai.transports.local_transport import LocalTransport
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
logger = logging.getLogger("dailyai")
logger.setLevel(logging.DEBUG)
local_joined = False
participant_joined = False
async def main():
async with aiohttp.ClientSession() as session:
meeting_duration_minutes = 2
tk_root = tk.Tk()
tk_root.title("Calendar")
transport = LocalTransport(
tk_root=tk_root,
mic_enabled=True,
camera_enabled=True,
camera_width=1024,
camera_height=1024,
duration_minutes=meeting_duration_minutes,
)
imagegen = FalImageGenService(
image_size="1024x1024",
aiohttp_session=session,
key_id=os.getenv("FAL_KEY_ID"),
key_secret=os.getenv("FAL_KEY_SECRET"),
)
image_task = asyncio.create_task(
imagegen.run_to_queue(
transport.send_queue, [
TextFrame("a cat in the style of picasso")]))
async def run_tk():
while not transport._stop_threads.is_set():
tk_root.update()
tk_root.update_idletasks()
await asyncio.sleep(0.1)
await asyncio.gather(transport.run(), image_task, run_tk())
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,62 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import tkinter as tk
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from pipecat.frames.frames import TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.services.fal.image import FalImageGenService
from pipecat.transports.local.tk import TkLocalTransport, TkTransportParams
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
tk_root = tk.Tk()
tk_root.title("Picasso Cat")
transport = TkLocalTransport(
tk_root,
TkTransportParams(video_out_enabled=True, video_out_width=1024, video_out_height=1024),
)
imagegen = FalImageGenService(
params=FalImageGenService.InputParams(image_size="square_hd"),
aiohttp_session=session,
key=os.getenv("FAL_KEY"),
)
pipeline = Pipeline([imagegen, transport.output()])
task = PipelineTask(pipeline)
await task.queue_frames([TextFrame("a cat in the style of picasso")])
runner = PipelineRunner()
async def run_tk():
while not task.has_finished():
tk_root.update()
tk_root.update_idletasks()
await asyncio.sleep(0.1)
await asyncio.gather(runner.run(task), run_tk())
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,84 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.frames.frames import TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.google.image import GoogleImageGenService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
video_out_enabled=True,
video_out_width=1024,
video_out_height=1024,
),
"webrtc": lambda: TransportParams(
video_out_enabled=True,
video_out_width=1024,
video_out_height=1024,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
imagegen = GoogleImageGenService(
api_key=os.getenv("GOOGLE_API_KEY"),
)
task = PipelineTask(
Pipeline([imagegen, transport.output()]),
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
# Register an event handler so we can play the audio when the client joins
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
await task.queue_frame(TextFrame("a cat in the style of picasso"))
await task.queue_frame(TextFrame("a dog in the style of picasso"))
await task.queue_frame(TextFrame("a fish in the style of picasso"))
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -1,178 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import asyncio
import os
from contextlib import asynccontextmanager
from typing import Dict
import uvicorn
from dotenv import load_dotenv
from fastapi import BackgroundTasks, FastAPI
from fastapi.responses import RedirectResponse
from loguru import logger
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.smallwebrtc.connection import IceServer, SmallWebRTCConnection
from pipecat.transports.smallwebrtc.transport import SmallWebRTCTransport
load_dotenv(override=True)
app = FastAPI()
# Store connections by pc_id
pcs_map: Dict[str, SmallWebRTCConnection] = {}
ice_servers = [
IceServer(
urls="stun:stun.l.google.com:19302",
)
]
# Mount the frontend at /
app.mount("/client", SmallWebRTCPrebuiltUI)
async def run_example(webrtc_connection: SmallWebRTCConnection):
logger.info(f"Starting bot")
# Create a transport using the WebRTC connection
transport = SmallWebRTCTransport(
webrtc_connection=webrtc_connection,
params=TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. Respond to what the user said in a creative and helpful way.",
},
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)
@app.get("/", include_in_schema=False)
async def root_redirect():
return RedirectResponse(url="/client/")
@app.post("/api/offer")
async def offer(request: dict, background_tasks: BackgroundTasks):
pc_id = request.get("pc_id")
if pc_id and pc_id in pcs_map:
pipecat_connection = pcs_map[pc_id]
logger.info(f"Reusing existing connection for pc_id: {pc_id}")
await pipecat_connection.renegotiate(
sdp=request["sdp"],
type=request["type"],
restart_pc=request.get("restart_pc", False),
)
else:
pipecat_connection = SmallWebRTCConnection(ice_servers)
await pipecat_connection.initialize(sdp=request["sdp"], type=request["type"])
@pipecat_connection.event_handler("closed")
async def handle_disconnected(webrtc_connection: SmallWebRTCConnection):
logger.info(f"Discarding peer connection for pc_id: {webrtc_connection.pc_id}")
pcs_map.pop(webrtc_connection.pc_id, None)
# Run example function with SmallWebRTC transport arguments.
background_tasks.add_task(run_example, pipecat_connection)
answer = pipecat_connection.get_answer()
# Updating the peer connection inside the map
pcs_map[answer["pc_id"]] = pipecat_connection
return answer
@asynccontextmanager
async def lifespan(app: FastAPI):
yield # Run app
coros = [pc.disconnect() for pc in pcs_map.values()]
await asyncio.gather(*coros)
pcs_map.clear()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Pipecat Bot Runner")
parser.add_argument(
"--host", default="localhost", help="Host for HTTP server (default: localhost)"
)
parser.add_argument(
"--port", type=int, default=7860, help="Port for HTTP server (default: 7860)"
)
args = parser.parse_args()
uvicorn.run(app, host=args.host, port=args.port)

View File

@@ -0,0 +1,85 @@
import asyncio
import logging
import os
import aiohttp
from dailyai.pipeline.merge_pipeline import SequentialMergePipeline
from dailyai.pipeline.pipeline import Pipeline
from dailyai.transports.daily_transport import DailyTransport
from dailyai.services.azure_ai_services import AzureLLMService, AzureTTSService
from dailyai.services.deepgram_ai_services import DeepgramTTSService
from dailyai.pipeline.frames import EndPipeFrame, LLMMessagesFrame, TextFrame
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from runner import configure
from dotenv import load_dotenv
load_dotenv(override=True)
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
logger = logging.getLogger("dailyai")
logger.setLevel(logging.DEBUG)
async def main(room_url: str):
async with aiohttp.ClientSession() as session:
transport = DailyTransport(
room_url,
None,
"Static And Dynamic Speech",
duration_minutes=1,
mic_enabled=True,
mic_sample_rate=16000,
)
llm = AzureLLMService(
api_key=os.getenv("AZURE_CHATGPT_API_KEY"),
endpoint=os.getenv("AZURE_CHATGPT_ENDPOINT"),
model=os.getenv("AZURE_CHATGPT_MODEL"),
)
azure_tts = AzureTTSService(
api_key=os.getenv("AZURE_SPEECH_API_KEY"),
region=os.getenv("AZURE_SPEECH_REGION"),
)
deepgram_tts = DeepgramTTSService(
aiohttp_session=session,
api_key=os.getenv("DEEPGRAM_API_KEY"),
)
elevenlabs_tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
messages = [{"role": "system",
"content": "tell the user a joke about llamas"}]
# Start a task to run the LLM to create a joke, and convert the LLM output to audio frames. This task
# will run in parallel with generating and speaking the audio for static text, so there's no delay to
# speak the LLM response.
llm_pipeline = Pipeline([llm, elevenlabs_tts])
await llm_pipeline.queue_frames([LLMMessagesFrame(messages), EndPipeFrame()])
simple_tts_pipeline = Pipeline([azure_tts])
await simple_tts_pipeline.queue_frames(
[
TextFrame("My friend the LLM is going to tell a joke about llamas."),
EndPipeFrame(),
]
)
merge_pipeline = SequentialMergePipeline(
[simple_tts_pipeline, llm_pipeline])
await asyncio.gather(
transport.run(merge_pipeline),
simple_tts_pipeline.run_pipeline(),
llm_pipeline.run_pipeline(),
)
if __name__ == "__main__":
(url, token) = configure()
asyncio.run(main(url))

View File

@@ -1,106 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.daily import configure
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.daily.transport import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
transcription_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. Respond to what the user said in a creative and helpful way.",
},
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,140 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import json
import os
import sys
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import (
InterruptionFrame,
TranscriptionFrame,
TTSSpeakFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.livekit import configure
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.livekit.transport import LiveKitParams, LiveKitTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
(url, token, room_name) = await configure()
transport = LiveKitTransport(
url=url,
token=token,
room_name=room_name,
params=LiveKitParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. "
"Your goal is to demonstrate your capabilities in a succinct way. "
"Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. "
"Respond to what the user said in a creative and helpful way.",
},
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
)
# Register an event handler so we can play the audio when the
# participant joins.
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant_id):
await asyncio.sleep(1)
await task.queue_frame(
TTSSpeakFrame(
"Hello there! How are you doing today? Would you like to talk about the weather?"
)
)
# Register an event handler to receive data from the participant via text chat
# in the LiveKit room. This will be used to as transcription frames and
# interrupt the bot and pass it to llm for processing and
# then pass back to the participant as audio output.
@transport.event_handler("on_data_received")
async def on_data_received(transport, data, participant_id):
logger.info(f"Received data from participant {participant_id}: {data}")
# convert data from bytes to string
json_data = json.loads(data)
await task.queue_frames(
[
InterruptionFrame(),
UserStartedSpeakingFrame(),
TranscriptionFrame(
user_id=participant_id,
timestamp=json_data["timestamp"],
text=json_data["message"],
),
UserStoppedSpeakingFrame(),
],
)
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,136 +1,114 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dataclasses import dataclass
import asyncio
import aiohttp
from dotenv import load_dotenv
from loguru import logger
import os
import logging
from pipecat.frames.frames import (
DataFrame,
Frame,
LLMContextFrame,
LLMFullResponseStartFrame,
TextFrame,
from dataclasses import dataclass
from typing import AsyncGenerator
from dailyai.pipeline.aggregators import (
GatedAggregator,
LLMFullResponseAggregator,
ParallelPipeline,
SentenceAggregator,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.aggregators.sentence import SentenceAggregator
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaHttpTTSService
from pipecat.services.fal.image import FalImageGenService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from dailyai.pipeline.frames import (
Frame,
TextFrame,
EndFrame,
ImageFrame,
LLMMessagesFrame,
LLMResponseStartFrame,
)
from dailyai.pipeline.frame_processor import FrameProcessor
from dailyai.pipeline.pipeline import Pipeline
from dailyai.transports.daily_transport import DailyTransport
from dailyai.services.open_ai_services import OpenAILLMService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.services.fal_ai_services import FalImageGenService
from runner import configure
from dotenv import load_dotenv
load_dotenv(override=True)
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
logger = logging.getLogger("dailyai")
logger.setLevel(logging.DEBUG)
@dataclass
class MonthFrame(DataFrame):
class MonthFrame(Frame):
month: str
def __str__(self):
return f"{self.name}(month: {self.month})"
class MonthPrepender(FrameProcessor):
def __init__(self):
super().__init__()
self.most_recent_month = "Placeholder, month frame not yet received"
self.prepend_to_next_text_frame = False
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
if isinstance(frame, MonthFrame):
self.most_recent_month = frame.month
elif self.prepend_to_next_text_frame and isinstance(frame, TextFrame):
await self.push_frame(TextFrame(f"{self.most_recent_month}: {frame.text}"))
yield TextFrame(f"{self.most_recent_month}: {frame.text}")
self.prepend_to_next_text_frame = False
elif isinstance(frame, LLMFullResponseStartFrame):
elif isinstance(frame, LLMResponseStartFrame):
self.prepend_to_next_text_frame = True
await self.push_frame(frame)
yield frame
else:
await self.push_frame(frame, direction)
yield frame
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_out_enabled=True,
video_out_enabled=True,
video_out_width=1024,
video_out_height=1024,
),
"webrtc": lambda: TransportParams(
audio_out_enabled=True,
video_out_enabled=True,
video_out_width=1024,
video_out_height=1024,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
"""Run the Calendar Month Narration bot using WebRTC transport.
Args:
webrtc_connection: The WebRTC connection to use
room_name: Optional room name for display purposes
"""
logger.info(f"Starting bot")
# Create an HTTP session for API calls
async def main(room_url):
async with aiohttp.ClientSession() as session:
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
tts = CartesiaHttpTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
transport = DailyTransport(
room_url,
None,
"Month Narration Bot",
mic_enabled=True,
camera_enabled=True,
mic_sample_rate=16000,
camera_width=1024,
camera_height=1024,
)
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4-turbo-preview")
imagegen = FalImageGenService(
params=FalImageGenService.InputParams(image_size="square_hd"),
image_size="square_hd",
aiohttp_session=session,
key=os.getenv("FAL_KEY"),
key_id=os.getenv("FAL_KEY_ID"),
key_secret=os.getenv("FAL_KEY_SECRET"),
)
gated_aggregator = GatedAggregator(
gate_open_fn=lambda frame: isinstance(
frame, ImageFrame), gate_close_fn=lambda frame: isinstance(
frame, LLMResponseStartFrame), start_open=False, )
sentence_aggregator = SentenceAggregator()
month_prepender = MonthPrepender()
llm_full_response_aggregator = LLMFullResponseAggregator()
# With `SyncParallelPipeline` we synchronize audio and images by pushing
# them basically in order (e.g. I1 A1 A1 A1 I2 A2 A2 A2 A2 I3 A3). To do
# that, each pipeline runs concurrently and `SyncParallelPipeline` will
# wait for the input frame to be processed.
#
# Note that `SyncParallelPipeline` requires the last processor in each
# of the pipelines to be synchronous. In this case, we use
# `CartesiaHttpTTSService` and `FalImageGenService` which make HTTP
# requests and wait for the response.
pipeline = Pipeline(
[
llm, # LLM
sentence_aggregator, # Aggregates LLM output into full sentences
SyncParallelPipeline( # Run pipelines in parallel aggregating the result
[month_prepender, tts], # Create "Month: sentence" and output audio
[imagegen], # Generate image
processors=[
llm,
sentence_aggregator,
ParallelPipeline(
[[month_prepender, tts], [llm_full_response_aggregator, imagegen]]
),
transport.output(), # Transport output
]
gated_aggregator,
],
)
frames = []
@@ -154,38 +132,15 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
"content": f"Describe a nature photograph suitable for use in a calendar, for the month of {month}. Include only the image description with no preamble. Limit the description to one sentence, please.",
}
]
frames.append(MonthFrame(month=month))
frames.append(LLMContextFrame(LLMContext(messages)))
frames.append(MonthFrame(month))
frames.append(LLMMessagesFrame(messages))
task = PipelineTask(
pipeline,
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
frames.append(EndFrame())
await pipeline.queue_frames(frames)
# Set up transport event handlers
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Start the month narration once connected
await task.queue_frames(frames)
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
# Run the pipeline
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
await transport.run(pipeline, override_pipeline_source_queue=False)
if __name__ == "__main__":
from pipecat.runner.run import main
main()
(url, token) = configure()
asyncio.run(main(url))

View File

@@ -1,199 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import tkinter as tk
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from pipecat.frames.frames import (
Frame,
LLMContextFrame,
OutputAudioRawFrame,
TextFrame,
TTSAudioRawFrame,
URLImageRawFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.aggregators.sentence import SentenceAggregator
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.cartesia.tts import CartesiaHttpTTSService
from pipecat.services.fal.image import FalImageGenService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.local.tk import TkLocalTransport, TkTransportParams
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
tk_root = tk.Tk()
tk_root.title("Calendar")
runner = PipelineRunner()
async def get_month_data(month):
messages = [
{
"role": "system",
"content": f"Describe a nature photograph suitable for use in a calendar, for the month of {month}. Include only the image description with no preamble. Limit the description to one sentence, please.",
}
]
class ImageDescription(FrameProcessor):
def __init__(self):
super().__init__()
self.text = ""
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TextFrame):
self.text = frame.text
await self.push_frame(frame, direction)
class AudioGrabber(FrameProcessor):
def __init__(self):
super().__init__()
self.audio = bytearray()
self.frame = None
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TTSAudioRawFrame):
self.audio.extend(frame.audio)
self.frame = OutputAudioRawFrame(
bytes(self.audio), frame.sample_rate, frame.num_channels
)
await self.push_frame(frame, direction)
class ImageGrabber(FrameProcessor):
def __init__(self):
super().__init__()
self.frame = None
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, URLImageRawFrame):
self.frame = frame
await self.push_frame(frame, direction)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
tts = CartesiaHttpTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
imagegen = FalImageGenService(
params=FalImageGenService.InputParams(image_size="square_hd"),
aiohttp_session=session,
key=os.getenv("FAL_KEY"),
)
sentence_aggregator = SentenceAggregator()
description = ImageDescription()
audio_grabber = AudioGrabber()
image_grabber = ImageGrabber()
# With `SyncParallelPipeline` we synchronize audio and images by
# pushing them basically in order (e.g. I1 A1 A1 A1 I2 A2 A2 A2 A2
# I3 A3). To do that, each pipeline runs concurrently and
# `SyncParallelPipeline` will wait for the input frame to be
# processed.
#
# Note that `SyncParallelPipeline` requires the last processor in
# each of the pipelines to be synchronous. In this case, we use
# `CartesiaHttpTTSService` and `FalImageGenService` which make HTTP
# requests and wait for the response.
pipeline = Pipeline(
[
llm, # LLM
sentence_aggregator, # Aggregates LLM output into full sentences
description, # Store sentence
SyncParallelPipeline(
[tts, audio_grabber], # Generate and store audio for the given sentence
[imagegen, image_grabber], # Generate and storeimage for the given sentence
),
]
)
task = PipelineTask(pipeline)
await task.queue_frame(LLMContextFrame(LLMContext(messages)))
await task.stop_when_done()
await runner.run(task)
return {
"month": month,
"text": description.text,
"image": image_grabber.frame,
"audio": audio_grabber.frame,
}
transport = TkLocalTransport(
tk_root,
TkTransportParams(
audio_out_enabled=True,
video_out_enabled=True,
video_out_width=1024,
video_out_height=1024,
),
)
pipeline = Pipeline([transport.output()])
task = PipelineTask(pipeline)
# We only specify a few months as we create tasks all at once and we
# might get rate limited otherwise.
months: list[str] = [
"January",
"February",
]
# We create one task per month. This will be executed concurrently.
month_tasks = [asyncio.create_task(get_month_data(month)) for month in months]
# Now we wait for each month task in the order they're completed. The
# benefit is we'll have as little delay as possible before the first
# month, and likely no delay between months, but the months won't
# display in order.
async def show_images(month_tasks):
for month_data_task in asyncio.as_completed(month_tasks):
data = await month_data_task
await task.queue_frames([data["image"], data["audio"]])
await runner.stop_when_done()
async def run_tk():
while not task.has_finished():
tk_root.update()
tk_root.update_idletasks()
await asyncio.sleep(0.1)
await asyncio.gather(runner.run(task), show_images(month_tasks), run_tk())
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,146 @@
import aiohttp
import argparse
import asyncio
import logging
import tkinter as tk
import os
from dailyai.pipeline.frames import AudioFrame, ImageFrame
from dailyai.services.open_ai_services import OpenAILLMService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.services.fal_ai_services import FalImageGenService
from dailyai.transports.local_transport import LocalTransport
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
logger = logging.getLogger("dailyai")
logger.setLevel(logging.DEBUG)
async def main(room_url):
async with aiohttp.ClientSession() as session:
meeting_duration_minutes = 5
tk_root = tk.Tk()
tk_root.title("Calendar")
transport = LocalTransport(
mic_enabled=True,
camera_enabled=True,
camera_width=1024,
camera_height=1024,
duration_minutes=meeting_duration_minutes,
tk_root=tk_root,
)
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4-turbo-preview")
dalle = FalImageGenService(
image_size="1024x1024",
aiohttp_session=session,
key_id=os.getenv("FAL_KEY_ID"),
key_secret=os.getenv("FAL_KEY_SECRET"),
)
# Get a complete audio chunk from the given text. Splitting this into its own
# coroutine lets us ensure proper ordering of the audio chunks on the
# send queue.
async def get_all_audio(text):
all_audio = bytearray()
async for audio in tts.run_tts(text):
all_audio.extend(audio)
return all_audio
async def get_month_data(month):
messages = [
{
"role": "system",
"content": f"Describe a nature photograph suitable for use in a calendar, for the month of {month}. Include only the image description with no preamble. Limit the description to one sentence, please.",
}
]
image_description = await llm.run_llm(messages)
if not image_description:
return
to_speak = f"{month}: {image_description}"
audio_task = asyncio.create_task(get_all_audio(to_speak))
image_task = asyncio.create_task(
dalle.run_image_gen(image_description))
(audio, image_data) = await asyncio.gather(audio_task, image_task)
return {
"month": month,
"text": image_description,
"image_url": image_data[0],
"image": image_data[1],
"audio": audio,
}
months: list[str] = [
"January",
"February",
"March",
"April",
"May",
"June",
"July",
"August",
"September",
"October",
"November",
"December",
]
async def show_images():
# This will play the months in the order they're completed. The benefit
# is we'll have as little delay as possible before the first month, and
# likely no delay between months, but the months won't display in
# order.
for month_data_task in asyncio.as_completed(month_tasks):
data = await month_data_task
if data:
await transport.send_queue.put(
[
ImageFrame(data["image_url"], data["image"]),
AudioFrame(data["audio"]),
]
)
await asyncio.sleep(25)
# wait for the output queue to be empty, then leave the meeting
await transport.stop_when_done()
async def run_tk():
while not transport._stop_threads.is_set():
tk_root.update()
tk_root.update_idletasks()
await asyncio.sleep(0.1)
month_tasks = [
asyncio.create_task(
get_month_data(month)) for month in months]
await asyncio.gather(transport.run(), show_images(), run_tk())
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Simple Daily Bot Sample")
parser.add_argument(
"-u",
"--url",
type=str,
required=True,
help="URL of the Daily room to join")
args, unknown = parser.parse_known_args()
asyncio.run(main(args.url))

View File

@@ -1,157 +1,88 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import logging
import os
from dailyai.pipeline.frames import LLMMessagesFrame
from dailyai.pipeline.pipeline import Pipeline
from dailyai.transports.daily_transport import DailyTransport
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.services.open_ai_services import OpenAILLMService
from dailyai.services.ai_services import FrameLogger
from dailyai.pipeline.aggregators import (
LLMAssistantContextAggregator,
LLMUserContextAggregator,
)
from runner import configure
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import Frame, LLMRunFrame, MetricsFrame
from pipecat.metrics.metrics import (
LLMUsageMetricsData,
ProcessingMetricsData,
TTFBMetricsData,
TTSUsageMetricsData,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
class MetricsLogger(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, MetricsFrame):
for d in frame.data:
if isinstance(d, TTFBMetricsData):
print(f"!!! MetricsFrame: {frame}, ttfb: {d.value}")
elif isinstance(d, ProcessingMetricsData):
print(f"!!! MetricsFrame: {frame}, processing: {d.value}")
elif isinstance(d, LLMUsageMetricsData):
tokens = d.value
print(
f"!!! MetricsFrame: {frame}, tokens: {tokens.prompt_tokens}, characters: {tokens.completion_tokens}"
)
elif isinstance(d, TTSUsageMetricsData):
print(f"!!! MetricsFrame: {frame}, characters: {d.value}")
await self.push_frame(frame, direction)
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
logger = logging.getLogger("dailyai")
logger.setLevel(logging.DEBUG)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
}
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
transport = DailyTransport(
room_url,
token,
"Respond bot",
duration_minutes=5,
start_transcription=True,
mic_enabled=True,
mic_sample_rate=16000,
camera_enabled=False,
vad_enabled=True,
)
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
ml = MetricsLogger()
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. Respond to what the user said in a creative and helpful way.",
},
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(),
stt,
context_aggregator.user(),
llm,
tts,
ml,
transport.output(),
context_aggregator.assistant(),
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4-turbo-preview")
fl = FrameLogger("Inner")
fl2 = FrameLogger("Outer")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way.",
},
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
tma_in = LLMUserContextAggregator(
messages, transport._my_participant_id)
tma_out = LLMAssistantContextAggregator(
messages, transport._my_participant_id
)
pipeline = Pipeline(
processors=[
fl,
tma_in,
llm,
fl2,
tts,
tma_out,
],
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_first_other_participant_joined")
async def on_first_other_participant_joined(transport):
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await pipeline.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
transport.transcription_settings["extra"]["endpointing"] = True
transport.transcription_settings["extra"]["punctuate"] = True
await transport.run(pipeline)
if __name__ == "__main__":
from pipecat.runner.run import main
main()
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -1,174 +1,122 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
from dotenv import load_dotenv
from loguru import logger
import logging
from typing import AsyncGenerator
import aiohttp
from PIL import Image
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
Frame,
LLMRunFrame,
OutputImageRawFrame,
from dailyai.pipeline.frames import ImageFrame, Frame
from dailyai.transports.daily_transport import DailyTransport
from dailyai.services.ai_services import AIService
from dailyai.pipeline.aggregators import (
LLMAssistantContextAggregator,
LLMUserContextAggregator,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from dailyai.services.open_ai_services import OpenAILLMService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from dailyai.services.fal_ai_services import FalImageGenService
from runner import configure
from dotenv import load_dotenv
load_dotenv(override=True)
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
logger = logging.getLogger("dailyai")
logger.setLevel(logging.DEBUG)
class ImageSyncAggregator(FrameProcessor):
class ImageSyncAggregator(AIService):
def __init__(self, speaking_path: str, waiting_path: str):
super().__init__()
self._speaking_image = Image.open(speaking_path)
self._speaking_image_format = self._speaking_image.format
self._speaking_image_bytes = self._speaking_image.tobytes()
self._waiting_image = Image.open(waiting_path)
self._waiting_image_format = self._waiting_image.format
self._waiting_image_bytes = self._waiting_image.tobytes()
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
async def process_frame(self, frame: Frame) -> AsyncGenerator[Frame, None]:
yield ImageFrame(None, self._speaking_image_bytes)
yield frame
yield ImageFrame(None, self._waiting_image_bytes)
if isinstance(frame, BotStartedSpeakingFrame):
await self.push_frame(
OutputImageRawFrame(
image=self._speaking_image_bytes,
size=(1024, 1024),
format=self._speaking_image_format,
)
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
transport = DailyTransport(
room_url,
token,
"Respond bot",
5,
)
transport._camera_enabled = True
transport._camera_width = 1024
transport._camera_height = 1024
transport._mic_enabled = True
transport._mic_sample_rate = 16000
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4-turbo-preview")
img = FalImageGenService(
image_size="1024x1024",
aiohttp_session=session,
key_id=os.getenv("FAL_KEY_ID"),
key_secret=os.getenv("FAL_KEY_SECRET"),
)
async def get_images():
get_speaking_task = asyncio.create_task(
img.run_image_gen("An image of a cat speaking")
)
get_waiting_task = asyncio.create_task(
img.run_image_gen("An image of a cat waiting")
)
elif isinstance(frame, BotStoppedSpeakingFrame):
await self.push_frame(
OutputImageRawFrame(
image=self._waiting_image_bytes,
size=(1024, 1024),
format=self._waiting_image_format,
)
(speaking_data, waiting_data) = await asyncio.gather(
get_speaking_task, get_waiting_task
)
await self.push_frame(frame, direction)
return speaking_data, waiting_data
@transport.event_handler("on_first_other_participant_joined")
async def on_first_other_participant_joined(transport):
await tts.say("Hi, I'm listening!", transport.send_queue)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_out_enabled=True,
video_out_width=1024,
video_out_height=1024,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_out_enabled=True,
video_out_width=1024,
video_out_height=1024,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
}
async def handle_transcriptions():
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way.",
},
]
tma_in = LLMUserContextAggregator(
messages, transport._my_participant_id)
tma_out = LLMAssistantContextAggregator(
messages, transport._my_participant_id
)
image_sync_aggregator = ImageSyncAggregator(
os.path.join(
os.path.dirname(__file__), "assets", "speaking.png"), os.path.join(
os.path.dirname(__file__), "assets", "waiting.png"), )
await tts.run_to_queue(
transport.send_queue,
image_sync_aggregator.run(
tma_out.run(llm.run(tma_in.run(transport.get_receive_frames())))
),
)
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. Respond to what the user said in a creative and helpful way.",
},
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
image_sync_aggregator = ImageSyncAggregator(
os.path.join(os.path.dirname(__file__), "assets", "speaking.png"),
os.path.join(os.path.dirname(__file__), "assets", "waiting.png"),
)
pipeline = Pipeline(
[
transport.input(),
stt,
context_aggregator.user(),
llm,
tts,
image_sync_aggregator,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
transport.transcription_settings["extra"]["punctuate"] = True
await asyncio.gather(transport.run(), handle_transcriptions())
if __name__ == "__main__":
from pipecat.runner.run import main
main()
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -1,128 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.stt import CartesiaSTTService
from pipecat.services.cartesia.tts import CartesiaHttpTTSService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = CartesiaSTTService(api_key=os.getenv("CARTESIA_API_KEY"))
tts = CartesiaHttpTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. Respond to what the user said in a creative and helpful way.",
},
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -1,127 +1,76 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import logging
import os
from dailyai.pipeline.aggregators import (
LLMResponseAggregator,
UserResponseAggregator,
)
from dailyai.pipeline.pipeline import Pipeline
from dailyai.services.ai_services import FrameLogger
from dailyai.transports.daily_transport import DailyTransport
from dailyai.services.open_ai_services import OpenAILLMService
from dailyai.services.elevenlabs_ai_service import ElevenLabsTTSService
from runner import configure
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
}
logging.basicConfig(format=f"%(levelno)s %(asctime)s %(message)s")
logger = logging.getLogger("dailyai")
logger.setLevel(logging.DEBUG)
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
transport = DailyTransport(
room_url,
token,
"Respond bot",
duration_minutes=5,
start_transcription=True,
mic_enabled=True,
mic_sample_rate=16000,
camera_enabled=False,
vad_enabled=True,
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4-turbo-preview")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
pipeline = Pipeline([FrameLogger(), llm, FrameLogger(), tts])
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. Respond to what the user said in a creative and helpful way.",
},
]
@transport.event_handler("on_first_other_participant_joined")
async def on_first_other_participant_joined(transport):
await transport.say("Hi, I'm listening!", tts)
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
async def run_conversation():
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio. Respond to what the user said in a creative and helpful way.",
},
]
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
await transport.run_interruptible_pipeline(
pipeline,
post_processor=LLMResponseAggregator(messages),
pre_processor=UserResponseAggregator(messages),
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
transport.transcription_settings["extra"]["punctuate"] = False
await asyncio.gather(transport.run(), run_conversation())
if __name__ == "__main__":
from pipecat.runner.run import main
main()
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -1,187 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response import (
LLMUserAggregatorParams,
)
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.openai.base_llm import BaseOpenAILLMService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.speechmatics.stt import SpeechmaticsSTTService
from pipecat.services.speechmatics.tts import SpeechmaticsTTSService
from pipecat.transcriptions.language import Language
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
"""Speechmatics STT and TTS Service Example
This example demonstrates using Speechmatics Speech-to-Text and Text-to-Speech services
with speaker diarization and intelligent speaker management. Key features:
1. Speaker Diarization (STT)
- Automatically identifies and distinguishes between different speakers
- First speaker is identified as 'S1', others get subsequent IDs
- Uses `enable_diarization` parameter to manage speaker detection
2. Smart Speaker Control (STT)
- `focus_speakers` parameter lets you target specific speakers (e.g. ["S1"])
- Other speakers will be wrapped in PASSIVE tags
- Only processes speech from focused speakers
- Words from all speakers are wrapped with XML tags for clear speaker identification
- Other speakers' speech only sent when focused speaker is active
3. Voice Activity Detection (STT)
- Built-in VAD using `enable_vad` parameter
- Remove `vad_analyzer` from `transport` config to use module's VAD
- Emits speaker started/stopped events
4. Text-to-Speech (TTS)
- Low latency streaming audio synthesis
- Multiple voice options available including `sarah`, `theo`, and `megan`
5. Configuration Options
- `operating_point` parameter defaults to `ENHANCED` for optimal accuracy
- Configurable `end_of_utterance_silence_trigger` (default 0.5s)
- Customizable speaker formatting
- Additional diarization settings available
For detailed information:
- STT: https://docs.speechmatics.com/rt-api-ref
- TTS: https://docs.speechmatics.com/text-to-speech/quickstart
"""
logger.info(f"Starting bot")
async with aiohttp.ClientSession() as session:
stt = SpeechmaticsSTTService(
api_key=os.getenv("SPEECHMATICS_API_KEY"),
params=SpeechmaticsSTTService.InputParams(
language=Language.EN,
enable_vad=True,
enable_diarization=True,
focus_speakers=["S1"],
end_of_utterance_silence_trigger=0.5,
speaker_active_format="<{speaker_id}>{text}</{speaker_id}>",
speaker_passive_format="<PASSIVE><{speaker_id}>{text}</{speaker_id}></PASSIVE>",
),
)
tts = SpeechmaticsTTSService(
api_key=os.getenv("SPEECHMATICS_API_KEY"),
voice_id="sarah",
aiohttp_session=session,
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
params=BaseOpenAILLMService.InputParams(temperature=0.75),
)
messages = [
{
"role": "system",
"content": (
"You are a helpful British assistant called Sarah. "
"Your goal is to demonstrate your capabilities in a succinct way. "
"Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. "
"Always include punctuation in your responses. "
"Give very short replies - do not give longer replies unless strictly necessary. "
"Respond to what the user said in a concise, funny, creative and helpful way. "
"Use `<Sn/>` tags to identify different speakers - do not use tags in your replies. "
"Do not respond to speakers within `<PASSIVE/>` tags unless explicitly asked to. "
),
},
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(aggregation_timeout=0.005),
)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Say a short hello to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -1,176 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response import (
LLMUserAggregatorParams,
)
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.openai.base_llm import BaseOpenAILLMService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.speechmatics.stt import SpeechmaticsSTTService
from pipecat.services.speechmatics.tts import SpeechmaticsTTSService
from pipecat.transcriptions.language import Language
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
"""Run example using Speechmatics STT and TTS.
This example demonstrates a complete Speechmatics integration with both Speech-to-Text
and Text-to-Speech services:
STT Features:
- Diarization to identify and distinguish between different speakers
- Words spoken by each speaker are wrapped with XML tags for LLM processing
- System context instructions help the LLM understand multi-party conversations
- ENHANCED operating point by default for optimal accuracy
TTS Features:
- Low latency streaming audio synthesis
- Multiple voice options available including `sarah`, `theo`, and `megan`
For more information:
- STT: https://docs.speechmatics.com/rt-api-ref
- TTS: https://docs.speechmatics.com/text-to-speech/quickstart
"""
logger.info(f"Starting bot")
async with aiohttp.ClientSession() as session:
stt = SpeechmaticsSTTService(
api_key=os.getenv("SPEECHMATICS_API_KEY"),
params=SpeechmaticsSTTService.InputParams(
language=Language.EN,
enable_diarization=True,
end_of_utterance_silence_trigger=0.5,
speaker_active_format="<{speaker_id}>{text}</{speaker_id}>",
),
)
tts = SpeechmaticsTTSService(
api_key=os.getenv("SPEECHMATICS_API_KEY"),
voice_id="sarah",
aiohttp_session=session,
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
params=BaseOpenAILLMService.InputParams(temperature=0.75),
)
messages = [
{
"role": "system",
"content": (
"You are a helpful British assistant called Sarah. "
"Your goal is to demonstrate your capabilities in a succinct way. "
"Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. "
"Always include punctuation in your responses. "
"Give very short replies - do not give longer replies unless strictly necessary. "
"Respond to what the user said in a concise, funny, creative and helpful way. "
"Use `<Sn/>` tags to identify different speakers - do not use tags in your replies."
),
},
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(aggregation_timeout=0.005),
)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Say a short hello to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -1,126 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.soniox.stt import SonioxSTTService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = SonioxSTTService(
api_key=os.getenv("SONIOX_API_KEY"),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. Respond to what the user said in a creative and helpful way.",
},
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -1,138 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.inworld.tts import InworldTTSService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
# Create an HTTP session
async with aiohttp.ClientSession() as session:
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
# Inworld TTS Service - Unified streaming and non-streaming
# Set streaming=True for real-time audio, streaming=False for complete audio generation
streaming = True # Toggle this to switch between modes
tts = InworldTTSService(
api_key=os.getenv("INWORLD_API_KEY", ""),
aiohttp_session=session,
voice_id="Ashley",
model="inworld-tts-1",
streaming=streaming, # True: real-time chunks, False: complete audio then playback
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are very knowledgable about dogs. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. Respond to what the user said in a creative and helpful way.",
},
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -1,133 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.asyncai.tts import AsyncAIHttpTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
# Create an HTTP session
async with aiohttp.ClientSession() as session:
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = AsyncAIHttpTTSService(
api_key=os.getenv("ASYNCAI_API_KEY", ""),
voice_id=os.getenv("ASYNCAI_VOICE_ID", "e0f39dc4-f691-4e78-bba5-5c636692cc04"),
aiohttp_session=session,
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. Respond to what the user said in a creative and helpful way.",
},
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -1,129 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.asyncai.tts import AsyncAITTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = AsyncAITTSService(
api_key=os.getenv("ASYNCAI_API_KEY", ""),
voice_id=os.getenv("ASYNCAI_VOICE_ID", "e0f39dc4-f691-4e78-bba5-5c636692cc04"),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. Respond to what the user said in a creative and helpful way.",
},
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -1,174 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import datetime
import os
import wave
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.filters.aic_filter import AICFilter
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# Create audio buffer processor so we can hear the audio fitler results.
audiobuffer = AudioBufferProcessor(
num_channels=2, # 1 for mono, 2 for stereo (user left, bot right)
enable_turn_audio=False, # Enable per-turn audio recording
)
def _create_aic_filter() -> AICFilter:
license_key = os.getenv("AICOUSTICS_LICENSE_KEY", "")
return AICFilter(
license_key=license_key,
enhancement_level=0.5,
)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: (
lambda aic: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=aic.create_vad_analyzer(lookback_buffer_size=6.0, sensitivity=6.0),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
audio_in_filter=aic,
)
)(_create_aic_filter()),
"twilio": lambda: (
lambda aic: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=aic.create_vad_analyzer(lookback_buffer_size=6.0, sensitivity=6.0),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
audio_in_filter=aic,
)
)(_create_aic_filter()),
"webrtc": lambda: (
lambda aic: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=aic.create_vad_analyzer(lookback_buffer_size=6.0, sensitivity=6.0),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
audio_in_filter=aic,
)
)(_create_aic_filter()),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. Respond to what the user said in a creative and helpful way.",
},
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
audiobuffer, # write audio data to a file
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
await audiobuffer.start_recording()
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
@audiobuffer.event_handler("on_audio_data")
async def on_audio_data(buffer, audio, sample_rate, num_channels):
# Save or process the composite audio
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"./conversation_{timestamp}.wav"
# Create the WAV file
with wave.open(filename, "wb") as wf:
wf.setnchannels(num_channels)
wf.setsampwidth(2) # 16-bit audio
wf.setframerate(sample_rate)
wf.writeframes(audio)
logger.info(f"Saved recording to {filename}")
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -1,153 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame, TTSTextFrame
from pipecat.observers.loggers.debug_log_observer import DebugLogObserver, FrameEndpoint
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
)
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.hume.tts import HUME_SAMPLE_RATE, HumeTTSService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_output import BaseOutputTransport
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = HumeTTSService(
api_key=os.getenv("HUME_API_KEY"),
# Replace with your Hume voice ID
voice_id="f898a92e-685f-43fa-985b-a46920f0650b",
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. Respond to what the user said in a creative and helpful way.",
},
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
pipeline = Pipeline(
[
transport.input(), # Transport user input
rtvi,
stt,
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS (HumeTTSService with word timestamps)
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
audio_out_sample_rate=HUME_SAMPLE_RATE,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
observers=[
RTVIObserver(rtvi),
DebugLogObserver(
frame_types={
TTSTextFrame: (BaseOutputTransport, FrameEndpoint.SOURCE),
}
),
],
)
@rtvi.event_handler("on_client_ready")
async def on_client_ready(rtvi):
await rtvi.set_bot_ready()
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
logger.info(
"💡 Word timestamps are enabled! Watch the console for TTSTextFrame logs showing each word with its PTS."
)
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -1,157 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_openai import ChatOpenAI
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMMessagesUpdateFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.frameworks.langchain import LangchainProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
message_store = {}
def get_session_history(session_id: str) -> BaseChatMessageHistory:
if session_id not in message_store:
message_store[session_id] = ChatMessageHistory()
return message_store[session_id]
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"Be nice and helpful. Answer very briefly and without special characters like `#` or `*`. "
"Your response will be synthesized to voice and those characters will create unnatural sounds.",
),
MessagesPlaceholder("chat_history"),
("human", "{input}"),
]
)
chain = prompt | ChatOpenAI(model="gpt-4.1", temperature=0.7)
history_chain = RunnableWithMessageHistory(
chain,
get_session_history,
history_messages_key="chat_history",
input_messages_key="input",
)
lc = LangchainProcessor(history_chain)
context = LLMContext()
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
context_aggregator.user(), # User responses
lc, # Langchain
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
# An `LLMContextFrame` will be picked up by the LangchainProcessor using
# only the content of the last message to inject it in the prompt defined
# above. So no role is required here.
messages = [({"content": "Please briefly introduce yourself to the user."})]
await task.queue_frames([LLMMessagesUpdateFrame(messages, run_llm=True)])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -1,125 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response_universal import (
LLMContext,
LLMContextAggregatorPair,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.deepgram.flux.stt import DeepgramFluxSTTService
from pipecat.services.deepgram.tts import DeepgramTTSService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramFluxSTTService(
api_key=os.getenv("DEEPGRAM_API_KEY"),
params=DeepgramFluxSTTService.InputParams(min_confidence=0.3),
)
tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-2-andromeda-en")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. Respond to what the user said in a creative and helpful way.",
},
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
@stt.event_handler("on_update")
async def on_deepgram_flux_update(stt, transcript):
logger.debug(f"On deeggram flux update: {transcript}")
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -1,132 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.deepgram.tts import DeepgramHttpTTSService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
async with aiohttp.ClientSession() as session:
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = DeepgramHttpTTSService(
api_key=os.getenv("DEEPGRAM_API_KEY"),
voice="aura-2-andromeda-en",
aiohttp_session=session,
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. Respond to what the user said in a creative and helpful way.",
},
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -1,137 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.aws.llm import AWSBedrockLLMService
from pipecat.services.deepgram.stt_sagemaker import DeepgramSageMakerSTTService
from pipecat.services.deepgram.tts import DeepgramTTSService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
# Initialize Deepgram SageMaker STT Service
# This requires:
# - AWS credentials configured (via environment variables or AWS CLI)
# - A deployed SageMaker endpoint with Deepgram model
stt = DeepgramSageMakerSTTService(
endpoint_name=os.getenv("SAGEMAKER_ENDPOINT_NAME"),
region=os.getenv("AWS_REGION"),
)
tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-2-andromeda-en")
llm = AWSBedrockLLMService(
aws_region=os.getenv("AWS_REGION"),
model="us.amazon.nova-pro-v1:0",
params=AWSBedrockLLMService.InputParams(temperature=0.8),
)
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. Respond to what the user said in a creative and helpful way.",
},
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -1,133 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from deepgram import LiveOptions
from dotenv import load_dotenv
from loguru import logger
from pipecat.frames.frames import (
InterruptionFrame,
LLMRunFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.deepgram.tts import DeepgramTTSService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(
api_key=os.getenv("DEEPGRAM_API_KEY"),
live_options=LiveOptions(vad_events=True, utterance_end_ms="1000"),
)
tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-2-andromeda-en")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. Respond to what the user said in a creative and helpful way.",
},
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@stt.event_handler("on_speech_started")
async def on_speech_started(stt, *args, **kwargs):
await task.queue_frames([InterruptionFrame(), UserStartedSpeakingFrame()])
@stt.event_handler("on_utterance_end")
async def on_utterance_end(stt, *args, **kwargs):
await task.queue_frames([UserStoppedSpeakingFrame()])
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -1,126 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.deepgram.tts import DeepgramTTSService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-2-andromeda-en")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. Respond to what the user said in a creative and helpful way.",
},
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -1,136 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.elevenlabs.stt import ElevenLabsSTTService
from pipecat.services.elevenlabs.tts import ElevenLabsHttpTTSService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
# Create an HTTP session
async with aiohttp.ClientSession() as session:
stt = ElevenLabsSTTService(
api_key=os.getenv("ELEVENLABS_API_KEY"),
aiohttp_session=session,
)
tts = ElevenLabsHttpTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
aiohttp_session=session,
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. Respond to what the user said in a creative and helpful way.",
},
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

Some files were not shown because too many files have changed in this diff Show More