Compare commits

...

49 Commits

Author SHA1 Message Date
Mark Backman
027ab8fedf Update demos with RunnerArguments, runner_args 2025-07-30 21:49:58 -04:00
Mark Backman
dbf9899de8 Update quickstart examples for the new runner 2025-07-30 21:45:37 -04:00
Mark Backman
a22bebd174 Update phone-bot-twilio README 2025-07-30 21:45:37 -04:00
Mark Backman
3ab9c15583 Add README to client-server-web, add phone-bot-twilio files 2025-07-30 21:45:37 -04:00
Mark Backman
eca366bfac Add client/server quickstart example 2025-07-30 21:45:37 -04:00
Mark Backman
c5483411f2 Add quickstart example 2025-07-30 21:45:37 -04:00
Mark Backman
ee514f6e4c Update examples with review feedback 2025-07-30 21:41:15 -04:00
Mark Backman
90487ac144 Use RunnerArguments in examples 2025-07-30 21:18:47 -04:00
Mark Backman
54f0bb8326 session_args become runner_args 2025-07-30 21:12:02 -04:00
Mark Backman
f35a58abf1 Change LOCAL_RUN to ENV, update examples 2025-07-30 20:54:18 -04:00
Mark Backman
e33ca26e2d Revert "Add is_local_development utility for a cleaner API to determine if running locally"
This reverts commit 471311b18f.
2025-07-30 20:45:53 -04:00
Mark Backman
471311b18f Add is_local_development utility for a cleaner API to determine if running locally 2025-07-30 19:50:47 -04:00
Mark Backman
0c3e526c19 Remove dependency on pipecatcloud, define new *RunnerArguments types 2025-07-30 18:53:21 -04:00
Mark Backman
58fc952192 Add create_transport factory method example 2025-07-30 08:53:24 -04:00
Mark Backman
46c520bb36 Set Daily expiration to 2-hrs, add Telnyx API key support 2025-07-29 23:36:23 -04:00
Mark Backman
5b6e25a7e0 Add --direct option to directly connect to a Daily room for faster testing iteration 2025-07-29 21:53:24 -04:00
Mark Backman
f133cf97c8 Parse telephony data into call_data object, add support for Telnyx, update example 2025-07-29 21:17:42 -04:00
Mark Backman
dba182b396 Add create_transport factory utlities 2025-07-29 21:02:46 -04:00
Mark Backman
32c7457734 Add single transport example, mark SmallWebRTCSessionArguments as deprecated 2025-07-29 19:47:33 -04:00
Mark Backman
c89422f2f2 Add ESP32, update docstrings 2025-07-29 19:36:38 -04:00
Mark Backman
66b4bbec1a Remove local.py, rename exaples accordingly 2025-07-29 19:23:26 -04:00
Mark Backman
b125d31088 Revert the cloud-simple-bot example back to using the built-in types 2025-07-29 15:36:33 -04:00
Mark Backman
980e52e72e Remove quickstart example—moving to a separate PR 2025-07-29 15:12:26 -04:00
Mark Backman
e2cfa45cc0 Collect package dependencies in a new optional dependency called runner 2025-07-29 15:12:26 -04:00
Mark Backman
061de9cbaf Fix docstring parsing 2025-07-29 15:12:26 -04:00
Mark Backman
f9e316686f Improve cloud.py module docstring 2025-07-29 15:12:26 -04:00
Mark Backman
6b194a2954 Clean up utils.py: remove unused function, update private class naming 2025-07-29 15:12:26 -04:00
Mark Backman
83a88d7c85 Clean up logging, refactor cloud.py's _create_server_app 2025-07-29 15:12:26 -04:00
Mark Backman
f5e23c36a4 Add another cloud-simple example 2025-07-29 15:12:26 -04:00
Mark Backman
155817a1fe Mimic Pipecat Cloud websocket handling in the cloud runner, add a websocket message parser called parse_telephony_websocket in utils.py, update examples to use the new functionality 2025-07-29 15:12:26 -04:00
Mark Backman
16c80b2335 Rename examples files, update quickstart 2025-07-29 15:12:26 -04:00
Mark Backman
49af1553e0 Add pipecat.runner to docs auto-generation 2025-07-29 15:12:26 -04:00
Mark Backman
6dad4de2d2 Updates to cloud examples: cloud-simple so it can be deployed and use Krisp 2025-07-29 15:12:26 -04:00
Mark Backman
214c376933 Add typing support for session_args, fix debug logging in cloud.py 2025-07-29 15:12:26 -04:00
Mark Backman
7db57109dc Add RTVI to local-simple-bot.py 2025-07-29 15:12:26 -04:00
Mark Backman
9fd3e466ab Add startup message for local WebRTC, remove WebRTC from cloud /connect 2025-07-29 15:12:26 -04:00
Mark Backman
ccd71cfafb Comment about starting a SmallWebRTCTransport client session 2025-07-29 15:12:26 -04:00
Mark Backman
eee8cf35b2 Fix the /connect endpoint's return value for Daily, WebRTC 2025-07-29 15:12:26 -04:00
Mark Backman
acbc045d47 PCC deployment scripts 2025-07-29 15:12:26 -04:00
Mark Backman
579eaf0889 Ignore import warning in _get_bot_module 2025-07-29 15:12:26 -04:00
Mark Backman
1284c24144 Console message for where to connect to the server 2025-07-29 15:12:26 -04:00
Mark Backman
ad84a5af46 Remove livekit from cloud.py, use localhost for webrtc 2025-07-29 15:12:26 -04:00
Mark Backman
8138c6ceab Rename examples 2025-07-29 15:12:26 -04:00
Mark Backman
112ae23f4d Final cleanup 2025-07-29 15:12:26 -04:00
Mark Backman
fa6c9c35f7 Renaming files 2025-07-29 15:12:26 -04:00
Mark Backman
7288d9b738 Add quickstart examples 2025-07-29 15:12:26 -04:00
Mark Backman
a494bd7a61 Checkpoint: local working, server: daily,webrtc working 2025-07-29 15:12:26 -04:00
Mark Backman
5532655eb9 Lazy load imports to avoid unnecessary dependencies when running 2025-07-29 15:12:26 -04:00
Mark Backman
be1d8041e2 Add new module, add telnyx, plivo, livekit runners 2025-07-29 15:12:26 -04:00
44 changed files with 5646 additions and 0 deletions

@@ -1,5 +1,12 @@
#!/bin/bash
# Check if sphinx-build is installed
if ! command -v sphinx-build &> /dev/null; then
echo "Error: sphinx-build is not installed or not in PATH" >&2
echo "Please install Sphinx using: pip install -r requirements.txt" >&2
exit 1
fi
# Clean previous build
rm -rf _build

@@ -202,6 +202,7 @@ def import_core_modules():
"pipecat.clocks",
"pipecat.metrics",
"pipecat.observers",
"pipecat.runner",
"pipecat.serializers",
"pipecat.sync",
"pipecat.transcriptions",

@@ -26,6 +26,7 @@ Quick Links
Observers <api/pipecat.observers>
Pipeline <api/pipecat.pipeline>
Processors <api/pipecat.processors>
Runner <api/pipecat.runner>
Serializers <api/pipecat.serializers>
Services <api/pipecat.services>
Sync <api/pipecat.sync>

@@ -44,6 +44,7 @@ pipecat-ai[openai]
pipecat-ai[qwen]
pipecat-ai[remote-smart-turn]
# pipecat-ai[riva] # Mocked
pipecat-ai[runner]
pipecat-ai[sambanova]
pipecat-ai[silero]
pipecat-ai[simli]

@@ -0,0 +1,111 @@
# Client Server Web Example
Learn how to build web applications using Pipecat's client/server architecture. This approach separates your bot logic from your user interface, giving you full control over the client experience while maintaining real-time voice communication.
This example demonstrates:
- Server-side bot running with Pipecat
- React client using [Pipecat's client SDK](https://docs.pipecat.ai/client/introduction)
- Real-time voice communication between client and server
- UI components from [voice-ui-kit](https://github.com/pipecat-ai/voice-ui-kit) for common voice interface patterns
This is the recommended architecture for web applications that need custom interfaces or client-side functionality.
## Prerequisites
- Python 3.10+
- `npm` installed
- AI Service API keys for: [Deepgram](https://console.deepgram.com/signup), [OpenAI](https://auth.openai.com/create-account), and [Cartesia](https://play.cartesia.ai/sign-up)
## Setup
This example requires running both a server and client in **two separate terminal windows**.
### Terminal 1: Server Setup
1. Set up a virtual environment
From the `examples/client-server-web` directory, run:
```bash
cd server
python -m venv .venv
source .venv/bin/activate # On Windows: .venv\Scripts\activate
```
> Using `uv`? Create your venv using: `uv venv && source .venv/bin/activate`.
2. Install dependencies
```bash
pip install -r requirements.txt
```
> Using `uv`? Install requirements using: `uv pip install -r requirements.txt`.
3. Configure environment variables
Create a `.env` file:
```bash
cp env.example .env
```
Then, add your API keys:
```
DEEPGRAM_API_KEY=your_deepgram_api_key
OPENAI_API_KEY=your_openai_api_key
CARTESIA_API_KEY=your_cartesia_api_key
```
4. Run the example
Run your bot using:
```bash
python bot.py
```
> Using `uv`? Run your bot using: `uv run bot.py`.
> 💡 First run note: The initial startup may take ~10 seconds as Pipecat downloads required models, like the Silero VAD model.
### Terminal 2: Client Setup
1. Open a new terminal window and navigate to the `client` folder:
From the `examples/client-server-web` directory, run:
```bash
cd client
```
2. Install dependencies:
```bash
npm i
```
3. Run the client:
```bash
npm run dev
```
4. **Open http://localhost:5173 in your browser** and click `Connect` to start talking to your bot.
> 💡 **Tip**: Check your server terminal for debug logs showing Pipecat's internal workings.
## Troubleshooting
- **Browser permissions**: Make sure to allow microphone access when prompted by your browser.
- **Connection issues**: If the WebRTC connection fails, first try a different browser. If that fails, make sure you don't have a VPN or firewall rules blocking traffic. WebRTC uses UDP to communicate.
- **Audio issues**: Check that your microphone and speakers are working and not muted.
## Next Steps
- **Explore the client SDK**: Learn more about [Pipecat's client SDKs](https://docs.pipecat.ai/client/introduction) for web, mobile, and other platforms
- **Learn about the voice-ui-kit**: Explore [voice-ui-kit](https://github.com/pipecat-ai/voice-ui-kit) to simplify your front-end development
- **Advanced examples**: Check out [pipecat-examples](https://github.com/pipecat-ai/pipecat-examples) for more complex client/server applications
- **Join Discord**: Connect with other developers on [Discord](https://discord.gg/pipecat)

@@ -0,0 +1,24 @@
# Logs
logs
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*
pnpm-debug.log*
lerna-debug.log*
node_modules
dist
dist-ssr
*.local
# Editor directories and files
.vscode/*
!.vscode/extensions.json
.idea
.DS_Store
*.suo
*.ntvs*
*.njsproj
*.sln
*.sw?

@@ -0,0 +1,17 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Pipecat UI</title>
<link rel="icon" href="/favicon.svg" type="image/svg+xml">
<link rel="stylesheet" href="/src/style.css" />
</head>
<body>
<div id="root"></div>
<script type="module" src="/src/index.tsx"></script>
</body>
</html>

@@ -0,0 +1,33 @@
{
"name": "client",
"version": "1.0.0",
"main": "index.js",
"scripts": {
"dev": "node_modules/.bin/vite",
"build": "node_modules/.bin/tsc && vite build",
"preview": "node_modules/.bin/vite preview"
},
"keywords": [],
"author": "",
"license": "ISC",
"description": "",
"devDependencies": {
"@types/node": "^22.14.0",
"@types/react": "^19.1.8",
"@types/react-dom": "^19.1.6",
"@vitejs/plugin-react-swc": "^3.8.1",
"typescript": "^5.8.3",
"vite": "^6.2.5"
},
"dependencies": {
"@fontsource-variable/geist": "^5.2.6",
"@fontsource-variable/geist-mono": "^5.2.6",
"@pipecat-ai/client-js": "^1.0.1",
"@pipecat-ai/client-react": "^1.0.1",
"@pipecat-ai/small-webrtc-transport": "^1.0.0",
"@pipecat-ai/daily-transport": "^1.0.0",
"@pipecat-ai/voice-ui-kit": "^0.1.0",
"react": "^19.1.0",
"react-dom": "^19.1.0"
}
}

@@ -0,0 +1,7 @@
<svg width="24" height="24" viewBox="0 0 24 24" fill="none" xmlns="http://www.w3.org/2000/svg">
<path d="M3.3088 5.05615C3.64682 4.92779 4.02833 5.02411 4.26653 5.29797L7.36884 8.86461H16.6312L19.7335 5.29797C19.9717 5.02411 20.3532 4.92779 20.6912 5.05615C21.0292 5.18452 21.253 5.51072 21.253 5.87504V13.75H24V15.5H19.5181V8.19909L17.6762 10.3167C17.5115 10.506 17.2738 10.6146 17.0241 10.6146H6.9759C6.72616 10.6146 6.48854 10.506 6.32383 10.3167L4.48193 8.19909V15.5H0V13.75H2.74699V5.87504C2.74699 5.51072 2.97078 5.18452 3.3088 5.05615Z" fill="black"/>
<path d="M19.5181 17.25H24V19H19.5181V17.25Z" fill="black"/>
<path d="M0 17.25H4.48193V19H0V17.25Z" fill="black"/>
<path d="M9.25301 14.3333C9.25301 14.9777 8.73517 15.5 8.09639 15.5C7.4576 15.5 6.93976 14.9777 6.93976 14.3333C6.93976 13.689 7.4576 13.1667 8.09639 13.1667C8.73517 13.1667 9.25301 13.689 9.25301 14.3333Z" fill="black"/>
<path d="M17.0602 14.3333C17.0602 14.9777 16.5424 15.5 15.9036 15.5C15.2648 15.5 14.747 14.9777 14.747 14.3333C14.747 13.689 15.2648 13.1667 15.9036 13.1667C16.5424 13.1667 17.0602 13.689 17.0602 14.3333Z" fill="black"/>
</svg>

@@ -0,0 +1,7 @@
<svg width="332" height="192" viewBox="0 0 332 192" fill="none" xmlns="http://www.w3.org/2000/svg">
<path d="M45.7718 0.769635C50.4477 -0.990844 55.7252 0.330188 59.0204 4.08595L101.936 53H230.064L272.98 4.08595C276.275 0.330188 281.552 -0.990844 286.228 0.769635C290.904 2.53011 294 7.00367 294 12V120H332V144H270V43.8728L244.52 72.9141C242.242 75.5111 238.955 77 235.5 77H96.5C93.0452 77 89.7581 75.5111 87.4796 72.9141L62 43.8728V144H0V120H38V12C38 7.00367 41.0958 2.53011 45.7718 0.769635Z" fill="black"/>
<path d="M270 168H332V192H270V168Z" fill="black"/>
<path d="M0 168H62V192H0V168Z" fill="black"/>
<path d="M128 128C128 136.837 120.837 144 112 144C103.163 144 96 136.837 96 128C96 119.164 103.163 112 112 112C120.837 112 128 119.164 128 128Z" fill="black"/>
<path d="M236 128C236 136.837 228.837 144 220 144C211.163 144 204 136.837 204 128C204 119.164 211.163 112 220 112C228.837 112 236 119.164 236 128Z" fill="black"/>
</svg>

@@ -0,0 +1,28 @@
import {
ConsoleTemplate,
FullScreenContainer,
ThemeProvider,
} from '@pipecat-ai/voice-ui-kit';
import { StrictMode } from 'react';
import { createRoot } from 'react-dom/client';
//@ts-ignore - fontsource-variable/geist is not typed
import '@fontsource-variable/geist';
//@ts-ignore - fontsource-variable/geist is not typed
import '@fontsource-variable/geist-mono';
createRoot(document.getElementById('root')!).render(
// @ts-ignore
<StrictMode>
<ThemeProvider>
<FullScreenContainer>
<ConsoleTemplate
transportType="smallwebrtc"
connectParams={{
connectionUrl: '/api/offer',
}}
/>
</FullScreenContainer>
</ThemeProvider>
</StrictMode>
);

@@ -0,0 +1,7 @@
@import "@pipecat-ai/voice-ui-kit/styles.css";
html,
body {
height: 100%;
margin: 0;
}

@@ -0,0 +1,27 @@
{
"compilerOptions": {
"target": "ES2016",
"module": "ESNext",
"lib": ["ES2016", "DOM", "DOM.Iterable"],
"types": ["node"],
"skipLibCheck": true,
"jsx": "preserve",
/* Bundler mode */
"moduleResolution": "bundler",
"allowImportingTsExtensions": true,
"allowJs": true,
"noEmit": true,
"resolveJsonModule": true,
"isolatedModules": true,
"moduleDetection": "force",
"esModuleInterop": true,
/* Linting */
"strict": true,
"noUnusedLocals": true,
"noUnusedParameters": false,
"noFallthroughCasesInSwitch": true
},
"include": ["src"]
}

@@ -0,0 +1,18 @@
import { defineConfig } from "vite";
import react from "@vitejs/plugin-react-swc";
export default defineConfig({
base: "./", //Use relative paths so it works at any mount path
plugins: [react()],
publicDir: "public",
server: {
allowedHosts: true, // Allows external connections like ngrok
proxy: {
// Proxy /api requests to the backend server
"/api": {
target: "http://0.0.0.0:7860", // Replace with your backend URL
changeOrigin: true,
},
},
},
});

@@ -0,0 +1,127 @@
#
# Copyright (c) 2024-2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Client-Server Web Example.
This is the server-side bot implementation for the Pipecat client-server
web example. It runs a simple voice AI bot that you can connect to using a
web browser and speak with it.
Required AI services:
- Deepgram (Speech-to-Text)
- OpenAI (LLM)
- Cartesia (Text-to-Speech)
The example connects between client and server using a P2P WebRTC connection.
Run the bot using::
python bot.py
"""
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
load_dotenv(override=True)
async def run_bot(transport: BaseTransport):
    logger.info(f"Starting bot")

    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

    tts = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY"),
        voice_id="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
    )

    llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))

    messages = [
        {
            "role": "system",
            "content": "You are a friendly AI assistant. Respond naturally and keep your answers conversational.",
        },
    ]

    context = OpenAILLMContext(messages)
    context_aggregator = llm.create_context_aggregator(context)

    rtvi = RTVIProcessor(config=RTVIConfig(config=[]))

    pipeline = Pipeline(
        [
            transport.input(),  # Transport user input
            rtvi,  # RTVI processor
            stt,
            context_aggregator.user(),  # User responses
            llm,  # LLM
            tts,  # TTS
            transport.output(),  # Transport bot output
            context_aggregator.assistant(),  # Assistant spoken responses
        ]
    )

    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            enable_metrics=True,
            enable_usage_metrics=True,
        ),
        observers=[RTVIObserver(rtvi)],
    )

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info(f"Client connected")
        # Kick off the conversation.
        messages.append({"role": "system", "content": "Say hello and briefly introduce yourself."})
        await task.queue_frames([context_aggregator.user().get_context_frame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info(f"Client disconnected")
        await task.cancel()

    runner = PipelineRunner(handle_sigint=False)

    await runner.run(task)


async def bot(runner_args: RunnerArguments):
    """Main bot entry point for the bot starter."""

    transport = SmallWebRTCTransport(
        params=TransportParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            vad_analyzer=SileroVADAnalyzer(),
        ),
        webrtc_connection=runner_args.webrtc_connection,
    )

    await run_bot(transport)


if __name__ == "__main__":
    from pipecat.runner.run import main

    main()

@@ -0,0 +1,3 @@
DEEPGRAM_API_KEY=your_deepgram_api_key
OPENAI_API_KEY=your_openai_api_key
CARTESIA_API_KEY=your_cartesia_api_key

@@ -0,0 +1 @@
pipecat-ai[webrtc,silero,deepgram,openai,cartesia,runner]

examples/phone-bot-twilio/.gitignore (new file, 164 lines added; vendored)

@@ -0,0 +1,164 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
runpod.toml
# Examples
templates/streams.xml

@@ -0,0 +1,116 @@
# Phone Bot Twilio
Learn how to connect your Pipecat bot to a phone number so users can call and have voice conversations. This example shows the complete setup for telephone-based AI interactions using Twilio's telephony services. At the end, you'll be able to talk to your bot on the phone.
## Prerequisites
- Python 3.10+
- [ngrok](https://ngrok.com/docs/getting-started/) (for tunneling)
- [Twilio Account](https://www.twilio.com/login) and [phone number](https://help.twilio.com/articles/223135247-How-to-Search-for-and-Buy-a-Twilio-Phone-Number-from-Console)
- AI Service API keys for: [Deepgram](https://console.deepgram.com/signup), [OpenAI](https://auth.openai.com/create-account), and [Cartesia](https://play.cartesia.ai/sign-up)
## Setup
This example requires running both a server and an ngrok tunnel in **two separate terminal windows**.
### Terminal 1: Start ngrok and Configure Twilio
1. Start ngrok:
In a new terminal, start ngrok to tunnel the local server:
```bash
ngrok http 7860
```
> Want a fixed ngrok URL? Use the `--subdomain` flag:
> `ngrok http --subdomain=your_ngrok_name 7860`
2. Update the Twilio Webhook:
- Go to your Twilio phone number's configuration page
- Under "Voice Configuration", in the "A call comes in" section:
- Select "Webhook" from the dropdown
- Enter your ngrok URL: `https://your-ngrok-url.ngrok.io`
- Ensure "HTTP POST" is selected
- Click Save at the bottom of the page
3. Configure streams.xml:
- Copy the template file to create your local version:
```bash
cp templates/streams.xml.template templates/streams.xml
```
- In `templates/streams.xml`, replace `<your_server_url>` with your ngrok URL (without `https://`)
- The final URL should look like: `wss://abc123.ngrok.io/ws`
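
The copy-and-replace step above can also be scripted. This is a minimal sketch, assuming the template contains the literal placeholder `<your_server_url>` as described above. The helper names `ngrok_host` and `render_streams_xml` are hypothetical and not part of this example's code.

```python
from urllib.parse import urlparse

# Placeholder text as it appears in templates/streams.xml.template.
PLACEHOLDER = "<your_server_url>"


def ngrok_host(url: str) -> str:
    """Return the bare host of an ngrok URL, given with or without a scheme."""
    # Prefix scheme-less input with "//" so urlparse treats it as a host.
    parsed = urlparse(url if "://" in url else f"//{url}")
    return parsed.netloc


def render_streams_xml(template: str, ngrok_url: str) -> str:
    """Fill the placeholder with the host only (no https://, no trailing slash)."""
    return template.replace(PLACEHOLDER, ngrok_host(ngrok_url))


if __name__ == "__main__":
    line = '<Stream url="wss://<your_server_url>/ws"></Stream>'
    print(render_streams_xml(line, "https://abc123.ngrok.io"))
```

To apply it to the real files, read `templates/streams.xml.template`, pass its text through `render_streams_xml`, and write the result to `templates/streams.xml`.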
### Terminal 2: Server Setup
1. Set up a virtual environment
From the `examples/phone-bot-twilio` directory, run:
```bash
python -m venv .venv
source .venv/bin/activate # On Windows: .venv\Scripts\activate
```
> Using `uv`? Create your venv using: `uv venv && source .venv/bin/activate`.
2. Install dependencies
```bash
pip install -r requirements.txt
```
> Using `uv`? Install requirements using: `uv pip install -r requirements.txt`.
3. Configure environment variables
Create a `.env` file:
```bash
cp env.example .env
```
Then, add your API keys:
```
DEEPGRAM_API_KEY=your_deepgram_api_key
OPENAI_API_KEY=your_openai_api_key
CARTESIA_API_KEY=your_cartesia_api_key
```
> Optional: Add your `TWILIO_ACCOUNT_SID` and `TWILIO_AUTH_TOKEN` to enable auto-hangup.
4. Run the Application
```bash
python bot.py -t twilio -x your_ngrok.ngrok.io
```
### Test Your Phone Bot
**Call your Twilio phone number** to start talking with your AI bot! 🚀
> 💡 **Tip**: Check your server terminal for debug logs showing Pipecat's internal workings.
## Troubleshooting
- **Call doesn't connect**: Verify that your ngrok URL is set correctly in both the Twilio webhook and `streams.xml`
- **No audio or bot doesn't respond**: Check that all API keys are correctly set in your `.env` file
- **Webhook errors**: Ensure your server is running and the ngrok tunnel is active before making calls
- **ngrok tunnel issues**: Free ngrok URLs change on every restart, so remember to update both the Twilio webhook and `streams.xml`
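
The API-key check suggested above can be done with a few lines of Python. This is a minimal sketch using only the standard library. The helper `missing_keys` is hypothetical, and it inspects the current process environment only, so load your `.env` first (the bot itself does this with `load_dotenv`).

```python
import os

# The three keys this example's .env is expected to define.
REQUIRED_KEYS = ("DEEPGRAM_API_KEY", "OPENAI_API_KEY", "CARTESIA_API_KEY")


def missing_keys(env=None, required=REQUIRED_KEYS):
    """Return the required variable names that are unset or blank."""
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name, "").strip()]


if __name__ == "__main__":
    missing = missing_keys()
    if missing:
        print("Missing or empty:", ", ".join(missing))
    else:
        print("All required API keys are set.")
```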
## Understanding the Call Flow
1. **Incoming Call**: User dials your Twilio number
2. **Webhook**: Twilio sends call data to your ngrok URL
3. **WebSocket**: Twilio opens a WebSocket connection to your server (the `wss://` URL in `streams.xml`) and exchanges call audio with it as Media Streams messages
4. **Processing**: Audio flows through your Pipecat Pipeline
5. **Response**: Synthesized speech streams back to caller
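
To illustrate the WebSocket step of the flow above: Twilio Media Streams sends JSON text messages, and the `start` event carries the stream and call identifiers. This is a simplified sketch of extracting them, not Pipecat's `parse_telephony_websocket` implementation. The function name `find_call_ids` is hypothetical, and the returned keys simply mirror the `call_data["stream_id"]` and `call_data["call_id"]` lookups in this example's bot.

```python
import json
from typing import Iterable, Optional


def find_call_ids(raw_messages: Iterable[str]) -> Optional[dict]:
    """Scan Media Streams messages and return IDs from the first 'start' event."""
    for raw in raw_messages:
        message = json.loads(raw)
        if message.get("event") == "start":
            start = message.get("start", {})
            return {
                "stream_id": start.get("streamSid"),
                "call_id": start.get("callSid"),
            }
    # No 'start' event seen (for example, only 'connected' arrived so far).
    return None


if __name__ == "__main__":
    sample = [
        json.dumps({"event": "connected", "protocol": "Call", "version": "1.0.0"}),
        json.dumps({"event": "start", "start": {"streamSid": "MZ123", "callSid": "CA456"}}),
    ]
    print(find_call_ids(sample))
```

The sample identifiers are made up; real values come from Twilio at call time.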
## Next Steps
- **Deploy to production**: Replace ngrok with a proper server deployment
- **Explore other telephony providers**: Try [Telnyx](https://github.com/pipecat-ai/pipecat-examples/tree/main/telnyx-chatbot) or [Plivo](https://github.com/pipecat-ai/pipecat-examples/tree/main/plivo-chatbot) examples
- **Advanced telephony features**: Check out [pipecat-examples](https://github.com/pipecat-ai/pipecat-examples) for call recording, transfer, and more
- **Join Discord**: Connect with other developers on [Discord](https://discord.gg/pipecat)

@@ -0,0 +1,144 @@
#
# Copyright (c) 2024-2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Pipecat Twilio Phone Example.
The example runs a simple voice AI bot that you can connect to using a
phone via Twilio.
Required AI services:
- Deepgram (Speech-to-Text)
- OpenAI (LLM)
- Cartesia (Text-to-Speech)
The example connects between client and server using a Twilio websocket
connection.
Run the bot using::
python bot.py -t twilio -x your_ngrok.ngrok.io
"""
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import parse_telephony_websocket
from pipecat.serializers.twilio import TwilioFrameSerializer
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport
from pipecat.transports.network.fastapi_websocket import (
FastAPIWebsocketParams,
FastAPIWebsocketTransport,
)
load_dotenv(override=True)
async def run_bot(transport: BaseTransport):
    logger.info(f"Starting bot")

    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

    tts = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY"),
        voice_id="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
    )

    llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))

    messages = [
        {
            "role": "system",
            "content": "You are a friendly AI assistant. Respond naturally and keep your answers conversational.",
        },
    ]

    context = OpenAILLMContext(messages)
    context_aggregator = llm.create_context_aggregator(context)

    rtvi = RTVIProcessor(config=RTVIConfig(config=[]))

    pipeline = Pipeline(
        [
            transport.input(),  # Transport user input
            rtvi,  # RTVI processor
            stt,
            context_aggregator.user(),  # User responses
            llm,  # LLM
            tts,  # TTS
            transport.output(),  # Transport bot output
            context_aggregator.assistant(),  # Assistant spoken responses
        ]
    )

    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            enable_metrics=True,
            enable_usage_metrics=True,
        ),
        observers=[RTVIObserver(rtvi)],
    )

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info(f"Client connected")
        # Kick off the conversation.
        messages.append({"role": "system", "content": "Say hello and briefly introduce yourself."})
        await task.queue_frames([context_aggregator.user().get_context_frame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info(f"Client disconnected")
        await task.cancel()

    runner = PipelineRunner(handle_sigint=False)

    await runner.run(task)


async def bot(runner_args: RunnerArguments):
    """Main bot entry point for the bot starter."""

    transport_type, call_data = await parse_telephony_websocket(runner_args.websocket)
    logger.info(f"Auto-detected transport: {transport_type}")

    serializer = TwilioFrameSerializer(
        stream_sid=call_data["stream_id"],
        call_sid=call_data["call_id"],
        account_sid=os.getenv("TWILIO_ACCOUNT_SID", ""),
        auth_token=os.getenv("TWILIO_AUTH_TOKEN", ""),
    )

    transport = FastAPIWebsocketTransport(
        websocket=runner_args.websocket,
        params=FastAPIWebsocketParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            add_wav_header=False,
            vad_analyzer=SileroVADAnalyzer(),
            serializer=serializer,
        ),
    )

    await run_bot(transport)


if __name__ == "__main__":
    from pipecat.runner.run import main

    main()

@@ -0,0 +1,3 @@
OPENAI_API_KEY=
DEEPGRAM_API_KEY=
CARTESIA_API_KEY=

@@ -0,0 +1 @@
pipecat-ai[cartesia,openai,silero,deepgram,websocket,runner]

@@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Connect>
<Stream url="wss://<your_server_url>/ws"></Stream>
</Connect>
<Pause length="40"/>
</Response>

@@ -0,0 +1,87 @@
# Pipecat Quickstart
Run your first Pipecat bot in under 5 minutes. This example creates a voice AI bot that you can talk to in your browser.
## Prerequisites
### Python 3.10+
Pipecat requires Python 3.10 or newer. Check your version:
```bash
python --version
```
If you need to upgrade Python, we recommend using a version manager like `uv` or `pyenv`.
### AI Service API keys
Pipecat orchestrates different AI services in a pipeline, ensuring low-latency communication. In this quickstart example, we'll use:
- [Deepgram](https://console.deepgram.com/signup) for Speech-to-Text transcriptions
- [OpenAI](https://auth.openai.com/create-account) for LLM inference
- [Cartesia](https://play.cartesia.ai/sign-up) for Text-to-Speech audio generation
Have your API keys ready. We'll add them to your `.env` shortly.
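
To make the pipeline idea above concrete, here is a toy sketch of three stages chained in order. It uses only `asyncio`. The `fake_*` stages and `run_stages` are hypothetical stand-ins that call no real service, and none of this is Pipecat's API.

```python
import asyncio
from typing import Awaitable, Callable, List

# A stage takes one item and asynchronously returns the transformed item.
Stage = Callable[[str], Awaitable[str]]


async def fake_stt(audio: str) -> str:
    """Stand-in for Speech-to-Text: audio in, transcript out."""
    return f"text({audio})"


async def fake_llm(text: str) -> str:
    """Stand-in for LLM inference: transcript in, reply text out."""
    return f"reply({text})"


async def fake_tts(text: str) -> str:
    """Stand-in for Text-to-Speech: reply text in, audio out."""
    return f"audio({text})"


async def run_stages(stages: List[Stage], item: str) -> str:
    """Pass the item through each stage in order, awaiting each result."""
    for stage in stages:
        item = await stage(item)
    return item


if __name__ == "__main__":
    print(asyncio.run(run_stages([fake_stt, fake_llm, fake_tts], "mic-input")))
```

The real pipeline in `bot.py` is more than a sequential chain: it also includes the transport's input and output and the context aggregators.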
## Setup
1. Set up a virtual environment
From the `examples/quickstart` directory, run:
```bash
python -m venv .venv
source .venv/bin/activate # On Windows: .venv\Scripts\activate
```
> Using `uv`? Create your venv using: `uv venv && source .venv/bin/activate`.
2. Install dependencies
```bash
pip install -r requirements.txt
```
> Using `uv`? Install requirements using: `uv pip install -r requirements.txt`.
3. Configure environment variables
Create a `.env` file:
```bash
cp env.example .env
```
Then, add your API keys:
```
DEEPGRAM_API_KEY=your_deepgram_api_key
OPENAI_API_KEY=your_openai_api_key
CARTESIA_API_KEY=your_cartesia_api_key
```
4. Run the example
Run your bot using:
```bash
python bot.py
```
> Using `uv`? Run your bot using: `uv run bot.py`.
**Open http://localhost:7860 in your browser** and click `Connect` to start talking to your bot.
> 💡 First run note: The initial startup may take ~10 seconds as Pipecat downloads required models, like the Silero VAD model.
## Troubleshooting
- **Browser permissions**: Make sure to allow microphone access when prompted by your browser.
- **Connection issues**: If the WebRTC connection fails, first try a different browser. If that fails, make sure you don't have a VPN or firewall rules blocking traffic. WebRTC uses UDP to communicate.
- **Audio issues**: Check that your microphone and speakers are working and not muted.
## Next Steps
- **Read the docs**: Check out [Pipecat's docs](https://docs.pipecat.ai/) for guides and reference information.
- **Join Discord**: Join [Pipecat's Discord server](https://discord.gg/pipecat) to get help and learn about what others are building.

examples/quickstart/bot.py Normal file
View File

@@ -0,0 +1,126 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Pipecat Quickstart Example.
The example runs a simple voice AI bot that you can connect to using your
browser and speak with it.
Required AI services:
- Deepgram (Speech-to-Text)
- OpenAI (LLM)
- Cartesia (Text-to-Speech)
The example connects between client and server using a P2P WebRTC connection.
Run the bot using::
python bot.py
"""
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
load_dotenv(override=True)
async def run_bot(transport: BaseTransport):
logger.info("Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a friendly AI assistant. Respond naturally and keep your answers conversational.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
pipeline = Pipeline(
[
transport.input(), # Transport user input
rtvi, # RTVI processor
stt,
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
observers=[RTVIObserver(rtvi)],
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info("Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Say hello and briefly introduce yourself."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info("Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point for the bot starter."""
transport = SmallWebRTCTransport(
params=TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
webrtc_connection=runner_args.webrtc_connection,
)
await run_bot(transport)
if __name__ == "__main__":
from pipecat.runner.run import main
main()
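
The order of the list passed to `Pipeline` matters: input, speech-to-text, LLM, text-to-speech, output. The toy below only illustrates that ordering with plain synchronous functions. It is not how Pipecat processes frames, and every name in it (`run_stages`, `fake_stt`, `fake_llm`, `fake_tts`) is made up for the illustration.

```python
from typing import Callable, List

# Each stage is a plain function here. Real Pipecat processors are asynchronous
# frame processors, so this sketch shows ordering only.
Stage = Callable[[str], str]


def run_stages(stages: List[Stage], user_audio: str) -> str:
    """Feed the input through each stage in list order."""
    data = user_audio
    for stage in stages:
        data = stage(data)
    return data


def fake_stt(audio: str) -> str:
    return audio.replace("audio:", "text:")


def fake_llm(text: str) -> str:
    return text.replace("text:", "reply:")


def fake_tts(reply: str) -> str:
    return reply.replace("reply:", "speech:")


result = run_stages([fake_stt, fake_llm, fake_tts], "audio:hello")
print(result)
```

Reversing the list leaves the later stages with nothing to act on, which is why the example places `stt` before `llm` and `tts` after it.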

View File

@@ -0,0 +1,3 @@
DEEPGRAM_API_KEY=your_deepgram_api_key
OPENAI_API_KEY=your_openai_api_key
CARTESIA_API_KEY=your_cartesia_api_key

View File

@@ -0,0 +1 @@
pipecat-ai[webrtc,silero,deepgram,openai,cartesia,runner]

View File

@@ -0,0 +1,218 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Pipecat Cloud-compatible bot example.
Transports are:
- Daily
- SmallWebRTC
- Twilio
- Telnyx
- Plivo
"""
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.runner.types import (
DailyRunnerArguments,
RunnerArguments,
SmallWebRTCRunnerArguments,
WebSocketRunnerArguments,
)
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport
load_dotenv(override=True)
async def run_bot(transport: BaseTransport):
"""Main bot logic that works with any transport."""
logger.info("Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a friendly AI assistant. Respond naturally and keep your answers conversational.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
pipeline = Pipeline(
[
transport.input(),
rtvi,
stt,
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
observers=[RTVIObserver(rtvi)],
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info("Client connected")
messages.append({"role": "system", "content": "Say hello and briefly introduce yourself."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info("Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = None
if isinstance(runner_args, DailyRunnerArguments):
from pipecat.transports.services.daily import DailyParams, DailyTransport
if os.environ.get("ENV") != "local":
from pipecat.audio.filters.krisp_filter import KrispFilter
krisp_filter = KrispFilter()
else:
krisp_filter = None
transport = DailyTransport(
runner_args.room_url,
runner_args.token,
"Pipecat Bot",
params=DailyParams(
audio_in_enabled=True,
audio_in_filter=krisp_filter,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
elif isinstance(runner_args, SmallWebRTCRunnerArguments):
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
transport = SmallWebRTCTransport(
params=TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
webrtc_connection=runner_args.webrtc_connection,
)
elif isinstance(runner_args, WebSocketRunnerArguments):
# Use the utility to parse WebSocket data
from pipecat.runner.utils import parse_telephony_websocket
transport_type, call_data = await parse_telephony_websocket(runner_args.websocket)
logger.info(f"Auto-detected transport: {transport_type}")
# Create transport based on detected type
if transport_type == "twilio":
from pipecat.serializers.twilio import TwilioFrameSerializer
serializer = TwilioFrameSerializer(
stream_sid=call_data["stream_id"],
call_sid=call_data["call_id"],
account_sid=os.getenv("TWILIO_ACCOUNT_SID", ""),
auth_token=os.getenv("TWILIO_AUTH_TOKEN", ""),
)
elif transport_type == "telnyx":
from pipecat.serializers.telnyx import TelnyxFrameSerializer
serializer = TelnyxFrameSerializer(
stream_id=call_data["stream_id"],
call_control_id=call_data["call_control_id"],
outbound_encoding=call_data["outbound_encoding"],
inbound_encoding="PCMU", # Set manually
api_key=os.getenv("TELNYX_API_KEY", ""),
)
elif transport_type == "plivo":
from pipecat.serializers.plivo import PlivoFrameSerializer
serializer = PlivoFrameSerializer(
stream_id=call_data["stream_id"],
call_id=call_data["call_id"],
auth_id=os.getenv("PLIVO_AUTH_ID", ""),
auth_token=os.getenv("PLIVO_AUTH_TOKEN", ""),
)
else:
# Generic fallback
serializer = None
# Create the transport
from pipecat.transports.network.fastapi_websocket import (
FastAPIWebsocketParams,
FastAPIWebsocketTransport,
)
transport = FastAPIWebsocketTransport(
websocket=runner_args.websocket,
params=FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
add_wav_header=False,
vad_analyzer=SileroVADAnalyzer(),
serializer=serializer,
),
)
else:
logger.error(f"Unsupported runner arguments type: {type(runner_args)}")
return
if transport is None:
logger.error("Failed to create transport")
return
await run_bot(transport)
if __name__ == "__main__":
from pipecat.runner.run import main
main()
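
The `bot()` function above selects a transport by checking the concrete type of `runner_args`. The dispatch pattern can be tried without installing Pipecat using stand-in classes. Everything named `Fake*` below, and `pick_transport`, is hypothetical and only mirrors the shape of the real `pipecat.runner.types` classes.

```python
from dataclasses import dataclass


@dataclass
class FakeRunnerArguments:
    """Stand-in base class; not the real RunnerArguments."""


@dataclass
class FakeDailyArguments(FakeRunnerArguments):
    room_url: str = ""
    token: str = ""


@dataclass
class FakeWebRTCArguments(FakeRunnerArguments):
    webrtc_connection: object = None


@dataclass
class FakeWebSocketArguments(FakeRunnerArguments):
    websocket: object = None


def pick_transport(runner_args: FakeRunnerArguments) -> str:
    """Return a label for the transport that matches the argument type."""
    if isinstance(runner_args, FakeDailyArguments):
        return "daily"
    if isinstance(runner_args, FakeWebRTCArguments):
        return "webrtc"
    if isinstance(runner_args, FakeWebSocketArguments):
        return "telephony"
    # Unlike the example above, which logs and returns, this sketch raises.
    raise TypeError(f"Unsupported runner arguments type: {type(runner_args)}")


print(pick_transport(FakeDailyArguments(room_url="https://example.daily.co/room")))
```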

View File

@@ -0,0 +1,144 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Pipecat Cloud-compatible bot example.
Transports are:
- Daily
- SmallWebRTC
- Twilio
- Telnyx
- Plivo
"""
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams
from pipecat.transports.services.daily import DailyParams
load_dotenv(override=True)
# Define transport configurations using factory functions
transport_params = {
    "daily": lambda: DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(),
    ),
    "webrtc": lambda: TransportParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(),
    ),
    "twilio": lambda: FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(),
        # add_wav_header and serializer will be set automatically
    ),
    "telnyx": lambda: FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(),
        # add_wav_header and serializer will be set automatically
    ),
    "plivo": lambda: FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(),
        # add_wav_header and serializer will be set automatically
    ),
}
async def run_bot(transport: BaseTransport):
"""Main bot logic that works with any transport."""
logger.info("Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a friendly AI assistant. Respond naturally and keep your answers conversational.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
pipeline = Pipeline(
[
transport.input(),
rtvi,
stt,
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
observers=[RTVIObserver(rtvi)],
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info("Client connected")
messages.append({"role": "system", "content": "Say hello and briefly introduce yourself."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info("Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport)
if __name__ == "__main__":
from pipecat.runner.run import main
main()
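
This example wraps each parameter object in a `lambda`. One plausible reason, which is an inference and not something the change set states, is that only the selected transport's parameters (and its VAD analyzer) get constructed. The sketch below shows that lazy-factory behaviour with plain dictionaries. `make_params`, `factories`, and `build_params` are hypothetical names and do not reflect how `create_transport` is implemented.

```python
from typing import Callable, Dict

created = []  # records which parameter objects were actually built


def make_params(name: str) -> dict:
    """Pretend to build an expensive parameter object."""
    created.append(name)
    return {"transport": name, "audio_in_enabled": True, "audio_out_enabled": True}


# Values are zero-argument callables, so nothing is built when the dict is defined.
factories: Dict[str, Callable[[], dict]] = {
    "daily": lambda: make_params("daily"),
    "webrtc": lambda: make_params("webrtc"),
    "twilio": lambda: make_params("twilio"),
}


def build_params(transport_name: str) -> dict:
    """Call only the factory registered for the requested transport."""
    try:
        factory = factories[transport_name]
    except KeyError:
        raise ValueError(f"No parameters registered for transport {transport_name!r}") from None
    return factory()


params = build_params("webrtc")
print(params, created)
```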

View File

@@ -0,0 +1,157 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Pipecat Cloud-compatible bot example.
Transports are Daily or SmallWebRTC.
Run it with:
- WebRTC transport::
python 02-two-transport-bot.py
- Daily transport::
python 02-two-transport-bot.py --transport daily
"""
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.runner.types import DailyRunnerArguments, RunnerArguments, SmallWebRTCRunnerArguments
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport
load_dotenv(override=True)
async def run_bot(transport: BaseTransport):
"""Main bot logic that works with any transport."""
logger.info("Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a friendly AI assistant. Respond naturally and keep your answers conversational.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
pipeline = Pipeline(
[
transport.input(),
rtvi,
stt,
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
observers=[RTVIObserver(rtvi)],
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info("Client connected")
messages.append({"role": "system", "content": "Say hello and briefly introduce yourself."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info("Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = None
if isinstance(runner_args, DailyRunnerArguments):
from pipecat.transports.services.daily import DailyParams, DailyTransport
if os.environ.get("ENV") != "local":
from pipecat.audio.filters.krisp_filter import KrispFilter
krisp_filter = KrispFilter()
else:
krisp_filter = None
transport = DailyTransport(
runner_args.room_url,
runner_args.token,
"Pipecat Bot",
params=DailyParams(
audio_in_enabled=True,
audio_in_filter=krisp_filter,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
elif isinstance(runner_args, SmallWebRTCRunnerArguments):
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
transport = SmallWebRTCTransport(
params=TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
webrtc_connection=runner_args.webrtc_connection,
)
else:
logger.error(f"Unsupported runner arguments type: {type(runner_args)}")
return
if transport is None:
logger.error("Failed to create transport")
return
await run_bot(transport)
if __name__ == "__main__":
from pipecat.runner.run import main
main()
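
The Daily branch above enables the Krisp filter only when `ENV` is not `"local"`, and the development runner in this change set sets `ENV` to `"local"` at import time. The decision reduces to the small check sketched here. `should_use_krisp` is a hypothetical helper name, not a Pipecat function.

```python
import os


def should_use_krisp(env=None) -> bool:
    """Mirror the example's check: use Krisp unless ENV is exactly 'local'."""
    env = os.environ if env is None else env
    return env.get("ENV") != "local"


print(should_use_krisp({"ENV": "local"}))
print(should_use_krisp({}))
```

Note that an unset `ENV` counts as non-local, so a deployment without the variable takes the Krisp import path.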

View File

@@ -0,0 +1,117 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Pipecat development runner example.
This example has a single transport—SmallWebRTCTransport.
Run it with::
python 03-single-transport-bot.py
"""
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
load_dotenv(override=True)
async def run_bot(transport: BaseTransport):
"""Main bot logic that works with any transport."""
logger.info("Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a friendly AI assistant. Respond naturally and keep your answers conversational.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
pipeline = Pipeline(
[
transport.input(),
rtvi,
stt,
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
observers=[RTVIObserver(rtvi)],
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info("Client connected")
messages.append({"role": "system", "content": "Say hello and briefly introduce yourself."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info("Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = SmallWebRTCTransport(
params=TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
webrtc_connection=runner_args.webrtc_connection,
)
await run_bot(transport)
if __name__ == "__main__":
from pipecat.runner.run import main
main()
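
The examples register callbacks with `@transport.event_handler("on_client_connected")`. The toy class below shows one way a decorator-based registry of async handlers can work. It is a sketch under that assumption, not Pipecat's `BaseTransport`, and `ToyTransport` and `emit` are invented for the illustration.

```python
import asyncio
from collections import defaultdict


class ToyTransport:
    """Hypothetical stand-in that stores handlers by event name."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def event_handler(self, event_name: str):
        def decorator(func):
            # Remember the coroutine function and return it unchanged.
            self._handlers[event_name].append(func)
            return func

        return decorator

    async def emit(self, event_name: str, *args):
        # Await every handler registered for this event, in registration order.
        for handler in self._handlers[event_name]:
            await handler(self, *args)


transport = ToyTransport()
events = []


@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
    events.append(("connected", client))


@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
    events.append(("disconnected", client))


async def demo():
    await transport.emit("on_client_connected", "client-1")
    await transport.emit("on_client_disconnected", "client-1")


asyncio.run(demo())
print(events)
```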

View File

@@ -0,0 +1,7 @@
FROM dailyco/pipecat-base:latest
COPY ./requirements.txt requirements.txt
RUN pip install --no-cache-dir --upgrade -r requirements.txt
COPY ./02-two-transport-bot.py bot.py

View File

@@ -0,0 +1,19 @@
#!/bin/bash
set -e
VERSION="0.1"
DOCKER_USERNAME="your_docker_username"
AGENT_NAME="cloud-simple-bot"
# Build the Docker image with the correct context
echo "Building Docker image..."
docker build --platform=linux/arm64 -t "$DOCKER_USERNAME/$AGENT_NAME:$VERSION" -t "$DOCKER_USERNAME/$AGENT_NAME:latest" .
# Push the Docker images
echo "Pushing Docker image $DOCKER_USERNAME/$AGENT_NAME:$VERSION..."
docker push "$DOCKER_USERNAME/$AGENT_NAME:$VERSION"
echo "Pushing Docker image $DOCKER_USERNAME/$AGENT_NAME:latest..."
docker push "$DOCKER_USERNAME/$AGENT_NAME:latest"
echo "Successfully built and pushed $DOCKER_USERNAME/$AGENT_NAME:$VERSION and $DOCKER_USERNAME/$AGENT_NAME:latest"

View File

@@ -0,0 +1,3 @@
DEEPGRAM_API_KEY=your_deepgram_api_key
OPENAI_API_KEY=your_openai_api_key
CARTESIA_API_KEY=your_cartesia_api_key

View File

@@ -0,0 +1,8 @@
agent_name = "cloud-simple-bot"
image = "your_dockerhub_username/cloud-simple-bot:0.1"
image_credentials = "dockerhub-access"
secret_set = "cloud-simple-bot-secrets"
enable_krisp = true
[scaling]
min_agents = 0

View File

@@ -0,0 +1 @@
pipecat-ai[openai,daily,deepgram,cartesia,silero,webrtc,websocket,runner]

View File

@@ -85,6 +85,7 @@ playht = [ "pyht>=0.1.6", "websockets>=13.1,<15.0" ]
qwen = []
rime = [ "websockets>=13.1,<15.0" ]
riva = [ "nvidia-riva-client~=2.21.1" ]
runner = [ "python-dotenv>=1.0.0,<2.0.0", "uvicorn>=0.32.0,<1.0.0", "fastapi>=0.115.6,<0.117.0", "pipecat-ai-small-webrtc-prebuilt>=1.0.0"]
sambanova = []
sentry = [ "sentry-sdk~=2.23.1" ]
local-smart-turn = [ "coremltools>=8.0", "transformers", "torch~=2.5.0", "torchaudio~=2.5.0" ]

View File

@@ -0,0 +1 @@
"""Pipecat runner package for local and cloud bot execution."""

src/pipecat/runner/daily.py Normal file
View File

@@ -0,0 +1,112 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Daily room and token configuration utilities.
This module provides helper functions for creating and configuring Daily rooms
and authentication tokens. It handles both command-line argument parsing and
environment variable configuration.
The module supports creating temporary rooms for development or using existing
rooms specified via arguments or environment variables.
Environment variables:

- DAILY_API_KEY - Daily API key for room/token creation
- DAILY_SAMPLE_ROOM_URL (optional) - Existing room URL to use
- DAILY_SAMPLE_ROOM_TOKEN (optional) - Existing token to use

Example::

    import aiohttp
    from pipecat.runner.daily import configure

    async with aiohttp.ClientSession() as session:
        room_url, token = await configure(session)
        # Use room_url and token with DailyTransport
"""
import argparse
import os
from typing import Optional
import aiohttp
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper
async def configure(aiohttp_session: aiohttp.ClientSession):
"""Configure Daily room URL and token from arguments or environment.
Args:
aiohttp_session: HTTP session for making API requests.
Returns:
Tuple containing the room URL and authentication token.
Raises:
Exception: If room URL or API key are not provided.
"""
(url, token, _) = await configure_with_args(aiohttp_session)
return (url, token)
async def configure_with_args(
aiohttp_session: aiohttp.ClientSession, parser: Optional[argparse.ArgumentParser] = None
):
"""Configure Daily room with command-line argument parsing.
Args:
aiohttp_session: HTTP session for making API requests.
parser: Optional argument parser. If None, creates a default one.
Returns:
Tuple containing room URL, authentication token, and parsed arguments.
Raises:
Exception: If room URL or API key are not provided via arguments or environment.
"""
if not parser:
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument(
"-u", "--url", type=str, required=False, help="URL of the Daily room to join"
)
parser.add_argument(
"-k",
"--apikey",
type=str,
required=False,
help="Daily API Key (needed to create an owner token for the room)",
)
args, unknown = parser.parse_known_args()
url = args.url or os.getenv("DAILY_SAMPLE_ROOM_URL")
key = args.apikey or os.getenv("DAILY_API_KEY")
if not url:
raise Exception(
"No Daily room specified. Use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL."
)
if not key:
raise Exception(
"No Daily API key specified. Use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
)
daily_rest_helper = DailyRESTHelper(
daily_api_key=key,
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
# Create a meeting token for the given room with an expiration 2 hours in
# the future.
expiry_time: float = 2 * 60 * 60
token = await daily_rest_helper.get_token(url, expiry_time)
return (url, token, args)
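
`configure_with_args` prefers command-line values and falls back to environment variables, and it uses `parse_known_args` so that flags owned by other parsers do not cause an error. The self-contained sketch below reproduces only that resolution logic. `resolve_room` and the example URLs are assumptions, and no Daily API call is made.

```python
import argparse


def resolve_room(argv, env):
    """Resolve the room URL and API key from argv first, then from env."""
    parser = argparse.ArgumentParser(description="Argument/environment fallback demo")
    parser.add_argument("-u", "--url", type=str, required=False)
    parser.add_argument("-k", "--apikey", type=str, required=False)
    # parse_known_args returns unrecognised flags instead of exiting.
    args, unknown = parser.parse_known_args(argv)
    url = args.url or env.get("DAILY_SAMPLE_ROOM_URL")
    key = args.apikey or env.get("DAILY_API_KEY")
    if not url:
        raise ValueError("No Daily room specified.")
    if not key:
        raise ValueError("No Daily API key specified.")
    return url, key, unknown


url, key, unknown = resolve_room(
    ["-u", "https://example.daily.co/cli-room", "--transport", "daily"],
    {"DAILY_SAMPLE_ROOM_URL": "https://example.daily.co/env-room", "DAILY_API_KEY": "env-key"},
)
print(url, key, unknown)

# The token lifetime used above, 2 * 60 * 60, is 7200 seconds.
expiry_seconds = 2 * 60 * 60
```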

View File

@@ -0,0 +1,148 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""LiveKit room and token configuration utilities.
This module provides helper functions for creating and configuring LiveKit
rooms and authentication tokens. It handles JWT token generation with
appropriate grants for both regular participants and AI agents.
The module supports creating tokens for development and testing, with
automatic agent detection for proper room permissions.
Required environment variables:
- LIVEKIT_API_KEY - LiveKit API key
- LIVEKIT_API_SECRET - LiveKit API secret
- LIVEKIT_URL - LiveKit server URL
- LIVEKIT_ROOM_NAME - Room name to join
Example::

    from pipecat.runner.livekit import configure

    url, token, room_name = await configure()
    # Use with LiveKitTransport
"""
import argparse
import os
from typing import Optional
from livekit import api
from loguru import logger
def generate_token(room_name: str, participant_name: str, api_key: str, api_secret: str) -> str:
"""Generate a LiveKit access token for a participant.
Args:
room_name: Name of the LiveKit room.
participant_name: Name of the participant.
api_key: LiveKit API key.
api_secret: LiveKit API secret.
Returns:
JWT token string for room access.
"""
token = api.AccessToken(api_key, api_secret)
token.with_identity(participant_name).with_name(participant_name).with_grants(
api.VideoGrants(
room_join=True,
room=room_name,
)
)
return token.to_jwt()
def generate_token_with_agent(
room_name: str, participant_name: str, api_key: str, api_secret: str
) -> str:
"""Generate a LiveKit access token for an agent participant.
Args:
room_name: Name of the LiveKit room.
participant_name: Name of the participant.
api_key: LiveKit API key.
api_secret: LiveKit API secret.
Returns:
JWT token string for agent room access.
"""
token = api.AccessToken(api_key, api_secret)
token.with_identity(participant_name).with_name(participant_name).with_grants(
api.VideoGrants(
room_join=True,
room=room_name,
agent=True, # Lets LiveKit clients know that an agent has joined
)
)
return token.to_jwt()
async def configure():
"""Configure LiveKit room URL and token from arguments or environment.
Returns:
Tuple containing the server URL, authentication token, and room name.
Raises:
Exception: If required LiveKit configuration is not provided.
"""
(url, token, room_name, _) = await configure_with_args()
return (url, token, room_name)
async def configure_with_args(parser: Optional[argparse.ArgumentParser] = None):
"""Configure LiveKit room with command-line argument parsing.
Args:
parser: Optional argument parser. If None, creates a default one.
Returns:
Tuple containing server URL, authentication token, room name, and parsed arguments.
Raises:
Exception: If required LiveKit configuration is not provided via arguments or environment.
"""
if not parser:
parser = argparse.ArgumentParser(description="LiveKit AI SDK Bot Sample")
parser.add_argument(
"-r", "--room", type=str, required=False, help="Name of the LiveKit room to join"
)
parser.add_argument("-u", "--url", type=str, required=False, help="URL of the LiveKit server")
args, unknown = parser.parse_known_args()
room_name = args.room or os.getenv("LIVEKIT_ROOM_NAME")
url = args.url or os.getenv("LIVEKIT_URL")
api_key = os.getenv("LIVEKIT_API_KEY")
api_secret = os.getenv("LIVEKIT_API_SECRET")
if not room_name:
raise Exception(
"No LiveKit room specified. Use the -r/--room option from the command line, or set LIVEKIT_ROOM_NAME in your environment."
)
if not url:
raise Exception(
"No LiveKit server URL specified. Use the -u/--url option from the command line, or set LIVEKIT_URL in your environment."
)
if not api_key or not api_secret:
raise Exception(
"LIVEKIT_API_KEY and LIVEKIT_API_SECRET must be set in environment variables."
)
token = generate_token_with_agent(room_name, "Pipecat Agent", api_key, api_secret)
# Generate user token for testing/debugging
user_token = generate_token(room_name, "User", api_key, api_secret)
logger.info(f"User token: {user_token}")
return (url, token, room_name, args)
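
`configure_with_args` logs a user token for testing. Because the module describes these tokens as JWTs, their claims can be inspected by decoding the middle segment. The sketch below does this with the standard library and does not verify the signature. The sample token is hand-built and unsigned, and its claim names are illustrative only; this file does not show which claims a real LiveKit token carries.

```python
import base64
import json


def _b64url_decode(segment: str) -> bytes:
    # JWT segments omit base64 padding; restore it before decoding.
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)


def decode_jwt_payload(token: str) -> dict:
    """Return the claims of a three-part JWT without verifying its signature."""
    _header_b64, payload_b64, _signature = token.split(".")
    return json.loads(_b64url_decode(payload_b64))


def _b64url_encode(data: dict) -> str:
    raw = json.dumps(data, separators=(",", ":")).encode()
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()


# Hand-built, unsigned sample token used only to exercise the decoder.
sample = ".".join(
    [
        _b64url_encode({"alg": "none", "typ": "JWT"}),
        _b64url_encode({"sub": "User", "room": "demo-room"}),
        "",
    ]
)
claims = decode_jwt_payload(sample)
print(claims)
```

Decoding is for debugging only; anything that relies on the claims should verify the signature first.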

src/pipecat/runner/run.py Normal file
View File

@@ -0,0 +1,462 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Pipecat development runner.
This development runner executes Pipecat bots and provides the supporting
infrastructure they need - creating Daily rooms and tokens, managing WebRTC
connections, and setting up telephony webhook/WebSocket infrastructure. It
supports multiple transport types with a unified interface.
Install with::
pip install pipecat-ai[runner]
All bots must implement a `bot(runner_args)` async function as the entry point.
The server automatically discovers and executes this function when connections
are established.
Single transport example::

    async def bot(runner_args: RunnerArguments):
        transport = DailyTransport(
            runner_args.room_url,
            runner_args.token,
            "Bot",
            DailyParams(...)
        )
        # Your bot logic here
        await run_pipeline(transport)

    if __name__ == "__main__":
        from pipecat.runner.run import main
        main()

Multiple transport example::

    async def bot(runner_args: RunnerArguments):
        # Type-safe transport detection
        if isinstance(runner_args, DailyRunnerArguments):
            transport = setup_daily_transport(runner_args) # Your application code
        elif isinstance(runner_args, SmallWebRTCRunnerArguments):
            transport = setup_webrtc_transport(runner_args) # Your application code
        elif isinstance(runner_args, WebSocketRunnerArguments):
            transport = setup_telephony_transport(runner_args) # Your application code
        # Your bot implementation
        await run_pipeline(transport)

Supported transports:
- Daily - Creates rooms and tokens, runs bot as participant
- WebRTC - Provides local WebRTC interface with prebuilt UI
- Telephony - Handles webhook and WebSocket connections for Twilio, Telnyx, Plivo
To run locally:
- WebRTC: `python bot.py -t webrtc`
- ESP32: `python bot.py -t webrtc --esp32 --host 192.168.1.100`
- Daily (server): `python bot.py -t daily`
- Daily (direct, testing only): `python bot.py -d`
- Telephony: `python bot.py -t twilio -x your_username.ngrok.io`
"""
import argparse
import asyncio
import os
import sys
from contextlib import asynccontextmanager
from typing import Dict
from loguru import logger
from pipecat.runner.types import (
DailyRunnerArguments,
SmallWebRTCRunnerArguments,
WebSocketRunnerArguments,
)
try:
import uvicorn
from dotenv import load_dotenv
from fastapi import BackgroundTasks, FastAPI, WebSocket
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, RedirectResponse
except ImportError as e:
logger.error(f"Runner dependencies not available: {e}")
logger.error("To use Pipecat runners, install with: pip install pipecat-ai[runner]")
raise ImportError(
"Runner dependencies required. Install with: pip install pipecat-ai[runner]"
) from e
load_dotenv(override=True)
os.environ["ENV"] = "local"
def _get_bot_module():
"""Get the bot module from the calling script."""
import importlib.util
# Get the main module (the file that was executed)
main_module = sys.modules["__main__"]
# Check if it has a bot function
if hasattr(main_module, "bot"):
return main_module
# Try to import 'bot' module from current directory
try:
import bot # type: ignore[import-untyped]
return bot
except ImportError:
pass
# Look for any .py file in current directory that has a bot function
# (excluding server.py).
cwd = os.getcwd()
for filename in os.listdir(cwd):
if filename.endswith(".py") and filename != "server.py":
try:
module_name = filename[:-3] # Remove .py extension
spec = importlib.util.spec_from_file_location(
module_name, os.path.join(cwd, filename)
)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
if hasattr(module, "bot"):
return module
except Exception:
continue
raise ImportError(
"Could not find 'bot' function. Make sure your bot file has a 'bot' function."
)
async def _run_telephony_bot(websocket: WebSocket):
"""Run a bot for telephony transports."""
bot_module = _get_bot_module()
# Just pass the WebSocket - let the bot handle parsing
runner_args = WebSocketRunnerArguments(websocket=websocket)
await bot_module.bot(runner_args)
def _create_server_app(
transport_type: str, host: str = "localhost", proxy: str = None, esp32_mode: bool = False
):
"""Create FastAPI app with transport-specific routes."""
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Set up transport-specific routes
if transport_type == "webrtc":
_setup_webrtc_routes(app, esp32_mode=esp32_mode, host=host)
elif transport_type == "daily":
_setup_daily_routes(app)
elif transport_type in ["twilio", "telnyx", "plivo"]:
_setup_telephony_routes(app, transport_type, proxy)
else:
logger.warning(f"Unknown transport type: {transport_type}")
return app
def _setup_webrtc_routes(app: FastAPI, esp32_mode: bool = False, host: str = "localhost"):
"""Set up WebRTC-specific routes."""
try:
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
except ImportError as e:
logger.error(f"WebRTC transport dependencies not installed: {e}")
return
# Store connections by pc_id
pcs_map: Dict[str, SmallWebRTCConnection] = {}
# Mount the frontend
app.mount("/client", SmallWebRTCPrebuiltUI)
@app.get("/", include_in_schema=False)
async def root_redirect():
"""Redirect root requests to client interface."""
return RedirectResponse(url="/client/")
@app.post("/api/offer")
async def offer(request: dict, background_tasks: BackgroundTasks):
"""Handle WebRTC offer requests and manage peer connections."""
pc_id = request.get("pc_id")
if pc_id and pc_id in pcs_map:
pipecat_connection = pcs_map[pc_id]
logger.info(f"Reusing existing connection for pc_id: {pc_id}")
await pipecat_connection.renegotiate(
sdp=request["sdp"],
type=request["type"],
restart_pc=request.get("restart_pc", False),
)
else:
pipecat_connection = SmallWebRTCConnection()
await pipecat_connection.initialize(sdp=request["sdp"], type=request["type"])
@pipecat_connection.event_handler("closed")
async def handle_disconnected(webrtc_connection: SmallWebRTCConnection):
"""Handle WebRTC connection closure and cleanup."""
logger.info(f"Discarding peer connection for pc_id: {webrtc_connection.pc_id}")
pcs_map.pop(webrtc_connection.pc_id, None)
bot_module = _get_bot_module()
runner_args = SmallWebRTCRunnerArguments(webrtc_connection=pipecat_connection)
background_tasks.add_task(bot_module.bot, runner_args)
answer = pipecat_connection.get_answer()
# Apply ESP32 SDP munging if enabled
if esp32_mode and host != "localhost":
from pipecat.runner.utils import smallwebrtc_sdp_munging
answer["sdp"] = smallwebrtc_sdp_munging(answer["sdp"], host)
pcs_map[answer["pc_id"]] = pipecat_connection
return answer
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Manage FastAPI application lifecycle and cleanup connections."""
yield
coros = [pc.disconnect() for pc in pcs_map.values()]
await asyncio.gather(*coros)
pcs_map.clear()
app.router.lifespan_context = lifespan
def _setup_daily_routes(app: FastAPI):
"""Set up Daily-specific routes."""
@app.get("/")
async def start_agent():
"""Launch a Daily bot and redirect to room."""
print("Starting bot with Daily transport")
import aiohttp
from pipecat.runner.daily import configure
async with aiohttp.ClientSession() as session:
room_url, token = await configure(session)
# Start the bot in the background
bot_module = _get_bot_module()
runner_args = DailyRunnerArguments(room_url=room_url, token=token, body={})
asyncio.create_task(bot_module.bot(runner_args))
return RedirectResponse(room_url)
@app.post("/connect")
async def rtvi_connect():
"""Launch a Daily bot and return connection info for RTVI clients."""
print("Starting bot with Daily transport")
import aiohttp
from pipecat.runner.daily import configure
async with aiohttp.ClientSession() as session:
room_url, token = await configure(session)
# Start the bot in the background
bot_module = _get_bot_module()
runner_args = DailyRunnerArguments(room_url=room_url, token=token, body={})
asyncio.create_task(bot_module.bot(runner_args))
return {"room_url": room_url, "token": token}
def _setup_telephony_routes(app: FastAPI, transport_type: str, proxy: str):
"""Set up telephony-specific routes."""
# XML response templates
XML_TEMPLATES = {
"twilio": f"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Connect>
<Stream url="wss://{proxy}/ws"></Stream>
</Connect>
<Pause length="40"/>
</Response>""",
"telnyx": f"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Connect>
<Stream url="wss://{proxy}/ws" bidirectionalMode="rtp"></Stream>
</Connect>
<Pause length="40"/>
</Response>""",
"plivo": f"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Stream bidirectional="true" keepCallAlive="true" contentType="audio/x-mulaw;rate=8000">wss://{proxy}/ws</Stream>
</Response>""",
}
@app.post("/")
async def start_call():
"""Handle telephony webhook and return XML response."""
logger.debug(f"POST {transport_type.upper()} XML")
xml_content = XML_TEMPLATES.get(transport_type, "<Response></Response>")
return HTMLResponse(content=xml_content, media_type="application/xml")
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
"""Handle WebSocket connections for telephony."""
await websocket.accept()
logger.debug("WebSocket connection accepted")
await _run_telephony_bot(websocket)
@app.get("/")
async def start_agent():
"""Simple status endpoint for telephony transports."""
return {"status": f"Bot started with {transport_type}"}
async def _run_daily_direct():
"""Run Daily bot with direct connection (no FastAPI server)."""
try:
import aiohttp
from pipecat.runner.daily import configure
except ImportError as e:
logger.error(f"Daily transport dependencies not installed: {e}")
return
logger.info("Running with direct Daily connection...")
async with aiohttp.ClientSession() as session:
room_url, token = await configure(session)
runner_args = DailyRunnerArguments(room_url=room_url, token=token, body={})
# Get the bot module and run it directly
bot_module = _get_bot_module()
print(f"📞 Joining Daily room: {room_url}")
print(" (Direct connection - no web server needed)")
print()
await bot_module.bot(runner_args)
def main():
"""Start the Pipecat development runner.
Parses command-line arguments and starts a FastAPI server configured
for the specified transport type. The runner will discover and run
any bot() function found in the current directory.
Command-line arguments:
--host: Server host address (default: localhost)
--port: Server port (default: 7860)
-t/--transport: Transport type (daily, webrtc, twilio, telnyx, plivo)
-x/--proxy: Public proxy hostname for telephony webhooks
--esp32: Enable SDP munging for ESP32 compatibility (requires --host with IP address)
-d/--direct: Connect directly to Daily room (automatically sets transport to daily)
-v/--verbose: Increase logging verbosity
The bot file must contain a `bot(runner_args)` function as the entry point.
"""
parser = argparse.ArgumentParser(description="Pipecat Development Runner")
parser.add_argument("--host", type=str, default="localhost", help="Host address")
parser.add_argument("--port", type=int, default=7860, help="Port number")
parser.add_argument(
"-t",
"--transport",
type=str,
choices=["daily", "webrtc", "twilio", "telnyx", "plivo"],
default="webrtc",
help="Transport type",
)
parser.add_argument("--proxy", "-x", help="Public proxy host name")
parser.add_argument(
"--esp32",
action="store_true",
default=False,
help="Enable SDP munging for ESP32 compatibility (requires --host with IP address)",
)
parser.add_argument(
"-d",
"--direct",
action="store_true",
default=False,
help="Connect directly to Daily room (automatically sets transport to daily)",
)
parser.add_argument(
"--verbose", "-v", action="count", default=0, help="Increase logging verbosity"
)
args = parser.parse_args()
# Auto-set transport to daily if --direct is used without explicit transport
if args.direct and args.transport == "webrtc": # webrtc is the default
args.transport = "daily"
elif args.direct and args.transport != "daily":
logger.error("--direct flag only works with Daily transport (-t daily)")
return
# Validate ESP32 requirements
if args.esp32 and args.host == "localhost":
logger.error("For ESP32, you need to specify `--host IP` so we can do SDP munging.")
return
# Log level
logger.remove()
logger.add(sys.stderr, level="TRACE" if args.verbose else "DEBUG")
# Handle direct Daily connection (no FastAPI server)
if args.direct:
print()
print("🚀 Connecting directly to Daily room...")
print()
# Run direct Daily connection
asyncio.run(_run_daily_direct())
return
# Print startup message for server-based transports
if args.transport == "webrtc":
print()
if args.esp32:
print(
f"🚀 WebRTC server starting at http://{args.host}:{args.port}/client (ESP32 mode)"
)
else:
print(f"🚀 WebRTC server starting at http://{args.host}:{args.port}/client")
print(" Open this URL in your browser to connect!")
print()
elif args.transport == "daily":
print()
print(f"🚀 Daily server starting at http://{args.host}:{args.port}")
print(" Open this URL in your browser to start a session!")
print()
# Create the app with transport-specific setup
app = _create_server_app(args.transport, args.host, args.proxy, args.esp32)
# Run the server
uvicorn.run(app, host=args.host, port=args.port)
if __name__ == "__main__":
main()
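The flag handling in `main()` can be checked in isolation. The following is a minimal sketch, not the runner's own code: it rebuilds the same options with `argparse` and returns the resolved transport instead of starting a server. The names `build_parser` and `resolve_mode` are hypothetical, introduced only for illustration.

```python
import argparse
from typing import List, Optional, Tuple


def build_parser() -> argparse.ArgumentParser:
    """Build a parser with the same options that the runner's main() defines."""
    parser = argparse.ArgumentParser(description="Runner argument sketch")
    parser.add_argument("--host", type=str, default="localhost")
    parser.add_argument("--port", type=int, default=7860)
    parser.add_argument(
        "-t",
        "--transport",
        choices=["daily", "webrtc", "twilio", "telnyx", "plivo"],
        default="webrtc",
    )
    parser.add_argument("--proxy", "-x")
    parser.add_argument("--esp32", action="store_true", default=False)
    parser.add_argument("-d", "--direct", action="store_true", default=False)
    return parser


def resolve_mode(argv: List[str]) -> Tuple[Optional[str], str]:
    """Return (transport, mode), or (None, reason) when validation fails."""
    args = build_parser().parse_args(argv)

    # --direct implies Daily when the transport was left at its default.
    if args.direct and args.transport == "webrtc":
        args.transport = "daily"
    elif args.direct and args.transport != "daily":
        return None, "--direct only works with the Daily transport"

    # ESP32 mode needs a real IP address so the SDP answer can be rewritten.
    if args.esp32 and args.host == "localhost":
        return None, "--esp32 requires --host with an IP address"

    return args.transport, ("direct" if args.direct else "server")


if __name__ == "__main__":
    print(resolve_mode(["-d"]))  # ('daily', 'direct')
```

Because `webrtc` is also the default value, this logic cannot tell an explicit `-t webrtc -d` apart from a bare `-d`; both resolve to Daily.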


@@ -0,0 +1,60 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Runner session argument types for the development runner.
These types are used by the development runner to pass transport-specific
information to bot functions.
"""
from dataclasses import dataclass
from typing import Any
from fastapi import WebSocket
@dataclass
class RunnerArguments:
"""Base class for runner session arguments."""
pass
@dataclass
class DailyRunnerArguments(RunnerArguments):
"""Daily transport session arguments for the runner.
Parameters:
room_url: Daily room URL to join
token: Authentication token for the room
body: Additional request data
"""
room_url: str
token: str
body: Any
@dataclass
class WebSocketRunnerArguments(RunnerArguments):
"""WebSocket transport session arguments for the runner.
Parameters:
websocket: WebSocket connection for audio streaming
"""
websocket: WebSocket
@dataclass
class SmallWebRTCRunnerArguments(RunnerArguments):
"""Small WebRTC transport session arguments for the runner.
Parameters:
webrtc_connection: Pre-configured WebRTC peer connection
"""
webrtc_connection: Any
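A bot can branch on these types with `isinstance`. The sketch below uses stand-in dataclasses that mirror the fields above so that it runs without FastAPI or Pipecat installed; the `websocket` field is typed as `Any` here only for that reason, and `transport_family` is a hypothetical helper name.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class RunnerArguments:
    """Stand-in for the base class shown above."""


@dataclass
class DailyRunnerArguments(RunnerArguments):
    room_url: str
    token: str
    body: Any


@dataclass
class WebSocketRunnerArguments(RunnerArguments):
    websocket: Any  # the real field is typed as fastapi.WebSocket


@dataclass
class SmallWebRTCRunnerArguments(RunnerArguments):
    webrtc_connection: Any


def transport_family(runner_args: RunnerArguments) -> str:
    """Map a runner-arguments instance to a transport family name."""
    if isinstance(runner_args, DailyRunnerArguments):
        return "daily"
    if isinstance(runner_args, SmallWebRTCRunnerArguments):
        return "webrtc"
    if isinstance(runner_args, WebSocketRunnerArguments):
        return "telephony"
    raise ValueError(f"Unsupported runner arguments type: {type(runner_args)}")


if __name__ == "__main__":
    args = DailyRunnerArguments(room_url="https://example.invalid/room", token="t", body={})
    print(transport_family(args))  # daily
```

The three subclasses share no fields, so the base class serves only as a common type for annotations and `isinstance` checks.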

481
src/pipecat/runner/utils.py Normal file

@@ -0,0 +1,481 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Transport utility functions and FastAPI route setup helpers.
This module provides common functionality for creating transports from runner
arguments and handling WebRTC/WebSocket connections. It includes SDP
manipulation utilities for WebRTC compatibility and transport detection helpers.
Key features:
- Telephony WebSocket parsing with provider auto-detection (Twilio, Telnyx, Plivo)
- SDP munging for ESP32 WebRTC compatibility
- Transport client ID detection across different transport types
- Video capture utilities for Daily transports
- A `create_transport` factory that builds a transport from runner arguments
The utilities are designed to be transport-agnostic where possible, with
specific handlers for each transport type's unique requirements.
Example::
from pipecat.runner.utils import parse_telephony_websocket
async def telephony_websocket_handler(websocket: WebSocket):
transport_type, call_data = await parse_telephony_websocket(websocket)
"""
import json
import os
import re
from typing import Any, Callable, Dict, Optional
from fastapi import WebSocket
from loguru import logger
from pipecat.runner.types import (
DailyRunnerArguments,
SmallWebRTCRunnerArguments,
WebSocketRunnerArguments,
)
from pipecat.transports.base_transport import BaseTransport
def _detect_transport_type_from_message(message_data: dict) -> str:
"""Attempt to auto-detect transport type from WebSocket message structure."""
logger.trace("=== Auto-Detection Analysis ===")
# Twilio detection
if (
message_data.get("event") == "start"
and "start" in message_data
and "streamSid" in message_data.get("start", {})
and "callSid" in message_data.get("start", {})
):
logger.trace("Auto-detected: TWILIO")
return "twilio"
# Telnyx detection
if (
"stream_id" in message_data
and "start" in message_data
and "call_control_id" in message_data.get("start", {})
):
logger.trace("Auto-detected: TELNYX")
return "telnyx"
# Plivo detection
if (
"start" in message_data
and "streamId" in message_data.get("start", {})
and "callId" in message_data.get("start", {})
):
logger.trace("Auto-detected: PLIVO")
return "plivo"
logger.trace("Auto-detection failed - unknown format")
return "unknown"
async def parse_telephony_websocket(websocket: WebSocket):
"""Parse telephony WebSocket messages and return transport type and call data.
Returns:
tuple: (transport_type: str, call_data: dict)
call_data contains provider-specific fields:
- Twilio: {"stream_id": str, "call_id": str}
- Telnyx: {"stream_id": str, "call_control_id": str, "outbound_encoding": str}
- Plivo: {"stream_id": str, "call_id": str}
Example usage::
transport_type, call_data = await parse_telephony_websocket(websocket)
if transport_type == "telnyx":
outbound_encoding = call_data["outbound_encoding"]
"""
# Read first two messages
start_data = websocket.iter_text()
try:
# First message
first_message_raw = await start_data.__anext__()
logger.trace(f"First message: {first_message_raw}")
try:
first_message = json.loads(first_message_raw)
except json.JSONDecodeError:
first_message = {}
# Second message
second_message_raw = await start_data.__anext__()
logger.trace(f"Second message: {second_message_raw}")
try:
second_message = json.loads(second_message_raw)
except json.JSONDecodeError:
second_message = {}
# Try auto-detection on both messages
detected_type_first = _detect_transport_type_from_message(first_message)
detected_type_second = _detect_transport_type_from_message(second_message)
# Use the successful detection
if detected_type_first != "unknown":
transport_type = detected_type_first
call_data_raw = first_message
logger.debug(f"Detected transport: {transport_type} (from first message)")
elif detected_type_second != "unknown":
transport_type = detected_type_second
call_data_raw = second_message
logger.debug(f"Detected transport: {transport_type} (from second message)")
else:
transport_type = "unknown"
call_data_raw = second_message
logger.warning("Could not auto-detect transport type")
# Extract provider-specific data
if transport_type == "twilio":
start_data = call_data_raw.get("start", {})
call_data = {
"stream_id": start_data.get("streamSid"),
"call_id": start_data.get("callSid"),
}
elif transport_type == "telnyx":
call_data = {
"stream_id": call_data_raw.get("stream_id"),
"call_control_id": call_data_raw.get("start", {}).get("call_control_id"),
"outbound_encoding": call_data_raw.get("start", {})
.get("media_format", {})
.get("encoding"),
}
elif transport_type == "plivo":
start_data = call_data_raw.get("start", {})
call_data = {
"stream_id": start_data.get("streamId"),
"call_id": start_data.get("callId"),
}
else:
call_data = {}
logger.debug(f"Parsed - Type: {transport_type}, Data: {call_data}")
return transport_type, call_data
except Exception as e:
logger.error(f"Error parsing telephony WebSocket: {e}")
raise
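The detection and extraction steps above can be exercised without a live WebSocket. The following is a minimal sketch that restates the same key checks as pure functions; `detect_provider` and `extract_call_data` are hypothetical names, and the sample messages are hand-built to contain only the keys the checks look for, not captured provider traffic.

```python
from typing import Any, Dict


def detect_provider(message: Dict[str, Any]) -> str:
    """Guess the telephony provider from the shape of one decoded message."""
    start = message.get("start", {})
    if message.get("event") == "start" and "streamSid" in start and "callSid" in start:
        return "twilio"
    if "stream_id" in message and "call_control_id" in start:
        return "telnyx"
    if "streamId" in start and "callId" in start:
        return "plivo"
    return "unknown"


def extract_call_data(provider: str, message: Dict[str, Any]) -> Dict[str, Any]:
    """Normalize provider-specific fields into one call_data dict."""
    start = message.get("start", {})
    if provider == "twilio":
        return {"stream_id": start.get("streamSid"), "call_id": start.get("callSid")}
    if provider == "telnyx":
        return {
            "stream_id": message.get("stream_id"),
            "call_control_id": start.get("call_control_id"),
            "outbound_encoding": start.get("media_format", {}).get("encoding"),
        }
    if provider == "plivo":
        return {"stream_id": start.get("streamId"), "call_id": start.get("callId")}
    return {}


# Illustrative messages only: placeholder values, minimal keys.
TWILIO_START = {"event": "start", "start": {"streamSid": "MZ-example", "callSid": "CA-example"}}
TELNYX_START = {
    "stream_id": "stream-example",
    "start": {"call_control_id": "cc-example", "media_format": {"encoding": "PCMU"}},
}
PLIVO_START = {"start": {"streamId": "stream-example", "callId": "call-example"}}

if __name__ == "__main__":
    for msg in (TWILIO_START, TELNYX_START, PLIVO_START):
        provider = detect_provider(msg)
        print(provider, extract_call_data(provider, msg))
```

A message that matches none of the shapes yields `"unknown"`, which is why the parser above inspects the first two messages before giving up.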
def get_transport_client_id(transport: BaseTransport, client: Any) -> str:
"""Get client identifier from transport-specific client object.
Args:
transport: The transport instance.
client: Transport-specific client object.
Returns:
Client identifier string, empty if transport not supported.
"""
# Import conditionally to avoid dependency issues
try:
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
if isinstance(transport, SmallWebRTCTransport):
return client.pc_id
except ImportError:
pass
try:
from pipecat.transports.services.daily import DailyTransport
if isinstance(transport, DailyTransport):
return client["id"]
except ImportError:
pass
logger.warning(f"Unable to get client id from unsupported transport {type(transport)}")
return ""
async def maybe_capture_participant_camera(
transport: BaseTransport, client: Any, framerate: int = 0
):
"""Capture participant camera video if transport supports it.
Args:
transport: The transport instance.
client: Transport-specific client object.
framerate: Video capture framerate. Defaults to 0 (auto).
"""
try:
from pipecat.transports.services.daily import DailyTransport
if isinstance(transport, DailyTransport):
await transport.capture_participant_video(
client["id"], framerate=framerate, video_source="camera"
)
except ImportError:
pass
async def maybe_capture_participant_screen(
transport: BaseTransport, client: Any, framerate: int = 0
):
"""Capture participant screen video if transport supports it.
Args:
transport: The transport instance.
client: Transport-specific client object.
framerate: Video capture framerate. Defaults to 0 (auto).
"""
try:
from pipecat.transports.services.daily import DailyTransport
if isinstance(transport, DailyTransport):
await transport.capture_participant_video(
client["id"], framerate=framerate, video_source="screenVideo"
)
except ImportError:
pass
def _smallwebrtc_sdp_cleanup_ice_candidates(text: str, pattern: str) -> str:
"""Clean up ICE candidates in SDP text for SmallWebRTC.
Args:
text: SDP text to clean up.
pattern: Pattern to match for candidate filtering.
Returns:
Cleaned SDP text with filtered ICE candidates.
"""
result = []
lines = text.splitlines()
for line in lines:
if re.search("a=candidate", line):
if re.search(pattern, line) and not re.search("raddr", line):
result.append(line)
else:
result.append(line)
return "\r\n".join(result)
def _smallwebrtc_sdp_cleanup_fingerprints(text: str) -> str:
"""Remove unsupported fingerprint algorithms from SDP text.
Args:
text: SDP text to clean up.
Returns:
SDP text with sha-384 and sha-512 fingerprints removed.
"""
result = []
lines = text.splitlines()
for line in lines:
if not re.search("sha-384", line) and not re.search("sha-512", line):
result.append(line)
return "\r\n".join(result)
def smallwebrtc_sdp_munging(sdp: str, host: str) -> str:
"""Apply SDP modifications for SmallWebRTC compatibility.
Args:
sdp: Original SDP string.
host: Host address for ICE candidate filtering.
Returns:
Modified SDP string with fingerprint and ICE candidate cleanup.
"""
sdp = _smallwebrtc_sdp_cleanup_fingerprints(sdp)
# Escape the host so the dots in an IP address match literally, not as regex wildcards
sdp = _smallwebrtc_sdp_cleanup_ice_candidates(sdp, re.escape(host))
return sdp
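To see what the munging keeps and drops, the two cleanup passes can be run on a small SDP fragment. This is a sketch under stated assumptions: the functions restate the filtering rules above under hypothetical names (`cleanup_fingerprints`, `cleanup_ice_candidates`, `munge`), and the SDP lines are a hand-written fragment, not a complete or valid session description.

```python
import re


def cleanup_fingerprints(text: str) -> str:
    """Drop fingerprint lines that use sha-384 or sha-512."""
    kept = [
        line
        for line in text.splitlines()
        if not re.search("sha-384", line) and not re.search("sha-512", line)
    ]
    return "\r\n".join(kept)


def cleanup_ice_candidates(text: str, pattern: str) -> str:
    """Keep only candidates that match pattern and carry no raddr attribute."""
    result = []
    for line in text.splitlines():
        if re.search("a=candidate", line):
            if re.search(pattern, line) and not re.search("raddr", line):
                result.append(line)
        else:
            result.append(line)  # non-candidate lines pass through untouched
    return "\r\n".join(result)


def munge(sdp: str, host: str) -> str:
    """Apply both passes; re.escape makes the dots in an IP match literally."""
    return cleanup_ice_candidates(cleanup_fingerprints(sdp), re.escape(host))


SAMPLE_SDP = "\r\n".join(
    [
        "v=0",
        "a=fingerprint:sha-256 AA:BB",
        "a=fingerprint:sha-384 CC:DD",
        "a=fingerprint:sha-512 EE:FF",
        "a=candidate:1 1 udp 2130706431 192.168.1.100 50000 typ host",
        "a=candidate:2 1 udp 2130706431 10.0.0.5 50001 typ host",
        "a=candidate:3 1 udp 1694498815 203.0.113.7 50002 typ srflx raddr 192.168.1.100 rport 50000",
        "a=end-of-candidates",
    ]
)

if __name__ == "__main__":
    print(munge(SAMPLE_SDP, "192.168.1.100"))
```

Of the three candidates, only the first survives: the second does not mention the host, and the third mentions it only inside a `raddr` attribute.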
def _get_transport_params(transport_key: str, transport_params: Dict[str, Callable]) -> Any:
"""Get transport parameters from factory function.
Args:
transport_key: The transport key to look up
transport_params: Dict mapping transport names to parameter factory functions
Returns:
Transport parameters from the factory function
Raises:
ValueError: If transport key is missing from transport_params
"""
if transport_key not in transport_params:
raise ValueError(
f"Missing transport params for '{transport_key}'. "
f"Please add '{transport_key}' key to your transport_params dict."
)
params = transport_params[transport_key]()
logger.debug(f"Using transport params for {transport_key}")
return params
async def _create_telephony_transport(
websocket: WebSocket,
params: Optional[Any] = None,
transport_type: Optional[str] = None,
call_data: Optional[dict] = None,
) -> BaseTransport:
"""Create a telephony transport with pre-parsed WebSocket data.
Args:
websocket: FastAPI WebSocket connection from telephony provider
params: FastAPIWebsocketParams (required)
transport_type: Pre-detected provider type ("twilio", "telnyx", "plivo")
call_data: Pre-parsed call data dict with provider-specific fields
Returns:
Configured FastAPIWebsocketTransport ready for telephony use.
"""
from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketTransport
if params is None:
raise ValueError(
"FastAPIWebsocketParams must be provided. "
"The serializer and add_wav_header will be set automatically."
)
# Always set add_wav_header to False for telephony
params.add_wav_header = False
logger.info(f"Using pre-detected telephony provider: {transport_type}")
if transport_type == "twilio":
from pipecat.serializers.twilio import TwilioFrameSerializer
params.serializer = TwilioFrameSerializer(
stream_sid=call_data["stream_id"],
call_sid=call_data["call_id"],
account_sid=os.getenv("TWILIO_ACCOUNT_SID", ""),
auth_token=os.getenv("TWILIO_AUTH_TOKEN", ""),
)
elif transport_type == "telnyx":
from pipecat.serializers.telnyx import TelnyxFrameSerializer
params.serializer = TelnyxFrameSerializer(
stream_id=call_data["stream_id"],
call_control_id=call_data["call_control_id"],
outbound_encoding=call_data["outbound_encoding"],
inbound_encoding="PCMU", # Standard default
api_key=os.getenv("TELNYX_API_KEY", ""),
)
elif transport_type == "plivo":
from pipecat.serializers.plivo import PlivoFrameSerializer
params.serializer = PlivoFrameSerializer(
stream_id=call_data["stream_id"],
call_id=call_data["call_id"],
auth_id=os.getenv("PLIVO_AUTH_ID", ""),
auth_token=os.getenv("PLIVO_AUTH_TOKEN", ""),
)
else:
raise ValueError(
f"Unsupported telephony provider: {transport_type}. "
f"Supported providers: twilio, telnyx, plivo"
)
return FastAPIWebsocketTransport(websocket=websocket, params=params)
async def create_transport(
runner_args: Any, transport_params: Dict[str, Callable]
) -> BaseTransport:
"""Create a transport from runner arguments using factory functions.
This function uses the transport_params factory pattern, in which users
define a dictionary mapping transport names to parameter factory functions.
Args:
runner_args: Arguments from the runner.
transport_params: Dict mapping transport names to parameter factory functions.
Keys should be: "daily", "webrtc", "twilio", "telnyx", "plivo"
Values should be functions that return transport parameters when called.
Returns:
Configured transport instance.
Raises:
ValueError: If transport key is missing from transport_params or runner_args type is unsupported.
ImportError: If required dependencies are not installed.
Example::
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
# add_wav_header and serializer will be set automatically
),
"telnyx": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
# add_wav_header and serializer will be set automatically
),
"plivo": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
# add_wav_header and serializer will be set automatically
),
}
transport = await create_transport(runner_args, transport_params)
"""
# Create transport based on runner args type
if isinstance(runner_args, DailyRunnerArguments):
params = _get_transport_params("daily", transport_params)
from pipecat.transports.services.daily import DailyTransport
return DailyTransport(
runner_args.room_url,
runner_args.token,
"Pipecat Bot",
params=params,
)
elif isinstance(runner_args, SmallWebRTCRunnerArguments):
params = _get_transport_params("webrtc", transport_params)
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
return SmallWebRTCTransport(
params=params,
webrtc_connection=runner_args.webrtc_connection,
)
elif isinstance(runner_args, WebSocketRunnerArguments):
# Parse once to determine the provider and get data
transport_type, call_data = await parse_telephony_websocket(runner_args.websocket)
params = _get_transport_params(transport_type, transport_params)
# Create telephony transport with pre-parsed data
return await _create_telephony_transport(
runner_args.websocket, params, transport_type, call_data
)
else:
raise ValueError(f"Unsupported runner arguments type: {type(runner_args)}")
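The factory-dict pattern can be illustrated without any Pipecat imports. In the sketch below, plain dicts stand in for `DailyParams`, `TransportParams`, and `FastAPIWebsocketParams`, and `get_transport_params` restates the lookup shown above; `make_factory` and `created` are hypothetical helpers that record which factories actually run.

```python
from typing import Any, Callable, Dict, List

created: List[str] = []  # records which factories were invoked


def make_factory(name: str) -> Callable[[], Dict[str, Any]]:
    """Return a zero-argument factory producing stand-in params for one transport."""

    def factory() -> Dict[str, Any]:
        created.append(name)
        return {"transport": name, "audio_in_enabled": True, "audio_out_enabled": True}

    return factory


def get_transport_params(key: str, transport_params: Dict[str, Callable[[], Any]]) -> Any:
    """Look up the factory for key and call it; fail loudly if it is missing."""
    if key not in transport_params:
        raise ValueError(
            f"Missing transport params for '{key}'. "
            f"Please add '{key}' key to your transport_params dict."
        )
    return transport_params[key]()


transport_params = {
    "daily": make_factory("daily"),
    "webrtc": make_factory("webrtc"),
    "twilio": make_factory("twilio"),
}

if __name__ == "__main__":
    params = get_transport_params("webrtc", transport_params)
    print(params["transport"], created)  # webrtc ['webrtc']
```

Only the factory for the selected transport runs, so expensive objects named in the other entries are never constructed. That is one likely reason for passing callables rather than ready-made parameter objects.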