Compare commits
49 Commits
aleix/read
...
mb/new-qui
| Author | SHA1 | Date |
|---|---|---|
| | 027ab8fedf | |
| | dbf9899de8 | |
| | a22bebd174 | |
| | 3ab9c15583 | |
| | eca366bfac | |
| | c5483411f2 | |
| | ee514f6e4c | |
| | 90487ac144 | |
| | 54f0bb8326 | |
| | f35a58abf1 | |
| | e33ca26e2d | |
| | 471311b18f | |
| | 0c3e526c19 | |
| | 58fc952192 | |
| | 46c520bb36 | |
| | 5b6e25a7e0 | |
| | f133cf97c8 | |
| | dba182b396 | |
| | 32c7457734 | |
| | c89422f2f2 | |
| | 66b4bbec1a | |
| | b125d31088 | |
| | 980e52e72e | |
| | e2cfa45cc0 | |
| | 061de9cbaf | |
| | f9e316686f | |
| | 6b194a2954 | |
| | 83a88d7c85 | |
| | f5e23c36a4 | |
| | 155817a1fe | |
| | 16c80b2335 | |
| | 49af1553e0 | |
| | 6dad4de2d2 | |
| | 214c376933 | |
| | 7db57109dc | |
| | 9fd3e466ab | |
| | ccd71cfafb | |
| | eee8cf35b2 | |
| | acbc045d47 | |
| | 579eaf0889 | |
| | 1284c24144 | |
| | ad84a5af46 | |
| | 8138c6ceab | |
| | 112ae23f4d | |
| | fa6c9c35f7 | |
| | 7288d9b738 | |
| | a494bd7a61 | |
| | 5532655eb9 | |
| | be1d8041e2 | |
@@ -1,5 +1,12 @@
#!/bin/bash

# Check if sphinx-build is installed
if ! command -v sphinx-build &> /dev/null; then
    echo "Error: sphinx-build is not installed or not in PATH" >&2
    echo "Please install Sphinx using: pip install -r requirements.txt" >&2
    exit 1
fi

# Clean previous build
rm -rf _build
@@ -202,6 +202,7 @@ def import_core_modules():
"pipecat.clocks",
"pipecat.metrics",
"pipecat.observers",
"pipecat.runner",
"pipecat.serializers",
"pipecat.sync",
"pipecat.transcriptions",
@@ -26,6 +26,7 @@ Quick Links
Observers <api/pipecat.observers>
Pipeline <api/pipecat.pipeline>
Processors <api/pipecat.processors>
Runner <api/pipecat.runner>
Serializers <api/pipecat.serializers>
Services <api/pipecat.services>
Sync <api/pipecat.sync>
@@ -44,6 +44,7 @@ pipecat-ai[openai]
pipecat-ai[qwen]
pipecat-ai[remote-smart-turn]
# pipecat-ai[riva] # Mocked
pipecat-ai[runner]
pipecat-ai[sambanova]
pipecat-ai[silero]
pipecat-ai[simli]
111
examples/client-server-web/README.md
Normal file
@@ -0,0 +1,111 @@
# Client Server Web Example

Learn how to build web applications using Pipecat's client/server architecture. This approach separates your bot logic from your user interface, giving you full control over the client experience while maintaining real-time voice communication.

This example demonstrates:

- Server-side bot running with Pipecat
- React client using [Pipecat's client SDK](https://docs.pipecat.ai/client/introduction)
- Real-time voice communication between client and server
- UI components from [voice-ui-kit](https://github.com/pipecat-ai/voice-ui-kit) for common voice interface patterns

This is the recommended architecture for web applications that need custom interfaces or client-side functionality.

## Prerequisites

- Python 3.10+
- `npm` installed
- AI Service API keys for: [Deepgram](https://console.deepgram.com/signup), [OpenAI](https://auth.openai.com/create-account), and [Cartesia](https://play.cartesia.ai/sign-up)

## Setup

This example requires running both a server and client in **two separate terminal windows**.

### Terminal 1: Server Setup

1. Set up a virtual environment

   From the `examples/client-server-web` directory, run:

   ```bash
   cd server
   python -m venv .venv
   source .venv/bin/activate # On Windows: .venv\Scripts\activate
   ```

   > Using `uv`? Create your venv using: `uv venv && source .venv/bin/activate`.

2. Install dependencies

   ```bash
   pip install -r requirements.txt
   ```

   > Using `uv`? Install requirements using: `uv pip install -r requirements.txt`.

3. Configure environment variables

   Create a `.env` file:

   ```bash
   cp env.example .env
   ```

   Then, add your API keys:

   ```
   DEEPGRAM_API_KEY=your_deepgram_api_key
   OPENAI_API_KEY=your_openai_api_key
   CARTESIA_API_KEY=your_cartesia_api_key
   ```

4. Run the example

   Run your bot using:

   ```bash
   python bot.py
   ```

   > Using `uv`? Run your bot using: `uv run bot.py`.

> 💡 First run note: The initial startup may take ~10 seconds as Pipecat downloads required models, like the Silero VAD model.

### Terminal 2: Client Setup

1. Open a new terminal window and navigate to the `client` folder:

   From the `examples/client-server-web` directory, run:

   ```bash
   cd client
   ```

2. Install dependencies:

   ```bash
   npm i
   ```

3. Run the client:

   ```bash
   npm run dev
   ```

4. **Open http://localhost:5173 in your browser** and click `Connect` to start talking to your bot.

> 💡 **Tip**: Check your server terminal for debug logs showing Pipecat's internal workings.

## Troubleshooting

- **Browser permissions**: Make sure to allow microphone access when prompted by your browser.
- **Connection issues**: If the WebRTC connection fails, first try a different browser. If that fails, make sure you don't have a VPN or firewall rules blocking traffic. WebRTC uses UDP to communicate.
- **Audio issues**: Check that your microphone and speakers are working and not muted.
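
When chasing connection issues, it can help to first confirm that the bot server is listening at all. The sketch below is an illustration, not part of the example: `is_port_open` is a hypothetical helper, and port 7860 is taken from the proxy target in the client's `vite.config.js`. It only tests the TCP signalling port; it says nothing about whether the UDP media path is blocked.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


if __name__ == "__main__":
    # Assumption: the bot server runs locally on port 7860.
    print("bot server reachable:", is_port_open("127.0.0.1", 7860))
```

If this prints `False`, the server in Terminal 1 is probably not running, and the client's `/api/offer` request has nothing to reach.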

## Next Steps

- **Explore the client SDK**: Learn more about [Pipecat's client SDKs](https://docs.pipecat.ai/client/introduction) for web, mobile, and other platforms
- **Learn about the voice-ui-kit**: Explore [voice-ui-kit](https://github.com/pipecat-ai/voice-ui-kit) to simplify your front-end development
- **Advanced examples**: Check out [pipecat-examples](https://github.com/pipecat-ai/pipecat-examples) for more complex client/server applications
- **Join Discord**: Connect with other developers on [Discord](https://discord.gg/pipecat)
24
examples/client-server-web/client/.gitignore
vendored
Normal file
@@ -0,0 +1,24 @@
# Logs
logs
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*
pnpm-debug.log*
lerna-debug.log*

node_modules
dist
dist-ssr
*.local

# Editor directories and files
.vscode/*
!.vscode/extensions.json
.idea
.DS_Store
*.suo
*.ntvs*
*.njsproj
*.sln
*.sw?
17
examples/client-server-web/client/index.html
Normal file
@@ -0,0 +1,17 @@
<!DOCTYPE html>
<html lang="en">

<head>
  <meta charset="UTF-8" />
  <meta name="viewport" content="width=device-width, initial-scale=1.0" />
  <title>Pipecat UI</title>
  <link rel="icon" href="/favicon.svg" type="image/svg+xml">
  <link rel="stylesheet" href="/src/style.css" />
</head>

<body>
  <div id="root"></div>
  <script type="module" src="/src/index.tsx"></script>
</body>

</html>
2635
examples/client-server-web/client/package-lock.json
generated
Normal file
File diff suppressed because it is too large
33
examples/client-server-web/client/package.json
Normal file
@@ -0,0 +1,33 @@
{
  "name": "client",
  "version": "1.0.0",
  "main": "index.js",
  "scripts": {
    "dev": "node_modules/.bin/vite",
    "build": "node_modules/.bin/tsc && vite build",
    "preview": "node_modules/.bin/vite preview"
  },
  "keywords": [],
  "author": "",
  "license": "ISC",
  "description": "",
  "devDependencies": {
    "@types/node": "^22.14.0",
    "@types/react": "^19.1.8",
    "@types/react-dom": "^19.1.6",
    "@vitejs/plugin-react-swc": "^3.8.1",
    "typescript": "^5.8.3",
    "vite": "^6.2.5"
  },
  "dependencies": {
    "@fontsource-variable/geist": "^5.2.6",
    "@fontsource-variable/geist-mono": "^5.2.6",
    "@pipecat-ai/client-js": "^1.0.1",
    "@pipecat-ai/client-react": "^1.0.1",
    "@pipecat-ai/small-webrtc-transport": "^1.0.0",
    "@pipecat-ai/daily-transport": "^1.0.0",
    "@pipecat-ai/voice-ui-kit": "^0.1.0",
    "react": "^19.1.0",
    "react-dom": "^19.1.0"
  }
}
7
examples/client-server-web/client/public/favicon.svg
Normal file
@@ -0,0 +1,7 @@
<svg width="24" height="24" viewBox="0 0 24 24" fill="none" xmlns="http://www.w3.org/2000/svg">
  <path d="M3.3088 5.05615C3.64682 4.92779 4.02833 5.02411 4.26653 5.29797L7.36884 8.86461H16.6312L19.7335 5.29797C19.9717 5.02411 20.3532 4.92779 20.6912 5.05615C21.0292 5.18452 21.253 5.51072 21.253 5.87504V13.75H24V15.5H19.5181V8.19909L17.6762 10.3167C17.5115 10.506 17.2738 10.6146 17.0241 10.6146H6.9759C6.72616 10.6146 6.48854 10.506 6.32383 10.3167L4.48193 8.19909V15.5H0V13.75H2.74699V5.87504C2.74699 5.51072 2.97078 5.18452 3.3088 5.05615Z" fill="black"/>
  <path d="M19.5181 17.25H24V19H19.5181V17.25Z" fill="black"/>
  <path d="M0 17.25H4.48193V19H0V17.25Z" fill="black"/>
  <path d="M9.25301 14.3333C9.25301 14.9777 8.73517 15.5 8.09639 15.5C7.4576 15.5 6.93976 14.9777 6.93976 14.3333C6.93976 13.689 7.4576 13.1667 8.09639 13.1667C8.73517 13.1667 9.25301 13.689 9.25301 14.3333Z" fill="black"/>
  <path d="M17.0602 14.3333C17.0602 14.9777 16.5424 15.5 15.9036 15.5C15.2648 15.5 14.747 14.9777 14.747 14.3333C14.747 13.689 15.2648 13.1667 15.9036 13.1667C16.5424 13.1667 17.0602 13.689 17.0602 14.3333Z" fill="black"/>
</svg>
@@ -0,0 +1,7 @@
<svg width="332" height="192" viewBox="0 0 332 192" fill="none" xmlns="http://www.w3.org/2000/svg">
  <path d="M45.7718 0.769635C50.4477 -0.990844 55.7252 0.330188 59.0204 4.08595L101.936 53H230.064L272.98 4.08595C276.275 0.330188 281.552 -0.990844 286.228 0.769635C290.904 2.53011 294 7.00367 294 12V120H332V144H270V43.8728L244.52 72.9141C242.242 75.5111 238.955 77 235.5 77H96.5C93.0452 77 89.7581 75.5111 87.4796 72.9141L62 43.8728V144H0V120H38V12C38 7.00367 41.0958 2.53011 45.7718 0.769635Z" fill="black"/>
  <path d="M270 168H332V192H270V168Z" fill="black"/>
  <path d="M0 168H62V192H0V168Z" fill="black"/>
  <path d="M128 128C128 136.837 120.837 144 112 144C103.163 144 96 136.837 96 128C96 119.164 103.163 112 112 112C120.837 112 128 119.164 128 128Z" fill="black"/>
  <path d="M236 128C236 136.837 228.837 144 220 144C211.163 144 204 136.837 204 128C204 119.164 211.163 112 220 112C228.837 112 236 119.164 236 128Z" fill="black"/>
</svg>
28
examples/client-server-web/client/src/index.tsx
Normal file
@@ -0,0 +1,28 @@
import {
  ConsoleTemplate,
  FullScreenContainer,
  ThemeProvider,
} from '@pipecat-ai/voice-ui-kit';
import { StrictMode } from 'react';
import { createRoot } from 'react-dom/client';

//@ts-ignore - fontsource-variable/geist is not typed
import '@fontsource-variable/geist';
//@ts-ignore - fontsource-variable/geist is not typed
import '@fontsource-variable/geist-mono';

createRoot(document.getElementById('root')!).render(
  // @ts-ignore
  <StrictMode>
    <ThemeProvider>
      <FullScreenContainer>
        <ConsoleTemplate
          transportType="smallwebrtc"
          connectParams={{
            connectionUrl: '/api/offer',
          }}
        />
      </FullScreenContainer>
    </ThemeProvider>
  </StrictMode>
);
7
examples/client-server-web/client/src/style.css
Normal file
@@ -0,0 +1,7 @@
@import "@pipecat-ai/voice-ui-kit/styles.css";

html,
body {
  height: 100%;
  margin: 0;
}
27
examples/client-server-web/client/tsconfig.json
Normal file
@@ -0,0 +1,27 @@
{
  "compilerOptions": {
    "target": "ES2016",
    "module": "ESNext",
    "lib": ["ES2016", "DOM", "DOM.Iterable"],
    "types": ["node"],
    "skipLibCheck": true,
    "jsx": "preserve",

    /* Bundler mode */
    "moduleResolution": "bundler",
    "allowImportingTsExtensions": true,
    "allowJs": true,
    "noEmit": true,
    "resolveJsonModule": true,
    "isolatedModules": true,
    "moduleDetection": "force",
    "esModuleInterop": true,

    /* Linting */
    "strict": true,
    "noUnusedLocals": true,
    "noUnusedParameters": false,
    "noFallthroughCasesInSwitch": true
  },
  "include": ["src"]
}
18
examples/client-server-web/client/vite.config.js
Normal file
@@ -0,0 +1,18 @@
import { defineConfig } from "vite";
import react from "@vitejs/plugin-react-swc";

export default defineConfig({
  base: "./", //Use relative paths so it works at any mount path
  plugins: [react()],
  publicDir: "public",
  server: {
    allowedHosts: true, // Allows external connections like ngrok
    proxy: {
      // Proxy /api requests to the backend server
      "/api": {
        target: "http://0.0.0.0:7860", // Replace with your backend URL
        changeOrigin: true,
      },
    },
  },
});
127
examples/client-server-web/server/bot.py
Normal file
@@ -0,0 +1,127 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Client-Server Web Example.

This is the server-side bot implementation for the Pipecat client-server
web example. It runs a simple voice AI bot that you can connect to using a
web browser and speak with it.

Required AI services:
- Deepgram (Speech-to-Text)
- OpenAI (LLM)
- Cartesia (Text-to-Speech)

The example connects between client and server using a P2P WebRTC connection.

Run the bot using::

    python bot.py
"""

import os

from dotenv import load_dotenv
from loguru import logger

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport

load_dotenv(override=True)


async def run_bot(transport: BaseTransport):
    logger.info("Starting bot")

    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

    tts = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY"),
        voice_id="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
    )

    llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))

    messages = [
        {
            "role": "system",
            "content": "You are a friendly AI assistant. Respond naturally and keep your answers conversational.",
        },
    ]

    context = OpenAILLMContext(messages)
    context_aggregator = llm.create_context_aggregator(context)

    rtvi = RTVIProcessor(config=RTVIConfig(config=[]))

    pipeline = Pipeline(
        [
            transport.input(),  # Transport user input
            rtvi,  # RTVI processor
            stt,
            context_aggregator.user(),  # User responses
            llm,  # LLM
            tts,  # TTS
            transport.output(),  # Transport bot output
            context_aggregator.assistant(),  # Assistant spoken responses
        ]
    )

    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            enable_metrics=True,
            enable_usage_metrics=True,
        ),
        observers=[RTVIObserver(rtvi)],
    )

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info("Client connected")
        # Kick off the conversation.
        messages.append({"role": "system", "content": "Say hello and briefly introduce yourself."})
        await task.queue_frames([context_aggregator.user().get_context_frame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info("Client disconnected")
        await task.cancel()

    runner = PipelineRunner(handle_sigint=False)

    await runner.run(task)


async def bot(runner_args: RunnerArguments):
    """Main bot entry point for the bot starter."""

    transport = SmallWebRTCTransport(
        params=TransportParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            vad_analyzer=SileroVADAnalyzer(),
        ),
        webrtc_connection=runner_args.webrtc_connection,
    )

    await run_bot(transport)


if __name__ == "__main__":
    from pipecat.runner.run import main

    main()
3
examples/client-server-web/server/env.example
Normal file
@@ -0,0 +1,3 @@
DEEPGRAM_API_KEY=your_deepgram_api_key
OPENAI_API_KEY=your_openai_api_key
CARTESIA_API_KEY=your_cartesia_api_key
1
examples/client-server-web/server/requirements.txt
Normal file
@@ -0,0 +1 @@
pipecat-ai[webrtc,silero,deepgram,openai,cartesia,runner]
164
examples/phone-bot-twilio/.gitignore
vendored
Normal file
@@ -0,0 +1,164 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock

# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock

# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml

# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
runpod.toml

# Examples
templates/streams.xml
116
examples/phone-bot-twilio/README.md
Normal file
@@ -0,0 +1,116 @@
# Phone Bot Twilio

Learn how to connect your Pipecat bot to a phone number so users can call and have voice conversations. This example shows the complete setup for telephone-based AI interactions using Twilio's telephony services. At the end, you'll be able to talk to your bot on the phone.

## Prerequisites

- Python 3.10+
- [ngrok](https://ngrok.com/docs/getting-started/) (for tunneling)
- [Twilio Account](https://www.twilio.com/login) and [phone number](https://help.twilio.com/articles/223135247-How-to-Search-for-and-Buy-a-Twilio-Phone-Number-from-Console)
- AI Service API keys for: [Deepgram](https://console.deepgram.com/signup), [OpenAI](https://auth.openai.com/create-account), and [Cartesia](https://play.cartesia.ai/sign-up)

## Setup

This example requires running both a server and an ngrok tunnel in **two separate terminal windows**.

### Terminal 1: Start ngrok and Configure Twilio

1. Start ngrok:
   In a new terminal, start ngrok to tunnel the local server:

   ```bash
   ngrok http 7860
   ```

   > Want a fixed ngrok URL? Use the `--subdomain` flag:
   > `ngrok http --subdomain=your_ngrok_name 7860`

2. Update the Twilio Webhook:

   - Go to your Twilio phone number's configuration page
   - Under "Voice Configuration", in the "A call comes in" section:
     - Select "Webhook" from the dropdown
     - Enter your ngrok URL: `https://your-ngrok-url.ngrok.io`
     - Ensure "HTTP POST" is selected
   - Click Save at the bottom of the page

3. Configure streams.xml:
   - Copy the template file to create your local version:
     ```bash
     cp templates/streams.xml.template templates/streams.xml
     ```
   - In `templates/streams.xml`, replace `<your_server_url>` with your ngrok URL (without `https://`)
   - The final URL should look like: `wss://abc123.ngrok.io/ws`
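
The URL rewrite in step 3 is easy to get wrong by hand (a leftover `https://` or a missing `/ws`). As a minimal sketch, the conversion could be written as below; `to_stream_url` is a hypothetical helper, not something this example ships, and the `/ws` path is taken from the sample URL above.

```python
from urllib.parse import urlparse


def to_stream_url(ngrok_url: str, path: str = "/ws") -> str:
    """Turn an ngrok HTTPS URL (or bare hostname) into a wss:// stream URL."""
    # urlparse only fills in netloc when a scheme is present,
    # so prepend one when given a bare hostname.
    parsed = urlparse(ngrok_url if "://" in ngrok_url else f"https://{ngrok_url}")
    if not parsed.netloc:
        raise ValueError(f"not a usable URL: {ngrok_url!r}")
    return f"wss://{parsed.netloc}{path}"


if __name__ == "__main__":
    print(to_stream_url("https://abc123.ngrok.io"))
```

The helper accepts either the full `https://` URL or just the hostname, so the value copied from the ngrok terminal can be pasted in unchanged.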

### Terminal 2: Server Setup

1. Set up a virtual environment

   From the `examples/phone-bot-twilio` directory, run:

   ```bash
   python -m venv .venv
   source .venv/bin/activate # On Windows: .venv\Scripts\activate
   ```

   > Using `uv`? Create your venv using: `uv venv && source .venv/bin/activate`.

2. Install dependencies

   ```bash
   pip install -r requirements.txt
   ```

   > Using `uv`? Install requirements using: `uv pip install -r requirements.txt`.

3. Configure environment variables

   Create a `.env` file:

   ```bash
   cp env.example .env
   ```

   Then, add your API keys:

   ```
   DEEPGRAM_API_KEY=your_deepgram_api_key
   OPENAI_API_KEY=your_openai_api_key
   CARTESIA_API_KEY=your_cartesia_api_key
   ```

   > Optional: Add your `TWILIO_ACCOUNT_SID` and `TWILIO_AUTH_TOKEN` to enable auto-hangup.

4. Run the Application

   ```bash
   python bot.py -t twilio -x your_ngrok.ngrok.io
   ```

### Test Your Phone Bot

**Call your Twilio phone number** to start talking with your AI bot! 🚀

> 💡 **Tip**: Check your server terminal for debug logs showing Pipecat's internal workings.

## Troubleshooting

- **Call doesn't connect**: Verify your ngrok URL is correctly set in both the Twilio webhook and `streams.xml`
- **No audio or bot doesn't respond**: Check that all API keys are correctly set in your `.env` file
- **Webhook errors**: Ensure your server is running and the ngrok tunnel is active before making calls
- **ngrok tunnel issues**: Free ngrok URLs change on each restart; remember to update both Twilio and `streams.xml`
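
For the API-key item above, a quick check before starting the bot can save a debugging round trip. This is a sketch under stated assumptions: `missing_keys` and `REQUIRED_KEYS` are hypothetical names, and the key list simply mirrors the three variables in `env.example`.

```python
import os

# The three keys listed in env.example for this example.
REQUIRED_KEYS = ("DEEPGRAM_API_KEY", "OPENAI_API_KEY", "CARTESIA_API_KEY")


def missing_keys(env=None, required=REQUIRED_KEYS):
    """Return the names of required variables that are unset or blank."""
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name, "").strip()]


if __name__ == "__main__":
    missing = missing_keys()
    if missing:
        print("Missing API keys:", ", ".join(missing))
    else:
        print("All required API keys are set.")
```

Note that this reads the process environment, not the `.env` file itself. `bot.py` loads `.env` with `load_dotenv(override=True)`, so a check like this would need to run after that call to see the same values.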

## Understanding the Call Flow

1. **Incoming Call**: User dials your Twilio number
2. **Webhook**: Twilio sends call data to your ngrok URL
3. **WebSocket**: Your server establishes a real-time audio connection over a WebSocket and exchanges Media Streams messages with Twilio
4. **Processing**: Audio flows through your Pipecat Pipeline
5. **Response**: Synthesized speech streams back to the caller
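
To make step 3 of the call flow concrete, here is a rough sketch of what handling Media Streams messages involves. It is not how this example does it: `bot.py` delegates this work to `parse_telephony_websocket` and `TwilioFrameSerializer`. The field names (`event`, `start.streamSid`, `start.callSid`, `media.payload`) follow Twilio's Media Streams message format as I understand it and should be checked against Twilio's documentation; `handle_message` and the `state` keys are hypothetical.

```python
import base64
import json


def handle_message(raw: str, state: dict) -> bytes:
    """Update call state from one Media Streams message; return decoded audio, if any."""
    msg = json.loads(raw)
    event = msg.get("event")
    if event == "start":
        # The start message carries the identifiers for the stream and the call.
        state["stream_sid"] = msg["start"]["streamSid"]
        state["call_sid"] = msg["start"]["callSid"]
    elif event == "media":
        # Audio arrives base64-encoded inside the JSON message.
        return base64.b64decode(msg["media"]["payload"])
    elif event == "stop":
        state["stopped"] = True
    return b""
```

The two identifiers captured on `start` correspond to the `stream_sid` and `call_sid` arguments that `bot.py` passes to `TwilioFrameSerializer`.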

## Next Steps

- **Deploy to production**: Replace ngrok with a proper server deployment
- **Explore other telephony providers**: Try [Telnyx](https://github.com/pipecat-ai/pipecat-examples/tree/main/telnyx-chatbot) or [Plivo](https://github.com/pipecat-ai/pipecat-examples/tree/main/plivo-chatbot) examples
- **Advanced telephony features**: Check out [pipecat-examples](https://github.com/pipecat-ai/pipecat-examples) for call recording, transfer, and more
- **Join Discord**: Connect with other developers on [Discord](https://discord.gg/pipecat)
144
examples/phone-bot-twilio/bot.py
Normal file
@@ -0,0 +1,144 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Pipecat Twilio Phone Example.
|
||||
|
||||
The example runs a simple voice AI bot that you can connect to using a
|
||||
phone via Twilio.
|
||||
|
||||
Required AI services:
|
||||
- Deepgram (Speech-to-Text)
|
||||
- OpenAI (LLM)
|
||||
- Cartesia (Text-to-Speech)
|
||||
|
||||
The example connects between client and server using a Twilio websocket
|
||||
connection.
|
||||
|
||||
Run the bot using::
|
||||
|
||||
python bot.py -t twilio -x your_ngrok.ngrok.io
|
||||
"""
|
||||
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import parse_telephony_websocket
|
||||
from pipecat.serializers.twilio import TwilioFrameSerializer
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.transports.base_transport import BaseTransport
|
||||
from pipecat.transports.network.fastapi_websocket import (
|
||||
FastAPIWebsocketParams,
|
||||
FastAPIWebsocketTransport,
|
||||
)
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a friendly AI assistant. Respond naturally and keep your answers conversational.",
|
||||
},
|
||||
]
|
||||
|
||||
context = OpenAILLMContext(messages)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
rtvi, # RTVI processor
|
||||
stt,
|
||||
context_aggregator.user(), # User responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
transport.output(), # Transport bot output
|
||||
context_aggregator.assistant(), # Assistant spoken responses
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
observers=[RTVIObserver(rtvi)],
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info("Client connected")
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Say hello and briefly introduce yourself."})
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info("Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=False)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
"""Main bot entry point for the bot starter."""
|
||||
|
||||
transport_type, call_data = await parse_telephony_websocket(runner_args.websocket)
|
||||
logger.info(f"Auto-detected transport: {transport_type}")
|
||||
|
||||
serializer = TwilioFrameSerializer(
|
||||
stream_sid=call_data["stream_id"],
|
||||
call_sid=call_data["call_id"],
|
||||
account_sid=os.getenv("TWILIO_ACCOUNT_SID", ""),
|
||||
auth_token=os.getenv("TWILIO_AUTH_TOKEN", ""),
|
||||
)
|
||||
|
||||
transport = FastAPIWebsocketTransport(
|
||||
websocket=runner_args.websocket,
|
||||
params=FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
add_wav_header=False,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
serializer=serializer,
|
||||
),
|
||||
)
|
||||
|
||||
await run_bot(transport)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
3
examples/phone-bot-twilio/env.example
Normal file
@@ -0,0 +1,3 @@
|
||||
OPENAI_API_KEY=
|
||||
DEEPGRAM_API_KEY=
|
||||
CARTESIA_API_KEY=
|
||||
1
examples/phone-bot-twilio/requirements.txt
Normal file
@@ -0,0 +1 @@
|
||||
pipecat-ai[cartesia,openai,silero,deepgram,websocket,runner]
|
||||
7
examples/phone-bot-twilio/templates/streams.xml.template
Normal file
@@ -0,0 +1,7 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<Response>
|
||||
<Connect>
|
||||
<Stream url="wss://<your_server_url>/ws"></Stream>
|
||||
</Connect>
|
||||
<Pause length="40"/>
|
||||
</Response>
|
||||
87
examples/quickstart/README.md
Normal file
@@ -0,0 +1,87 @@
|
||||
# Pipecat Quickstart
|
||||
|
||||
Run your first Pipecat bot in under 5 minutes. This example creates a voice AI bot that you can talk to in your browser.
|
||||
|
||||
## Prerequisites
|
||||
|
||||
### Python 3.10+
|
||||
|
||||
Pipecat requires Python 3.10 or newer. Check your version:
|
||||
|
||||
```bash
|
||||
python --version
|
||||
```
|
||||
|
||||
If you need to upgrade Python, we recommend using a version manager like `uv` or `pyenv`.
|
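If `python --version` points at a different interpreter than the one inside your virtual environment, the same check can be run from Python itself. This is a minimal sketch, assuming only the 3.10 minimum stated above; `python_is_supported` is a hypothetical helper name and is not part of Pipecat:

```python
import sys

# Minimum version stated in this README.
MIN_PYTHON = (3, 10)


def python_is_supported(version_info=sys.version_info, minimum=MIN_PYTHON) -> bool:
    """Return True if the (major, minor) version meets the minimum."""
    return tuple(version_info[:2]) >= minimum


if __name__ == "__main__":
    if python_is_supported():
        print("Python version OK")
    else:
        found = ".".join(str(part) for part in sys.version_info[:3])
        print(f"Python 3.10+ required, found {found}")
```

Running it with the interpreter you plan to use for the bot (for example, after activating the virtual environment) checks the version that actually matters.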
||||
|
||||
### AI Service API keys
|
||||
|
||||
Pipecat orchestrates different AI services in a pipeline, keeping communication between them low-latency. In this quickstart example, we'll use:
|
||||
|
||||
- [Deepgram](https://console.deepgram.com/signup) for Speech-to-Text transcriptions
|
||||
- [OpenAI](https://auth.openai.com/create-account) for LLM inference
|
||||
- [Cartesia](https://play.cartesia.ai/sign-up) for Text-to-Speech audio generation
|
||||
|
||||
Have your API keys ready. We'll add them to your `.env` shortly.
|
||||
|
||||
## Setup
|
||||
|
||||
1. Set up a virtual environment
|
||||
|
||||
From the `examples/quickstart` directory, run:
|
||||
|
||||
```bash
|
||||
python -m venv .venv
|
||||
source .venv/bin/activate # On Windows: .venv\Scripts\activate
|
||||
```
|
||||
|
||||
> Using `uv`? Create your venv using: `uv venv && source .venv/bin/activate`.
|
||||
|
||||
2. Install dependencies
|
||||
|
||||
```bash
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
> Using `uv`? Install requirements using: `uv pip install -r requirements.txt`.
|
||||
|
||||
3. Configure environment variables
|
||||
|
||||
Create a `.env` file:
|
||||
|
||||
```bash
|
||||
cp env.example .env
|
||||
```
|
||||
|
||||
Then, add your API keys:
|
||||
|
||||
```
|
||||
DEEPGRAM_API_KEY=your_deepgram_api_key
|
||||
OPENAI_API_KEY=your_openai_api_key
|
||||
CARTESIA_API_KEY=your_cartesia_api_key
|
||||
```
|
||||
|
||||
4. Run the example
|
||||
|
||||
Run your bot using:
|
||||
|
||||
```bash
|
||||
python bot.py
|
||||
```
|
||||
|
||||
> Using `uv`? Run your bot using: `uv run bot.py`.
|
||||
|
||||
**Open http://localhost:7860 in your browser** and click `Connect` to start talking to your bot.
|
||||
|
||||
> 💡 First run note: The initial startup may take ~10 seconds as Pipecat downloads required models, like the Silero VAD model.
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
- **Browser permissions**: Make sure to allow microphone access when prompted by your browser.
|
||||
- **Connection issues**: If the WebRTC connection fails, first try a different browser. If that doesn't help, make sure no VPN or firewall rule is blocking traffic: WebRTC communicates over UDP, so blocked UDP traffic prevents the connection.
|
||||
- **Audio issues**: Check that your microphone and speakers are working and not muted.
|
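A missing or placeholder API key is another common reason for a bot that fails to start or respond. The following is a minimal sketch for checking this, assuming the three variable names from `env.example` and its `your_...` placeholder values; `missing_keys` is a hypothetical helper and is not part of Pipecat:

```python
import os

# Variable names taken from env.example in this quickstart.
REQUIRED_KEYS = ("DEEPGRAM_API_KEY", "OPENAI_API_KEY", "CARTESIA_API_KEY")

# env.example ships placeholder values such as "your_openai_api_key".
PLACEHOLDER_PREFIX = "your_"


def missing_keys(env=None, required=REQUIRED_KEYS):
    """Return the required names that are unset, blank, or still placeholders."""
    env = os.environ if env is None else env
    missing = []
    for name in required:
        value = env.get(name, "").strip()
        if not value or value.startswith(PLACEHOLDER_PREFIX):
            missing.append(name)
    return missing


if __name__ == "__main__":
    problems = missing_keys()
    if problems:
        print("Missing or placeholder keys: " + ", ".join(problems))
    else:
        print("All required API keys are set")
```

The sketch reads `os.environ` only, so it sees the values in `.env` only if they have been exported or loaded first (for example with `load_dotenv()`, as `bot.py` does).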
||||
|
||||
## Next Steps
|
||||
|
||||
- **Read the docs**: Check out [Pipecat's docs](https://docs.pipecat.ai/) for guides and reference information.
|
||||
- **Join Discord**: Join [Pipecat's Discord server](https://discord.gg/pipecat) to get help and learn about what others are building.
|
||||
126
examples/quickstart/bot.py
Normal file
@@ -0,0 +1,126 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Pipecat Quickstart Example.
|
||||
|
||||
The example runs a simple voice AI bot that you can connect to from your
|
||||
browser and talk to.
|
||||
|
||||
Required AI services:
|
||||
- Deepgram (Speech-to-Text)
|
||||
- OpenAI (LLM)
|
||||
- Cartesia (Text-to-Speech)
|
||||
|
||||
The client and server communicate over a peer-to-peer (P2P) WebRTC connection.
|
||||
|
||||
Run the bot using::
|
||||
|
||||
python bot.py
|
||||
"""
|
||||
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport):
|
||||
logger.info("Starting bot")
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a friendly AI assistant. Respond naturally and keep your answers conversational.",
|
||||
},
|
||||
]
|
||||
|
||||
context = OpenAILLMContext(messages)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
rtvi, # RTVI processor
|
||||
stt,
|
||||
context_aggregator.user(), # User responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
transport.output(), # Transport bot output
|
||||
context_aggregator.assistant(), # Assistant spoken responses
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
observers=[RTVIObserver(rtvi)],
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info("Client connected")
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Say hello and briefly introduce yourself."})
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info("Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=False)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
"""Main bot entry point for the bot starter."""
|
||||
|
||||
transport = SmallWebRTCTransport(
|
||||
params=TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
webrtc_connection=runner_args.webrtc_connection,
|
||||
)
|
||||
|
||||
await run_bot(transport)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
3
examples/quickstart/env.example
Normal file
@@ -0,0 +1,3 @@
|
||||
DEEPGRAM_API_KEY=your_deepgram_api_key
|
||||
OPENAI_API_KEY=your_openai_api_key
|
||||
CARTESIA_API_KEY=your_cartesia_api_key
|
||||
1
examples/quickstart/requirements.txt
Normal file
@@ -0,0 +1 @@
|
||||
pipecat-ai[webrtc,silero,deepgram,openai,cartesia,runner]
|
||||
218
examples/runner-examples/01-all-transport-bot.py
Normal file
@@ -0,0 +1,218 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Pipecat Cloud-compatible bot example.
|
||||
|
||||
Transports are:
|
||||
|
||||
- Daily
|
||||
- SmallWebRTC
|
||||
- Twilio
|
||||
- Telnyx
|
||||
- Plivo
|
||||
"""
|
||||
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
|
||||
from pipecat.runner.types import (
|
||||
DailyRunnerArguments,
|
||||
RunnerArguments,
|
||||
SmallWebRTCRunnerArguments,
|
||||
WebSocketRunnerArguments,
|
||||
)
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.transports.base_transport import BaseTransport
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport):
|
||||
"""Main bot logic that works with any transport."""
|
||||
logger.info("Starting bot")
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a friendly AI assistant. Respond naturally and keep your answers conversational.",
|
||||
},
|
||||
]
|
||||
|
||||
context = OpenAILLMContext(messages)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(),
|
||||
rtvi,
|
||||
stt,
|
||||
context_aggregator.user(),
|
||||
llm,
|
||||
tts,
|
||||
transport.output(),
|
||||
context_aggregator.assistant(),
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
observers=[RTVIObserver(rtvi)],
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info("Client connected")
|
||||
messages.append({"role": "system", "content": "Say hello and briefly introduce yourself."})
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info("Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=False)
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
"""Main bot entry point compatible with Pipecat Cloud."""
|
||||
|
||||
transport = None
|
||||
|
||||
if isinstance(runner_args, DailyRunnerArguments):
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
if os.environ.get("ENV") != "local":
|
||||
from pipecat.audio.filters.krisp_filter import KrispFilter
|
||||
|
||||
krisp_filter = KrispFilter()
|
||||
else:
|
||||
krisp_filter = None
|
||||
|
||||
transport = DailyTransport(
|
||||
runner_args.room_url,
|
||||
runner_args.token,
|
||||
"Pipecat Bot",
|
||||
params=DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_in_filter=krisp_filter,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
)
|
||||
|
||||
elif isinstance(runner_args, SmallWebRTCRunnerArguments):
|
||||
from pipecat.transports.base_transport import TransportParams
|
||||
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
|
||||
|
||||
transport = SmallWebRTCTransport(
|
||||
params=TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
webrtc_connection=runner_args.webrtc_connection,
|
||||
)
|
||||
|
||||
elif isinstance(runner_args, WebSocketRunnerArguments):
|
||||
# Use the utility to parse WebSocket data
|
||||
from pipecat.runner.utils import parse_telephony_websocket
|
||||
|
||||
transport_type, call_data = await parse_telephony_websocket(runner_args.websocket)
|
||||
logger.info(f"Auto-detected transport: {transport_type}")
|
||||
|
||||
# Create transport based on detected type
|
||||
if transport_type == "twilio":
|
||||
from pipecat.serializers.twilio import TwilioFrameSerializer
|
||||
|
||||
serializer = TwilioFrameSerializer(
|
||||
stream_sid=call_data["stream_id"],
|
||||
call_sid=call_data["call_id"],
|
||||
account_sid=os.getenv("TWILIO_ACCOUNT_SID", ""),
|
||||
auth_token=os.getenv("TWILIO_AUTH_TOKEN", ""),
|
||||
)
|
||||
|
||||
elif transport_type == "telnyx":
|
||||
from pipecat.serializers.telnyx import TelnyxFrameSerializer
|
||||
|
||||
serializer = TelnyxFrameSerializer(
|
||||
stream_id=call_data["stream_id"],
|
||||
call_control_id=call_data["call_control_id"],
|
||||
outbound_encoding=call_data["outbound_encoding"],
|
||||
inbound_encoding="PCMU", # Set manually
|
||||
api_key=os.getenv("TELNYX_API_KEY", ""),
|
||||
)
|
||||
|
||||
elif transport_type == "plivo":
|
||||
from pipecat.serializers.plivo import PlivoFrameSerializer
|
||||
|
||||
serializer = PlivoFrameSerializer(
|
||||
stream_id=call_data["stream_id"],
|
||||
call_id=call_data["call_id"],
|
||||
auth_id=os.getenv("PLIVO_AUTH_ID", ""),
|
||||
auth_token=os.getenv("PLIVO_AUTH_TOKEN", ""),
|
||||
)
|
||||
|
||||
else:
|
||||
# Generic fallback
|
||||
serializer = None
|
||||
|
||||
# Create the transport
|
||||
from pipecat.transports.network.fastapi_websocket import (
|
||||
FastAPIWebsocketParams,
|
||||
FastAPIWebsocketTransport,
|
||||
)
|
||||
|
||||
transport = FastAPIWebsocketTransport(
|
||||
websocket=runner_args.websocket,
|
||||
params=FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
add_wav_header=False,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
serializer=serializer,
|
||||
),
|
||||
)
|
||||
else:
|
||||
logger.error(f"Unsupported runner arguments type: {type(runner_args)}")
|
||||
return
|
||||
|
||||
if transport is None:
|
||||
logger.error("Failed to create transport")
|
||||
return
|
||||
|
||||
await run_bot(transport)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
144
examples/runner-examples/01-all-transport-factory-bot.py
Normal file
@@ -0,0 +1,144 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Pipecat Cloud-compatible bot example.
|
||||
|
||||
Transports are:
|
||||
|
||||
- Daily
|
||||
- SmallWebRTC
|
||||
- Twilio
|
||||
- Telnyx
|
||||
- Plivo
|
||||
"""
|
||||
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams
|
||||
from pipecat.transports.services.daily import DailyParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
# Define transport configurations using factory functions
|
||||
transport_params = {
|
||||
"daily": lambda: DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
"webrtc": lambda: TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
"twilio": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
# add_wav_header and serializer will be set automatically
|
||||
),
|
||||
"telnyx": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
# add_wav_header and serializer will be set automatically
|
||||
),
|
||||
"plivo": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
# add_wav_header and serializer will be set automatically
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport):
|
||||
"""Main bot logic that works with any transport."""
|
||||
logger.info("Starting bot")
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a friendly AI assistant. Respond naturally and keep your answers conversational.",
|
||||
},
|
||||
]
|
||||
|
||||
context = OpenAILLMContext(messages)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(),
|
||||
rtvi,
|
||||
stt,
|
||||
context_aggregator.user(),
|
||||
llm,
|
||||
tts,
|
||||
transport.output(),
|
||||
context_aggregator.assistant(),
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
observers=[RTVIObserver(rtvi)],
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info("Client connected")
|
||||
messages.append({"role": "system", "content": "Say hello and briefly introduce yourself."})
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info("Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=False)
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
"""Main bot entry point compatible with Pipecat Cloud."""
|
||||
transport = await create_transport(runner_args, transport_params)
|
||||
await run_bot(transport)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
157
examples/runner-examples/02-two-transport-bot.py
Normal file
@@ -0,0 +1,157 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Pipecat Cloud-compatible bot example.
|
||||
|
||||
Transports are Daily or SmallWebRTC.
|
||||
|
||||
Run it with:
|
||||
|
||||
- WebRTC transport::
|
||||
|
||||
python 02-two-transport-bot.py
|
||||
|
||||
- Daily transport::
|
||||
|
||||
python 02-two-transport-bot.py --transport daily
|
||||
"""
|
||||
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
|
||||
from pipecat.runner.types import DailyRunnerArguments, RunnerArguments, SmallWebRTCRunnerArguments
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.transports.base_transport import BaseTransport
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport):
|
||||
"""Main bot logic that works with any transport."""
|
||||
logger.info("Starting bot")
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a friendly AI assistant. Respond naturally and keep your answers conversational.",
|
||||
},
|
||||
]
|
||||
|
||||
context = OpenAILLMContext(messages)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(),
|
||||
rtvi,
|
||||
stt,
|
||||
context_aggregator.user(),
|
||||
llm,
|
||||
tts,
|
||||
transport.output(),
|
||||
context_aggregator.assistant(),
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
observers=[RTVIObserver(rtvi)],
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info("Client connected")
|
||||
messages.append({"role": "system", "content": "Say hello and briefly introduce yourself."})
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info("Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=False)
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
"""Main bot entry point compatible with Pipecat Cloud."""
|
||||
|
||||
transport = None
|
||||
|
||||
if isinstance(runner_args, DailyRunnerArguments):
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
if os.environ.get("ENV") != "local":
|
||||
from pipecat.audio.filters.krisp_filter import KrispFilter
|
||||
|
||||
krisp_filter = KrispFilter()
|
||||
else:
|
||||
krisp_filter = None
|
||||
|
||||
transport = DailyTransport(
|
||||
runner_args.room_url,
|
||||
runner_args.token,
|
||||
"Pipecat Bot",
|
||||
params=DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_in_filter=krisp_filter,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
)
|
||||
|
||||
elif isinstance(runner_args, SmallWebRTCRunnerArguments):
|
||||
from pipecat.transports.base_transport import TransportParams
|
||||
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
|
||||
|
||||
transport = SmallWebRTCTransport(
|
||||
params=TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
webrtc_connection=runner_args.webrtc_connection,
|
||||
)
|
||||
else:
|
||||
logger.error(f"Unsupported runner arguments type: {type(runner_args)}")
|
||||
return
|
||||
|
||||
if transport is None:
|
||||
logger.error("Failed to create transport")
|
||||
return
|
||||
|
||||
await run_bot(transport)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
117
examples/runner-examples/03-single-transport-bot.py
Normal file
@@ -0,0 +1,117 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Pipecat development runner example.
|
||||
|
||||
This example has a single transport—SmallWebRTCTransport.
|
||||
|
||||
Run it with::
|
||||
|
||||
python 03-single-transport-bot.py
|
||||
"""
|
||||
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport):
|
||||
"""Main bot logic that works with any transport."""
|
||||
logger.info("Starting bot")
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a friendly AI assistant. Respond naturally and keep your answers conversational.",
|
||||
},
|
||||
]
|
||||
|
||||
context = OpenAILLMContext(messages)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(),
|
||||
rtvi,
|
||||
stt,
|
||||
context_aggregator.user(),
|
||||
llm,
|
||||
tts,
|
||||
transport.output(),
|
||||
context_aggregator.assistant(),
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
observers=[RTVIObserver(rtvi)],
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info("Client connected")
|
||||
messages.append({"role": "system", "content": "Say hello and briefly introduce yourself."})
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info("Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=False)
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
"""Main bot entry point compatible with Pipecat Cloud."""
|
||||
|
||||
transport = SmallWebRTCTransport(
|
||||
params=TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
webrtc_connection=runner_args.webrtc_connection,
|
||||
)
|
||||
|
||||
await run_bot(transport)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
7
examples/runner-examples/Dockerfile
Normal file
@@ -0,0 +1,7 @@
|
||||
FROM dailyco/pipecat-base:latest
|
||||
|
||||
COPY ./requirements.txt requirements.txt
|
||||
|
||||
RUN pip install --no-cache-dir --upgrade -r requirements.txt
|
||||
|
||||
COPY ./02-two-transport-bot.py bot.py
|
||||
19
examples/runner-examples/build.sh
Executable file
@@ -0,0 +1,19 @@
|
||||
#!/bin/bash
|
||||
set -e
|
||||
|
||||
VERSION="0.1"
|
||||
DOCKER_USERNAME="your_docker_username"
|
||||
AGENT_NAME="cloud-simple-bot"
|
||||
|
||||
# Build the Docker image with the correct context
|
||||
echo "Building Docker image..."
|
||||
docker build --platform=linux/arm64 -t "$DOCKER_USERNAME/$AGENT_NAME:$VERSION" -t "$DOCKER_USERNAME/$AGENT_NAME:latest" .
|
||||
|
||||
# Push the Docker images
|
||||
echo "Pushing Docker image $DOCKER_USERNAME/$AGENT_NAME:$VERSION..."
|
||||
docker push "$DOCKER_USERNAME/$AGENT_NAME:$VERSION"
|
||||
|
||||
echo "Pushing Docker image $DOCKER_USERNAME/$AGENT_NAME:latest..."
|
||||
docker push "$DOCKER_USERNAME/$AGENT_NAME:latest"
|
||||
|
||||
echo "Successfully built and pushed $DOCKER_USERNAME/$AGENT_NAME:$VERSION and $DOCKER_USERNAME/$AGENT_NAME:latest"
|
||||
3
examples/runner-examples/env.example
Normal file
@@ -0,0 +1,3 @@
|
||||
DEEPGRAM_API_KEY=your_deepgram_api_key
|
||||
OPENAI_API_KEY=your_openai_api_key
|
||||
CARTESIA_API_KEY=your_cartesia_api_key
|
||||
8
examples/runner-examples/pcc-deploy.toml
Normal file
@@ -0,0 +1,8 @@
|
||||
agent_name = "cloud-simple-bot"
|
||||
image = "your_dockerhub_username/cloud-simple-bot:0.1"
|
||||
image_credentials = "dockerhub-access"
|
||||
secret_set = "cloud-simple-bot-secrets"
|
||||
enable_krisp = true
|
||||
|
||||
[scaling]
|
||||
min_agents = 0
|
||||
1
examples/runner-examples/requirements.txt
Normal file
@@ -0,0 +1 @@
|
||||
pipecat-ai[openai,daily,deepgram,cartesia,silero,webrtc,websocket,runner]
|
||||
@@ -85,6 +85,7 @@ playht = [ "pyht>=0.1.6", "websockets>=13.1,<15.0" ]
|
||||
qwen = []
|
||||
rime = [ "websockets>=13.1,<15.0" ]
|
||||
riva = [ "nvidia-riva-client~=2.21.1" ]
|
||||
runner = [ "python-dotenv>=1.0.0,<2.0.0", "uvicorn>=0.32.0,<1.0.0", "fastapi>=0.115.6,<0.117.0", "pipecat-ai-small-webrtc-prebuilt>=1.0.0"]
|
||||
sambanova = []
|
||||
sentry = [ "sentry-sdk~=2.23.1" ]
|
||||
local-smart-turn = [ "coremltools>=8.0", "transformers", "torch~=2.5.0", "torchaudio~=2.5.0" ]
|
||||
|
||||
1
src/pipecat/runner/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Pipecat runner package for local and cloud bot execution."""
|
||||
112
src/pipecat/runner/daily.py
Normal file
@@ -0,0 +1,112 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Daily room and token configuration utilities.
|
||||
|
||||
This module provides helper functions for creating and configuring Daily rooms
|
||||
and authentication tokens. It handles both command-line argument parsing and
|
||||
environment variable configuration.
|
||||
|
||||
The module expects an existing room, given via the -u/--url argument or the
|
||||
DAILY_SAMPLE_ROOM_URL environment variable, and creates a meeting token for it.
|
||||
|
||||
Environment variables:
|
||||
|
||||
- DAILY_API_KEY - Daily API key for room/token creation
|
||||
- DAILY_SAMPLE_ROOM_URL (optional) - Existing room URL to use
|
||||
- DAILY_SAMPLE_ROOM_TOKEN (optional) - Existing token to use
|
||||
|
||||
Example::
|
||||
|
||||
import aiohttp
|
||||
from pipecat.runner.daily import configure
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
room_url, token = await configure(session)
|
||||
# Use room_url and token with DailyTransport
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import os
|
||||
from typing import Optional
|
||||
|
||||
import aiohttp
|
||||
|
||||
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper
|
||||
|
||||
|
||||
async def configure(aiohttp_session: aiohttp.ClientSession):
|
||||
"""Configure Daily room URL and token from arguments or environment.
|
||||
|
||||
Args:
|
||||
aiohttp_session: HTTP session for making API requests.
|
||||
|
||||
Returns:
|
||||
Tuple containing the room URL and authentication token.
|
||||
|
||||
Raises:
|
||||
Exception: If room URL or API key are not provided.
|
||||
"""
|
||||
(url, token, _) = await configure_with_args(aiohttp_session)
|
||||
return (url, token)
|
||||
|
||||
|
||||
async def configure_with_args(
|
||||
aiohttp_session: aiohttp.ClientSession, parser: Optional[argparse.ArgumentParser] = None
|
||||
):
|
||||
"""Configure Daily room with command-line argument parsing.
|
||||
|
||||
Args:
|
||||
aiohttp_session: HTTP session for making API requests.
|
||||
parser: Optional argument parser. If None, creates a default one.
|
||||
|
||||
Returns:
|
||||
Tuple containing room URL, authentication token, and parsed arguments.
|
||||
|
||||
Raises:
|
||||
Exception: If room URL or API key are not provided via arguments or environment.
|
||||
"""
|
||||
if not parser:
|
||||
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
|
||||
parser.add_argument(
|
||||
"-u", "--url", type=str, required=False, help="URL of the Daily room to join"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-k",
|
||||
"--apikey",
|
||||
type=str,
|
||||
required=False,
|
||||
help="Daily API Key (needed to create an owner token for the room)",
|
||||
)
|
||||
|
||||
args, unknown = parser.parse_known_args()
|
||||
|
||||
url = args.url or os.getenv("DAILY_SAMPLE_ROOM_URL")
|
||||
key = args.apikey or os.getenv("DAILY_API_KEY")
|
||||
|
||||
if not url:
|
||||
raise Exception(
|
||||
"No Daily room specified. Use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL."
|
||||
)
|
||||
|
||||
if not key:
|
||||
raise Exception(
|
||||
"No Daily API key specified. Use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
|
||||
)
|
||||
|
||||
daily_rest_helper = DailyRESTHelper(
|
||||
daily_api_key=key,
|
||||
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
|
||||
aiohttp_session=aiohttp_session,
|
||||
)
|
||||
|
||||
# Create a meeting token for the given room with an expiration 2 hours in
|
||||
# the future.
|
||||
expiry_time: float = 2 * 60 * 60
|
||||
|
||||
token = await daily_rest_helper.get_token(url, expiry_time)
|
||||
|
||||
return (url, token, args)
|
||||
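The resolution order used by `configure_with_args` (command-line flag first, then environment variable, otherwise an error) can be sketched with the standard library alone. `resolve_daily_settings` and its `argv`/`env` parameters are hypothetical names introduced for illustration; the real function additionally requests a meeting token through `DailyRESTHelper`, which this sketch leaves out.

```python
import argparse
import os
from typing import Mapping, Optional, Sequence, Tuple


def resolve_daily_settings(
    argv: Optional[Sequence[str]] = None,
    env: Optional[Mapping[str, str]] = None,
) -> Tuple[str, str]:
    """Return (room_url, api_key), preferring CLI flags over environment variables.

    Hypothetical helper: it mirrors the lookup order in the diff, not the
    token creation that follows it.
    """
    env = os.environ if env is None else env

    parser = argparse.ArgumentParser(description="Daily settings sketch")
    parser.add_argument("-u", "--url", type=str, required=False)
    parser.add_argument("-k", "--apikey", type=str, required=False)
    # parse_known_args tolerates flags owned by other parts of the program.
    args, _unknown = parser.parse_known_args(argv)

    url = args.url or env.get("DAILY_SAMPLE_ROOM_URL")
    key = args.apikey or env.get("DAILY_API_KEY")

    if not url:
        raise ValueError("No Daily room specified.")
    if not key:
        raise ValueError("No Daily API key specified.")
    return url, key
```

Passing `argv` and `env` explicitly keeps the lookup testable without touching `sys.argv` or the process environment.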
148
src/pipecat/runner/livekit.py
Normal file
@@ -0,0 +1,148 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""LiveKit room and token configuration utilities.
|
||||
|
||||
This module provides helper functions for creating and configuring LiveKit
|
||||
rooms and authentication tokens. It handles JWT token generation with
|
||||
appropriate grants for both regular participants and AI agents.
|
||||
|
||||
The module supports creating tokens for development and testing, with
|
||||
automatic agent detection for proper room permissions.
|
||||
|
||||
Required environment variables:
|
||||
|
||||
- LIVEKIT_API_KEY - LiveKit API key
|
||||
- LIVEKIT_API_SECRET - LiveKit API secret
|
||||
- LIVEKIT_URL - LiveKit server URL
|
||||
- LIVEKIT_ROOM_NAME - Room name to join
|
||||
|
||||
Example::
|
||||
|
||||
from pipecat.runner.livekit import configure
|
||||
|
||||
url, token, room_name = await configure()
|
||||
# Use with LiveKitTransport
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import os
|
||||
from typing import Optional
|
||||
|
||||
from livekit import api
|
||||
from loguru import logger
|
||||
|
||||
|
||||
def generate_token(room_name: str, participant_name: str, api_key: str, api_secret: str) -> str:
|
||||
"""Generate a LiveKit access token for a participant.
|
||||
|
||||
Args:
|
||||
room_name: Name of the LiveKit room.
|
||||
participant_name: Name of the participant.
|
||||
api_key: LiveKit API key.
|
||||
api_secret: LiveKit API secret.
|
||||
|
||||
Returns:
|
||||
JWT token string for room access.
|
||||
"""
|
||||
token = api.AccessToken(api_key, api_secret)
|
||||
token.with_identity(participant_name).with_name(participant_name).with_grants(
|
||||
api.VideoGrants(
|
||||
room_join=True,
|
||||
room=room_name,
|
||||
)
|
||||
)
|
||||
|
||||
return token.to_jwt()
|
||||
|
||||
|
||||
def generate_token_with_agent(
|
||||
room_name: str, participant_name: str, api_key: str, api_secret: str
|
||||
) -> str:
|
||||
"""Generate a LiveKit access token for an agent participant.
|
||||
|
||||
Args:
|
||||
room_name: Name of the LiveKit room.
|
||||
participant_name: Name of the participant.
|
||||
api_key: LiveKit API key.
|
||||
api_secret: LiveKit API secret.
|
||||
|
||||
Returns:
|
||||
JWT token string for agent room access.
|
||||
"""
|
||||
token = api.AccessToken(api_key, api_secret)
|
||||
token.with_identity(participant_name).with_name(participant_name).with_grants(
|
||||
api.VideoGrants(
|
||||
room_join=True,
|
||||
room=room_name,
|
||||
agent=True,  # Tells LiveKit clients that an agent has joined
|
||||
)
|
||||
)
|
||||
|
||||
return token.to_jwt()
|
||||
|
||||
|
||||
async def configure():
|
||||
"""Configure LiveKit room URL and token from arguments or environment.
|
||||
|
||||
Returns:
|
||||
Tuple containing the server URL, authentication token, and room name.
|
||||
|
||||
Raises:
|
||||
Exception: If required LiveKit configuration is not provided.
|
||||
"""
|
||||
(url, token, room_name, _) = await configure_with_args()
|
||||
return (url, token, room_name)
|
||||
|
||||
|
||||
async def configure_with_args(parser: Optional[argparse.ArgumentParser] = None):
|
||||
"""Configure LiveKit room with command-line argument parsing.
|
||||
|
||||
Args:
|
||||
parser: Optional argument parser. If None, creates a default one.
|
||||
|
||||
Returns:
|
||||
Tuple containing server URL, authentication token, room name, and parsed arguments.
|
||||
|
||||
Raises:
|
||||
Exception: If required LiveKit configuration is not provided via arguments or environment.
|
||||
"""
|
||||
if not parser:
|
||||
parser = argparse.ArgumentParser(description="LiveKit AI SDK Bot Sample")
|
||||
parser.add_argument(
|
||||
"-r", "--room", type=str, required=False, help="Name of the LiveKit room to join"
|
||||
)
|
||||
parser.add_argument("-u", "--url", type=str, required=False, help="URL of the LiveKit server")
|
||||
|
||||
args, unknown = parser.parse_known_args()
|
||||
|
||||
room_name = args.room or os.getenv("LIVEKIT_ROOM_NAME")
|
||||
url = args.url or os.getenv("LIVEKIT_URL")
|
||||
api_key = os.getenv("LIVEKIT_API_KEY")
|
||||
api_secret = os.getenv("LIVEKIT_API_SECRET")
|
||||
|
||||
if not room_name:
|
||||
raise Exception(
|
||||
"No LiveKit room specified. Use the -r/--room option from the command line, or set LIVEKIT_ROOM_NAME in your environment."
|
||||
)
|
||||
|
||||
if not url:
|
||||
raise Exception(
|
||||
"No LiveKit server URL specified. Use the -u/--url option from the command line, or set LIVEKIT_URL in your environment."
|
||||
)
|
||||
|
||||
if not api_key or not api_secret:
|
||||
raise Exception(
|
||||
"LIVEKIT_API_KEY and LIVEKIT_API_SECRET must be set in environment variables."
|
||||
)
|
||||
|
||||
token = generate_token_with_agent(room_name, "Pipecat Agent", api_key, api_secret)
|
||||
|
||||
# Generate user token for testing/debugging
|
||||
user_token = generate_token(room_name, "User", api_key, api_secret)
|
||||
logger.info(f"User token: {user_token}")
|
||||
|
||||
return (url, token, room_name, args)
|
||||
462
src/pipecat/runner/run.py
Normal file
@@ -0,0 +1,462 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Pipecat development runner.
|
||||
|
||||
This development runner executes Pipecat bots and provides the supporting
|
||||
infrastructure they need - creating Daily rooms and tokens, managing WebRTC
|
||||
connections, and setting up telephony webhook/WebSocket infrastructure. It
|
||||
supports multiple transport types with a unified interface.
|
||||
|
||||
Install with::
|
||||
|
||||
pip install pipecat-ai[runner]
|
||||
|
||||
All bots must implement a `bot(runner_args)` async function as the entry point.
|
||||
The server automatically discovers and executes this function when connections
|
||||
are established.
|
||||
|
||||
Single transport example::
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
transport = DailyTransport(
|
||||
runner_args.room_url,
|
||||
runner_args.token,
|
||||
"Bot",
|
||||
DailyParams(...)
|
||||
)
|
||||
# Your bot logic here
|
||||
await run_pipeline(transport)
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
main()
|
||||
|
||||
Multiple transport example::
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
# Type-safe transport detection
|
||||
if isinstance(runner_args, DailyRunnerArguments):
|
||||
transport = setup_daily_transport(runner_args) # Your application code
|
||||
elif isinstance(runner_args, SmallWebRTCRunnerArguments):
|
||||
transport = setup_webrtc_transport(runner_args) # Your application code
|
||||
elif isinstance(runner_args, WebSocketRunnerArguments):
|
||||
transport = setup_telephony_transport(runner_args) # Your application code
|
||||
|
||||
# Your bot implementation
|
||||
await run_pipeline(transport)
|
||||
|
||||
Supported transports:
|
||||
|
||||
- Daily - Creates rooms and tokens, runs bot as participant
|
||||
- WebRTC - Provides local WebRTC interface with prebuilt UI
|
||||
- Telephony - Handles webhook and WebSocket connections for Twilio, Telnyx, Plivo
|
||||
|
||||
To run locally:
|
||||
|
||||
- WebRTC: `python bot.py -t webrtc`
|
||||
- ESP32: `python bot.py -t webrtc --esp32 --host 192.168.1.100`
|
||||
- Daily (server): `python bot.py -t daily`
|
||||
- Daily (direct, testing only): `python bot.py -d`
|
||||
- Telephony: `python bot.py -t twilio -x your_username.ngrok.io`
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import Dict
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.runner.types import (
|
||||
DailyRunnerArguments,
|
||||
SmallWebRTCRunnerArguments,
|
||||
WebSocketRunnerArguments,
|
||||
)
|
||||
|
||||
try:
|
||||
import uvicorn
|
||||
from dotenv import load_dotenv
|
||||
from fastapi import BackgroundTasks, FastAPI, WebSocket
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.responses import HTMLResponse, RedirectResponse
|
||||
except ImportError as e:
|
||||
logger.error(f"Runner dependencies not available: {e}")
|
||||
logger.error("To use Pipecat runners, install with: pip install pipecat-ai[runner]")
|
||||
raise ImportError(
|
||||
"Runner dependencies required. Install with: pip install pipecat-ai[runner]"
|
||||
) from e
|
||||
|
||||
|
||||
load_dotenv(override=True)
|
||||
os.environ["ENV"] = "local"
|
||||
|
||||
|
||||
def _get_bot_module():
|
||||
"""Get the bot module from the calling script."""
|
||||
import importlib.util
|
||||
|
||||
# Get the main module (the file that was executed)
|
||||
main_module = sys.modules["__main__"]
|
||||
|
||||
# Check if it has a bot function
|
||||
if hasattr(main_module, "bot"):
|
||||
return main_module
|
||||
|
||||
# Try to import 'bot' module from current directory
|
||||
try:
|
||||
import bot # type: ignore[import-untyped]
|
||||
|
||||
return bot
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
# Look for any .py file in current directory that has a bot function
|
||||
# (excluding server.py).
|
||||
cwd = os.getcwd()
|
||||
for filename in os.listdir(cwd):
|
||||
if filename.endswith(".py") and filename != "server.py":
|
||||
try:
|
||||
module_name = filename[:-3] # Remove .py extension
|
||||
spec = importlib.util.spec_from_file_location(
|
||||
module_name, os.path.join(cwd, filename)
|
||||
)
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(module)
|
||||
|
||||
if hasattr(module, "bot"):
|
||||
return module
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
raise ImportError(
|
||||
"Could not find 'bot' function. Make sure your bot file has a 'bot' function."
|
||||
)
|
||||
|
||||
|
||||
async def _run_telephony_bot(websocket: WebSocket):
|
||||
"""Run a bot for telephony transports."""
|
||||
bot_module = _get_bot_module()
|
||||
|
||||
# Just pass the WebSocket - let the bot handle parsing
|
||||
runner_args = WebSocketRunnerArguments(websocket=websocket)
|
||||
|
||||
await bot_module.bot(runner_args)
|
||||
|
||||
|
||||
def _create_server_app(
|
||||
transport_type: str, host: str = "localhost", proxy: str = None, esp32_mode: bool = False
|
||||
):
|
||||
"""Create FastAPI app with transport-specific routes."""
|
||||
app = FastAPI()
|
||||
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=["*"],
|
||||
allow_credentials=True,
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
)
|
||||
|
||||
# Set up transport-specific routes
|
||||
if transport_type == "webrtc":
|
||||
_setup_webrtc_routes(app, esp32_mode=esp32_mode, host=host)
|
||||
elif transport_type == "daily":
|
||||
_setup_daily_routes(app)
|
||||
elif transport_type in ["twilio", "telnyx", "plivo"]:
|
||||
_setup_telephony_routes(app, transport_type, proxy)
|
||||
else:
|
||||
logger.warning(f"Unknown transport type: {transport_type}")
|
||||
|
||||
return app
|
||||
|
||||
|
||||
def _setup_webrtc_routes(app: FastAPI, esp32_mode: bool = False, host: str = "localhost"):
|
||||
"""Set up WebRTC-specific routes."""
|
||||
try:
|
||||
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
|
||||
|
||||
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
|
||||
except ImportError as e:
|
||||
logger.error(f"WebRTC transport dependencies not installed: {e}")
|
||||
return
|
||||
|
||||
# Store connections by pc_id
|
||||
pcs_map: Dict[str, SmallWebRTCConnection] = {}
|
||||
|
||||
# Mount the frontend
|
||||
app.mount("/client", SmallWebRTCPrebuiltUI)
|
||||
|
||||
@app.get("/", include_in_schema=False)
|
||||
async def root_redirect():
|
||||
"""Redirect root requests to client interface."""
|
||||
return RedirectResponse(url="/client/")
|
||||
|
||||
@app.post("/api/offer")
|
||||
async def offer(request: dict, background_tasks: BackgroundTasks):
|
||||
"""Handle WebRTC offer requests and manage peer connections."""
|
||||
pc_id = request.get("pc_id")
|
||||
|
||||
if pc_id and pc_id in pcs_map:
|
||||
pipecat_connection = pcs_map[pc_id]
|
||||
logger.info(f"Reusing existing connection for pc_id: {pc_id}")
|
||||
await pipecat_connection.renegotiate(
|
||||
sdp=request["sdp"],
|
||||
type=request["type"],
|
||||
restart_pc=request.get("restart_pc", False),
|
||||
)
|
||||
else:
|
||||
pipecat_connection = SmallWebRTCConnection()
|
||||
await pipecat_connection.initialize(sdp=request["sdp"], type=request["type"])
|
||||
|
||||
@pipecat_connection.event_handler("closed")
|
||||
async def handle_disconnected(webrtc_connection: SmallWebRTCConnection):
|
||||
"""Handle WebRTC connection closure and cleanup."""
|
||||
logger.info(f"Discarding peer connection for pc_id: {webrtc_connection.pc_id}")
|
||||
pcs_map.pop(webrtc_connection.pc_id, None)
|
||||
|
||||
bot_module = _get_bot_module()
|
||||
runner_args = SmallWebRTCRunnerArguments(webrtc_connection=pipecat_connection)
|
||||
background_tasks.add_task(bot_module.bot, runner_args)
|
||||
|
||||
answer = pipecat_connection.get_answer()
|
||||
|
||||
# Apply ESP32 SDP munging if enabled
|
||||
if esp32_mode and host != "localhost":
|
||||
from pipecat.runner.utils import smallwebrtc_sdp_munging
|
||||
|
||||
answer["sdp"] = smallwebrtc_sdp_munging(answer["sdp"], host)
|
||||
|
||||
pcs_map[answer["pc_id"]] = pipecat_connection
|
||||
return answer
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
"""Manage FastAPI application lifecycle and cleanup connections."""
|
||||
yield
|
||||
coros = [pc.disconnect() for pc in pcs_map.values()]
|
||||
await asyncio.gather(*coros)
|
||||
pcs_map.clear()
|
||||
|
||||
app.router.lifespan_context = lifespan
|
||||
|
||||
|
||||
def _setup_daily_routes(app: FastAPI):
|
||||
"""Set up Daily-specific routes."""
|
||||
|
||||
@app.get("/")
|
||||
async def start_agent():
|
||||
"""Launch a Daily bot and redirect to room."""
|
||||
print("Starting bot with Daily transport")
|
||||
|
||||
import aiohttp
|
||||
|
||||
from pipecat.runner.daily import configure
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
room_url, token = await configure(session)
|
||||
|
||||
# Start the bot in the background
|
||||
bot_module = _get_bot_module()
|
||||
runner_args = DailyRunnerArguments(room_url=room_url, token=token, body={})
|
||||
asyncio.create_task(bot_module.bot(runner_args))
|
||||
return RedirectResponse(room_url)
|
||||
|
||||
@app.post("/connect")
|
||||
async def rtvi_connect():
|
||||
"""Launch a Daily bot and return connection info for RTVI clients."""
|
||||
print("Starting bot with Daily transport")
|
||||
|
||||
import aiohttp
|
||||
|
||||
from pipecat.runner.daily import configure
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
room_url, token = await configure(session)
|
||||
|
||||
# Start the bot in the background
|
||||
bot_module = _get_bot_module()
|
||||
runner_args = DailyRunnerArguments(room_url=room_url, token=token, body={})
|
||||
asyncio.create_task(bot_module.bot(runner_args))
|
||||
return {"room_url": room_url, "token": token}
|
||||
|
||||
|
||||
def _setup_telephony_routes(app: FastAPI, transport_type: str, proxy: str):
|
||||
"""Set up telephony-specific routes."""
|
||||
# XML response templates
|
||||
XML_TEMPLATES = {
|
||||
"twilio": f"""<?xml version="1.0" encoding="UTF-8"?>
|
||||
<Response>
|
||||
<Connect>
|
||||
<Stream url="wss://{proxy}/ws"></Stream>
|
||||
</Connect>
|
||||
<Pause length="40"/>
|
||||
</Response>""",
|
||||
"telnyx": f"""<?xml version="1.0" encoding="UTF-8"?>
|
||||
<Response>
|
||||
<Connect>
|
||||
<Stream url="wss://{proxy}/ws" bidirectionalMode="rtp"></Stream>
|
||||
</Connect>
|
||||
<Pause length="40"/>
|
||||
</Response>""",
|
||||
"plivo": f"""<?xml version="1.0" encoding="UTF-8"?>
|
||||
<Response>
|
||||
<Stream bidirectional="true" keepCallAlive="true" contentType="audio/x-mulaw;rate=8000">wss://{proxy}/ws</Stream>
|
||||
</Response>""",
|
||||
}
|
||||
|
||||
@app.post("/")
|
||||
async def start_call():
|
||||
"""Handle telephony webhook and return XML response."""
|
||||
logger.debug(f"POST {transport_type.upper()} XML")
|
||||
xml_content = XML_TEMPLATES.get(transport_type, "<Response></Response>")
|
||||
return HTMLResponse(content=xml_content, media_type="application/xml")
|
||||
|
||||
@app.websocket("/ws")
|
||||
async def websocket_endpoint(websocket: WebSocket):
|
||||
"""Handle WebSocket connections for telephony."""
|
||||
await websocket.accept()
|
||||
logger.debug("WebSocket connection accepted")
|
||||
await _run_telephony_bot(websocket)
|
||||
|
||||
@app.get("/")
|
||||
async def start_agent():
|
||||
"""Simple status endpoint for telephony transports."""
|
||||
return {"status": f"Bot started with {transport_type}"}
|
||||
|
||||
|
||||
async def _run_daily_direct():
|
||||
"""Run Daily bot with direct connection (no FastAPI server)."""
|
||||
try:
|
||||
import aiohttp
|
||||
|
||||
from pipecat.runner.daily import configure
|
||||
except ImportError as e:
|
||||
logger.error(f"Daily transport dependencies not installed: {e}")
|
||||
return
|
||||
|
||||
logger.info("Running with direct Daily connection...")
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
room_url, token = await configure(session)
|
||||
|
||||
runner_args = DailyRunnerArguments(room_url=room_url, token=token, body={})
|
||||
|
||||
# Get the bot module and run it directly
|
||||
bot_module = _get_bot_module()
|
||||
|
||||
print(f"📞 Joining Daily room: {room_url}")
|
||||
print(" (Direct connection - no web server needed)")
|
||||
print()
|
||||
|
||||
await bot_module.bot(runner_args)
|
||||
|
||||
|
||||
def main():
|
||||
"""Start the Pipecat development runner.
|
||||
|
||||
Parses command-line arguments and starts a FastAPI server configured
|
||||
for the specified transport type. The runner will discover and run
|
||||
any bot() function found in the current directory.
|
||||
|
||||
Command-line arguments:
|
||||
|
||||
Args:
|
||||
--host: Server host address (default: localhost)
|
||||
--port: Server port (default: 7860)
|
||||
-t/--transport: Transport type (daily, webrtc, twilio, telnyx, plivo)
|
||||
-x/--proxy: Public proxy hostname for telephony webhooks
|
||||
--esp32: Enable SDP munging for ESP32 compatibility (requires --host with IP address)
|
||||
-d/--direct: Connect directly to Daily room (automatically sets transport to daily)
|
||||
-v/--verbose: Increase logging verbosity
|
||||
|
||||
The bot file must contain a `bot(runner_args)` function as the entry point.
|
||||
"""
|
||||
parser = argparse.ArgumentParser(description="Pipecat Development Runner")
|
||||
parser.add_argument("--host", type=str, default="localhost", help="Host address")
|
||||
parser.add_argument("--port", type=int, default=7860, help="Port number")
|
||||
parser.add_argument(
|
||||
"-t",
|
||||
"--transport",
|
||||
type=str,
|
||||
choices=["daily", "webrtc", "twilio", "telnyx", "plivo"],
|
||||
default="webrtc",
|
||||
help="Transport type",
|
||||
)
|
||||
parser.add_argument("--proxy", "-x", help="Public proxy host name")
|
||||
parser.add_argument(
|
||||
"--esp32",
|
||||
action="store_true",
|
||||
default=False,
|
||||
help="Enable SDP munging for ESP32 compatibility (requires --host with IP address)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-d",
|
||||
"--direct",
|
||||
action="store_true",
|
||||
default=False,
|
||||
help="Connect directly to Daily room (automatically sets transport to daily)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--verbose", "-v", action="count", default=0, help="Increase logging verbosity"
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Auto-set transport to daily if --direct is used without explicit transport
|
||||
if args.direct and args.transport == "webrtc": # webrtc is the default
|
||||
args.transport = "daily"
|
||||
elif args.direct and args.transport != "daily":
|
||||
logger.error("--direct flag only works with Daily transport (-t daily)")
|
||||
return
|
||||
|
||||
# Validate ESP32 requirements
|
||||
if args.esp32 and args.host == "localhost":
|
||||
logger.error("For ESP32, you need to specify `--host IP` so we can do SDP munging.")
|
||||
return
|
||||
|
||||
# Log level
|
||||
logger.remove()
|
||||
logger.add(sys.stderr, level="TRACE" if args.verbose else "DEBUG")
|
||||
|
||||
# Handle direct Daily connection (no FastAPI server)
|
||||
if args.direct:
|
||||
print()
|
||||
print("🚀 Connecting directly to Daily room...")
|
||||
print()
|
||||
|
||||
# Run direct Daily connection
|
||||
asyncio.run(_run_daily_direct())
|
||||
return
|
||||
|
||||
# Print startup message for server-based transports
|
||||
if args.transport == "webrtc":
|
||||
print()
|
||||
if args.esp32:
|
||||
print(
|
||||
f"🚀 WebRTC server starting at http://{args.host}:{args.port}/client (ESP32 mode)"
|
||||
)
|
||||
else:
|
||||
print(f"🚀 WebRTC server starting at http://{args.host}:{args.port}/client")
|
||||
print(" Open this URL in your browser to connect!")
|
||||
print()
|
||||
elif args.transport == "daily":
|
||||
print()
|
||||
print(f"🚀 Daily server starting at http://{args.host}:{args.port}")
|
||||
print(" Open this URL in your browser to start a session!")
|
||||
print()
|
||||
|
||||
# Create the app with transport-specific setup
|
||||
app = _create_server_app(args.transport, args.host, args.proxy, args.esp32)
|
||||
|
||||
# Run the server
|
||||
uvicorn.run(app, host=args.host, port=args.port)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
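The last fallback in `_get_bot_module` (scan the working directory for any `.py` file that defines `bot`) can be sketched in isolation as follows. `find_bot_module` is a hypothetical name, the directory is passed in rather than taken from `os.getcwd()`, and filenames are sorted so the result is deterministic, which the runner itself does not do.

```python
import importlib.util
import os
from types import ModuleType
from typing import Iterable


def find_bot_module(directory: str, skip: Iterable[str] = ("server.py",)) -> ModuleType:
    """Return the first module in `directory` that exposes a `bot` attribute."""
    skipped = set(skip)
    for filename in sorted(os.listdir(directory)):
        if not filename.endswith(".py") or filename in skipped:
            continue
        path = os.path.join(directory, filename)
        spec = importlib.util.spec_from_file_location(filename[:-3], path)
        if spec is None or spec.loader is None:
            continue
        module = importlib.util.module_from_spec(spec)
        try:
            # Executes the file's top-level code, exactly like an import would.
            spec.loader.exec_module(module)
        except Exception:
            # Files that fail to load are skipped, matching the runner's behavior.
            continue
        if hasattr(module, "bot"):
            return module
    raise ImportError(f"Could not find a 'bot' function in {directory}")
```

Because every candidate file is executed during the scan, files in the directory should be safe to import; guarding side effects behind `if __name__ == "__main__":` avoids surprises.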
60
src/pipecat/runner/types.py
Normal file
@@ -0,0 +1,60 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Runner session argument types for the development runner.
|
||||
|
||||
These types are used by the development runner to pass transport-specific
|
||||
information to bot functions.
|
||||
"""
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
from fastapi import WebSocket
|
||||
|
||||
|
||||
@dataclass
|
||||
class RunnerArguments:
|
||||
"""Base class for runner session arguments."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class DailyRunnerArguments(RunnerArguments):
|
||||
"""Daily transport session arguments for the runner.
|
||||
|
||||
Parameters:
|
||||
room_url: Daily room URL to join
|
||||
token: Authentication token for the room
|
||||
body: Additional request data
|
||||
"""
|
||||
|
||||
room_url: str
|
||||
token: str
|
||||
body: Any
|
||||
|
||||
|
||||
@dataclass
|
||||
class WebSocketRunnerArguments(RunnerArguments):
|
||||
"""WebSocket transport session arguments for the runner.
|
||||
|
||||
Parameters:
|
||||
websocket: WebSocket connection for audio streaming
|
||||
"""
|
||||
|
||||
websocket: WebSocket
|
||||
|
||||
|
||||
@dataclass
|
||||
class SmallWebRTCRunnerArguments(RunnerArguments):
|
||||
"""Small WebRTC transport session arguments for the runner.
|
||||
|
||||
Parameters:
|
||||
webrtc_connection: Pre-configured WebRTC peer connection
|
||||
"""
|
||||
|
||||
webrtc_connection: Any
|
||||
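A bot that supports several transports can branch on these dataclasses with `isinstance`. The sketch below uses stand-in dataclasses that mirror the shapes in `types.py` (with `websocket` typed as `Any` so FastAPI is not needed), and `describe_transport` is a hypothetical helper. Unlike the multi-transport docstring example in `run.py`, it raises on an unrecognized argument type instead of leaving the transport variable unbound.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class RunnerArguments:
    """Stand-in for the base class in pipecat.runner.types."""


@dataclass
class DailyRunnerArguments(RunnerArguments):
    room_url: str
    token: str
    body: Any


@dataclass
class WebSocketRunnerArguments(RunnerArguments):
    websocket: Any  # The real class annotates this as fastapi.WebSocket.


@dataclass
class SmallWebRTCRunnerArguments(RunnerArguments):
    webrtc_connection: Any


def describe_transport(runner_args: RunnerArguments) -> str:
    """Map a runner argument object to a transport label."""
    if isinstance(runner_args, DailyRunnerArguments):
        return "daily"
    if isinstance(runner_args, SmallWebRTCRunnerArguments):
        return "webrtc"
    if isinstance(runner_args, WebSocketRunnerArguments):
        return "websocket"
    # Fail loudly rather than continuing without a transport.
    raise TypeError(f"Unsupported runner arguments: {type(runner_args).__name__}")
```

In a real `bot(runner_args)` function each branch would construct the matching transport instead of returning a label.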
481
src/pipecat/runner/utils.py
Normal file
@@ -0,0 +1,481 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Transport utility functions and FastAPI route setup helpers.
|
||||
|
||||
This module provides common functionality for setting up transport-specific
|
||||
FastAPI routes and handling WebRTC/WebSocket connections. It includes SDP
|
||||
manipulation utilities for WebRTC compatibility and transport detection helpers.
|
||||
|
||||
Key features:
|
||||
|
||||
- WebRTC route setup with connection management
|
||||
- WebSocket route setup for telephony providers
|
||||
- SDP munging for ESP32 and other WebRTC compatibility
|
||||
- Transport client ID detection across different transport types
|
||||
- Video capture utilities for Daily transports
|
||||
|
||||
The utilities are designed to be transport-agnostic where possible, with
|
||||
specific handlers for each transport type's unique requirements.
|
||||
|
||||
Example::
|
||||
|
||||
from pipecat.runner.utils import parse_telephony_websocket
|
||||
|
||||
async def telephony_websocket_handler(websocket: WebSocket):
|
||||
transport_type, call_data = await parse_telephony_websocket(websocket)
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
from typing import Any, Callable, Dict, Optional
|
||||
|
||||
from fastapi import WebSocket
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.runner.types import (
|
||||
DailyRunnerArguments,
|
||||
SmallWebRTCRunnerArguments,
|
||||
WebSocketRunnerArguments,
|
||||
)
|
||||
from pipecat.transports.base_transport import BaseTransport
|
||||
|
||||
|
||||
def _detect_transport_type_from_message(message_data: dict) -> str:
    """Attempt to auto-detect transport type from WebSocket message structure."""
    logger.trace("=== Auto-Detection Analysis ===")

    # Twilio detection
    if (
        message_data.get("event") == "start"
        and "start" in message_data
        and "streamSid" in message_data.get("start", {})
        and "callSid" in message_data.get("start", {})
    ):
        logger.trace("Auto-detected: TWILIO")
        return "twilio"

    # Telnyx detection
    if (
        "stream_id" in message_data
        and "start" in message_data
        and "call_control_id" in message_data.get("start", {})
    ):
        logger.trace("Auto-detected: TELNYX")
        return "telnyx"

    # Plivo detection
    if (
        "start" in message_data
        and "streamId" in message_data.get("start", {})
        and "callId" in message_data.get("start", {})
    ):
        logger.trace("Auto-detected: PLIVO")
        return "plivo"

    logger.trace("Auto-detection failed - unknown format")
    return "unknown"


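Review note: the detection rules above depend only on the shape of each provider's start message. The sketch below shows payloads that would satisfy each rule. All identifier values are invented, and `detect_provider` is a hypothetical standalone mirror of the private helper (without logging) so the snippet runs without Pipecat installed.

```python
def detect_provider(message: dict) -> str:
    """Hypothetical mirror of the detection rules, with logging removed."""
    start = message.get("start", {})
    # Twilio: event == "start" plus streamSid and callSid inside "start".
    if (
        message.get("event") == "start"
        and "start" in message
        and "streamSid" in start
        and "callSid" in start
    ):
        return "twilio"
    # Telnyx: top-level stream_id plus call_control_id inside "start".
    if "stream_id" in message and "start" in message and "call_control_id" in start:
        return "telnyx"
    # Plivo: streamId and callId inside "start".
    if "start" in message and "streamId" in start and "callId" in start:
        return "plivo"
    return "unknown"


# Illustrative payloads only; the values are made up for this example.
twilio_start = {"event": "start", "start": {"streamSid": "MZ123", "callSid": "CA456"}}
telnyx_start = {
    "event": "start",
    "stream_id": "stream-1",
    "start": {"call_control_id": "v3:abc", "media_format": {"encoding": "PCMU"}},
}
plivo_start = {"event": "start", "start": {"streamId": "s-1", "callId": "c-1"}}
connected_only = {"event": "connected"}

for payload in (twilio_start, telnyx_start, plivo_start, connected_only):
    print(detect_provider(payload))
```

Because the rules are checked in a fixed order (Twilio, Telnyx, Plivo), a message that satisfied more than one rule would resolve to the first match.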
async def parse_telephony_websocket(websocket: WebSocket):
    """Parse telephony WebSocket messages and return transport type and call data.

    Args:
        websocket: FastAPI WebSocket connection from the telephony provider.

    Returns:
        tuple: (transport_type: str, call_data: dict)

        call_data contains provider-specific fields:
        - Twilio: {"stream_id": str, "call_id": str}
        - Telnyx: {"stream_id": str, "call_control_id": str, "outbound_encoding": str}
        - Plivo: {"stream_id": str, "call_id": str}

        If no provider is detected, transport_type is "unknown" and call_data is empty.

    Example usage::

        transport_type, call_data = await parse_telephony_websocket(websocket)
        if transport_type == "telnyx":
            outbound_encoding = call_data["outbound_encoding"]
    """
    # Read first two messages
    message_stream = websocket.iter_text()

    try:
        # First message
        first_message_raw = await message_stream.__anext__()
        logger.trace(f"First message: {first_message_raw}")
        try:
            first_message = json.loads(first_message_raw)
        except json.JSONDecodeError:
            first_message = {}

        # Second message
        second_message_raw = await message_stream.__anext__()
        logger.trace(f"Second message: {second_message_raw}")
        try:
            second_message = json.loads(second_message_raw)
        except json.JSONDecodeError:
            second_message = {}

        # Try auto-detection on both messages
        detected_type_first = _detect_transport_type_from_message(first_message)
        detected_type_second = _detect_transport_type_from_message(second_message)

        # Use the successful detection
        if detected_type_first != "unknown":
            transport_type = detected_type_first
            call_data_raw = first_message
            logger.debug(f"Detected transport: {transport_type} (from first message)")
        elif detected_type_second != "unknown":
            transport_type = detected_type_second
            call_data_raw = second_message
            logger.debug(f"Detected transport: {transport_type} (from second message)")
        else:
            transport_type = "unknown"
            call_data_raw = second_message
            logger.warning("Could not auto-detect transport type")

        # Extract provider-specific data
        if transport_type == "twilio":
            start_data = call_data_raw.get("start", {})
            call_data = {
                "stream_id": start_data.get("streamSid"),
                "call_id": start_data.get("callSid"),
            }

        elif transport_type == "telnyx":
            call_data = {
                "stream_id": call_data_raw.get("stream_id"),
                "call_control_id": call_data_raw.get("start", {}).get("call_control_id"),
                "outbound_encoding": call_data_raw.get("start", {})
                .get("media_format", {})
                .get("encoding"),
            }

        elif transport_type == "plivo":
            start_data = call_data_raw.get("start", {})
            call_data = {
                "stream_id": start_data.get("streamId"),
                "call_id": start_data.get("callId"),
            }

        else:
            call_data = {}

        logger.debug(f"Parsed - Type: {transport_type}, Data: {call_data}")
        return transport_type, call_data

    except Exception as e:
        logger.error(f"Error parsing telephony WebSocket: {e}")
        raise


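Review note: the parser reads exactly two text messages before deciding on a provider. A rough sketch of that handshake, runnable without a live connection, follows. `FakeWebSocket` and `read_first_two` are hypothetical names introduced here, and the assumption that a `connected` event precedes the `start` event reflects Twilio-style streams rather than anything stated in this diff.

```python
import asyncio
import json


class FakeWebSocket:
    """Hypothetical stand-in exposing only iter_text(), the one method the parser uses."""

    def __init__(self, messages):
        self._messages = messages

    async def iter_text(self):
        # Async generator, similar in shape to the real WebSocket text iterator.
        for message in self._messages:
            yield message


async def read_first_two(websocket) -> list:
    """Read and JSON-decode the first two text messages, using {} for invalid JSON."""
    stream = websocket.iter_text()
    decoded = []
    for _ in range(2):
        raw = await stream.__anext__()
        try:
            decoded.append(json.loads(raw))
        except json.JSONDecodeError:
            decoded.append({})
    return decoded


fake = FakeWebSocket(
    [
        json.dumps({"event": "connected"}),
        json.dumps({"event": "start", "start": {"streamSid": "MZ123", "callSid": "CA456"}}),
    ]
)
first, second = asyncio.run(read_first_two(fake))

# The second message carries the identifiers, so it would supply the call data.
call_data = {
    "stream_id": second["start"]["streamSid"],
    "call_id": second["start"]["callSid"],
}
print(first, call_data)
```

A stub like this could also back a unit test for the real function, since only `iter_text()` is called on the socket.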
def get_transport_client_id(transport: BaseTransport, client: Any) -> str:
    """Get client identifier from transport-specific client object.

    Args:
        transport: The transport instance.
        client: Transport-specific client object.

    Returns:
        Client identifier string, empty if transport not supported.
    """
    # Import conditionally to avoid dependency issues
    try:
        from pipecat.transports.network.small_webrtc import SmallWebRTCTransport

        if isinstance(transport, SmallWebRTCTransport):
            return client.pc_id
    except ImportError:
        pass

    try:
        from pipecat.transports.services.daily import DailyTransport

        if isinstance(transport, DailyTransport):
            return client["id"]
    except ImportError:
        pass

    logger.warning(f"Unable to get client id from unsupported transport {type(transport)}")
    return ""


async def maybe_capture_participant_camera(
    transport: BaseTransport, client: Any, framerate: int = 0
):
    """Capture participant camera video if transport supports it.

    Args:
        transport: The transport instance.
        client: Transport-specific client object.
        framerate: Video capture framerate. Defaults to 0 (auto).
    """
    try:
        from pipecat.transports.services.daily import DailyTransport

        if isinstance(transport, DailyTransport):
            await transport.capture_participant_video(
                client["id"], framerate=framerate, video_source="camera"
            )
    except ImportError:
        pass


async def maybe_capture_participant_screen(
    transport: BaseTransport, client: Any, framerate: int = 0
):
    """Capture participant screen video if transport supports it.

    Args:
        transport: The transport instance.
        client: Transport-specific client object.
        framerate: Video capture framerate. Defaults to 0 (auto).
    """
    try:
        from pipecat.transports.services.daily import DailyTransport

        if isinstance(transport, DailyTransport):
            await transport.capture_participant_video(
                client["id"], framerate=framerate, video_source="screenVideo"
            )
    except ImportError:
        pass


def _smallwebrtc_sdp_cleanup_ice_candidates(text: str, pattern: str) -> str:
    """Clean up ICE candidates in SDP text for SmallWebRTC.

    Args:
        text: SDP text to clean up.
        pattern: Pattern to match for candidate filtering.

    Returns:
        Cleaned SDP text with filtered ICE candidates.
    """
    result = []
    lines = text.splitlines()
    for line in lines:
        if re.search("a=candidate", line):
            if re.search(pattern, line) and not re.search("raddr", line):
                result.append(line)
        else:
            result.append(line)
    return "\r\n".join(result)


def _smallwebrtc_sdp_cleanup_fingerprints(text: str) -> str:
    """Remove unsupported fingerprint algorithms from SDP text.

    Args:
        text: SDP text to clean up.

    Returns:
        SDP text with sha-384 and sha-512 fingerprints removed.
    """
    result = []
    lines = text.splitlines()
    for line in lines:
        if not re.search("sha-384", line) and not re.search("sha-512", line):
            result.append(line)
    return "\r\n".join(result)


def smallwebrtc_sdp_munging(sdp: str, host: str) -> str:
    """Apply SDP modifications for SmallWebRTC compatibility.

    Args:
        sdp: Original SDP string.
        host: Host address for ICE candidate filtering.

    Returns:
        Modified SDP string with fingerprint and ICE candidate cleanup.
    """
    sdp = _smallwebrtc_sdp_cleanup_fingerprints(sdp)
    sdp = _smallwebrtc_sdp_cleanup_ice_candidates(sdp, host)
    return sdp


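Review note: to make the effect of the two SDP filters concrete, here is a small sketch on a hand-written SDP fragment. The helper names `drop_fingerprints` and `keep_host_candidates` are hypothetical mirrors of the private functions above, and the SDP lines and addresses are invented for illustration.

```python
import re


def drop_fingerprints(sdp: str) -> str:
    """Drop every line mentioning sha-384 or sha-512 (mirror of the fingerprint cleanup)."""
    kept = [line for line in sdp.splitlines() if "sha-384" not in line and "sha-512" not in line]
    return "\r\n".join(kept)


def keep_host_candidates(sdp: str, host: str) -> str:
    """Keep only candidates matching host that carry no raddr (mirror of the ICE cleanup)."""
    kept = []
    for line in sdp.splitlines():
        if "a=candidate" in line:
            # host is used as a regular expression, so "." matches any character.
            if re.search(host, line) and "raddr" not in line:
                kept.append(line)
        else:
            kept.append(line)
    return "\r\n".join(kept)


sdp = "\r\n".join(
    [
        "v=0",
        "a=fingerprint:sha-256 AA:BB",
        "a=fingerprint:sha-384 CC:DD",
        "a=fingerprint:sha-512 EE:FF",
        "a=candidate:1 1 udp 2122260223 192.168.1.10 54321 typ host",
        "a=candidate:2 1 udp 1686052607 203.0.113.5 54321 typ srflx raddr 192.168.1.10 rport 54321",
        "a=candidate:3 1 udp 2122260223 10.0.0.7 54322 typ host",
    ]
)

munged = keep_host_candidates(drop_fingerprints(sdp), "192.168.1.10")
print(munged.split("\r\n"))
```

Two details are worth noting from the original code: the host string is passed to `re.search` unescaped, and the joined result has no trailing line break even if the input had one.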
def _get_transport_params(transport_key: str, transport_params: Dict[str, Callable]) -> Any:
    """Get transport parameters from factory function.

    Args:
        transport_key: The transport key to look up
        transport_params: Dict mapping transport names to parameter factory functions

    Returns:
        Transport parameters from the factory function

    Raises:
        ValueError: If transport key is missing from transport_params
    """
    if transport_key not in transport_params:
        raise ValueError(
            f"Missing transport params for '{transport_key}'. "
            f"Please add '{transport_key}' key to your transport_params dict."
        )

    params = transport_params[transport_key]()
    logger.debug(f"Using transport params for {transport_key}")
    return params


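Review note: the lookup above calls the factory only for the selected key, so parameter objects for the other transports are never constructed. A minimal sketch of that pattern follows. `DemoParams` and `get_params` are hypothetical stand-ins, not Pipecat classes or functions.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict


@dataclass
class DemoParams:
    """Hypothetical stand-in for a transport params class."""

    audio_in_enabled: bool = False
    audio_out_enabled: bool = False


def get_params(key: str, factories: Dict[str, Callable[[], Any]]) -> Any:
    """Look up a factory by key and call it, failing loudly on a missing key."""
    if key not in factories:
        raise ValueError(f"Missing transport params for '{key}'.")
    return factories[key]()


# Only the "webrtc" factory is registered here.
factories = {
    "webrtc": lambda: DemoParams(audio_in_enabled=True, audio_out_enabled=True),
}

params = get_params("webrtc", factories)
print(params)

try:
    get_params("daily", factories)
except ValueError as exc:
    error_message = str(exc)
    print(error_message)
```

Each call builds a fresh object, which matters in the code above because the telephony path mutates the returned params (`add_wav_header`, `serializer`).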
async def _create_telephony_transport(
    websocket: WebSocket,
    params: Optional[Any] = None,
    transport_type: Optional[str] = None,
    call_data: Optional[dict] = None,
) -> BaseTransport:
    """Create a telephony transport with pre-parsed WebSocket data.

    Args:
        websocket: FastAPI WebSocket connection from telephony provider
        params: FastAPIWebsocketParams (required)
        transport_type: Pre-detected provider type ("twilio", "telnyx", "plivo")
        call_data: Pre-parsed call data dict with provider-specific fields

    Returns:
        Configured FastAPIWebsocketTransport ready for telephony use.

    Raises:
        ValueError: If params is None or transport_type is not a supported provider.
    """
    from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketTransport

    if params is None:
        raise ValueError(
            "FastAPIWebsocketParams must be provided. "
            "The serializer and add_wav_header will be set automatically."
        )

    # Always set add_wav_header to False for telephony
    params.add_wav_header = False

    logger.info(f"Using pre-detected telephony provider: {transport_type}")

    if transport_type == "twilio":
        from pipecat.serializers.twilio import TwilioFrameSerializer

        params.serializer = TwilioFrameSerializer(
            stream_sid=call_data["stream_id"],
            call_sid=call_data["call_id"],
            account_sid=os.getenv("TWILIO_ACCOUNT_SID", ""),
            auth_token=os.getenv("TWILIO_AUTH_TOKEN", ""),
        )
    elif transport_type == "telnyx":
        from pipecat.serializers.telnyx import TelnyxFrameSerializer

        params.serializer = TelnyxFrameSerializer(
            stream_id=call_data["stream_id"],
            call_control_id=call_data["call_control_id"],
            outbound_encoding=call_data["outbound_encoding"],
            inbound_encoding="PCMU",  # Standard default
            api_key=os.getenv("TELNYX_API_KEY", ""),
        )
    elif transport_type == "plivo":
        from pipecat.serializers.plivo import PlivoFrameSerializer

        params.serializer = PlivoFrameSerializer(
            stream_id=call_data["stream_id"],
            call_id=call_data["call_id"],
            auth_id=os.getenv("PLIVO_AUTH_ID", ""),
            auth_token=os.getenv("PLIVO_AUTH_TOKEN", ""),
        )
    else:
        raise ValueError(
            f"Unsupported telephony provider: {transport_type}. "
            "Supported providers: twilio, telnyx, plivo"
        )

    return FastAPIWebsocketTransport(websocket=websocket, params=params)


async def create_transport(
    runner_args: Any, transport_params: Dict[str, Callable]
) -> BaseTransport:
    """Create a transport from runner arguments using factory functions.

    This function uses the clean transport_params factory pattern where users
    define a dictionary mapping transport names to parameter factory functions.

    Args:
        runner_args: Arguments from the runner.
        transport_params: Dict mapping transport names to parameter factory functions.
            Keys should be: "daily", "webrtc", "twilio", "telnyx", "plivo"
            Values should be functions that return transport parameters when called.

    Returns:
        Configured transport instance.

    Raises:
        ValueError: If transport key is missing from transport_params or runner_args type is unsupported.
        ImportError: If required dependencies are not installed.

    Example::

        transport_params = {
            "daily": lambda: DailyParams(
                audio_in_enabled=True,
                audio_out_enabled=True,
                vad_analyzer=SileroVADAnalyzer(),
            ),
            "webrtc": lambda: TransportParams(
                audio_in_enabled=True,
                audio_out_enabled=True,
                vad_analyzer=SileroVADAnalyzer(),
            ),
            "twilio": lambda: FastAPIWebsocketParams(
                audio_in_enabled=True,
                audio_out_enabled=True,
                vad_analyzer=SileroVADAnalyzer(),
                # add_wav_header and serializer will be set automatically
            ),
            "telnyx": lambda: FastAPIWebsocketParams(
                audio_in_enabled=True,
                audio_out_enabled=True,
                vad_analyzer=SileroVADAnalyzer(),
                # add_wav_header and serializer will be set automatically
            ),
            "plivo": lambda: FastAPIWebsocketParams(
                audio_in_enabled=True,
                audio_out_enabled=True,
                vad_analyzer=SileroVADAnalyzer(),
                # add_wav_header and serializer will be set automatically
            ),
        }

        transport = await create_transport(runner_args, transport_params)
    """
    # Create transport based on runner args type
    if isinstance(runner_args, DailyRunnerArguments):
        params = _get_transport_params("daily", transport_params)

        from pipecat.transports.services.daily import DailyTransport

        return DailyTransport(
            runner_args.room_url,
            runner_args.token,
            "Pipecat Bot",
            params=params,
        )

    elif isinstance(runner_args, SmallWebRTCRunnerArguments):
        params = _get_transport_params("webrtc", transport_params)

        from pipecat.transports.network.small_webrtc import SmallWebRTCTransport

        return SmallWebRTCTransport(
            params=params,
            webrtc_connection=runner_args.webrtc_connection,
        )

    elif isinstance(runner_args, WebSocketRunnerArguments):
        # Parse once to determine the provider and get data
        transport_type, call_data = await parse_telephony_websocket(runner_args.websocket)
        params = _get_transport_params(transport_type, transport_params)

        # Create telephony transport with pre-parsed data
        return await _create_telephony_transport(
            runner_args.websocket, params, transport_type, call_data
        )

    else:
        raise ValueError(f"Unsupported runner arguments type: {type(runner_args)}")