Compare commits

...

7 Commits

Author SHA1 Message Date
Chad Bailey
974e989cb5 fixup 2025-04-03 17:18:26 +00:00
Chad Bailey
8f33f00c3f wip - pcc-transport example 2025-04-01 18:25:09 +00:00
Chad Bailey
0fede6bb48 wip - pcc-transport example 2025-04-01 18:24:46 +00:00
Chad Bailey
943c75c622 basic pipecat cloud transport working 2025-04-01 15:18:21 +00:00
Chad Bailey
46b2e925f2 dynamic PipecatCloudParams is not working 2025-04-01 15:05:59 +00:00
Chad Bailey
e6e8a03c5f added event decorators 2025-03-31 21:41:53 +00:00
Chad Bailey
e5ee3e3720 dynamic params 2025-03-31 20:34:50 +00:00
58 changed files with 7188 additions and 5 deletions

View File

@@ -15,12 +15,16 @@ from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.gemini_multimodal_live import GeminiMultimodalLiveLLMService
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
from pipecat.transports.services.pipecat_cloud import (
PipecatCloudParams,
PipecatCloudTransport,
SessionArguments,
)
load_dotenv(override=True)
logger.remove(0)
# logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
@@ -36,9 +40,11 @@ Respond to what the user said in a creative and helpful way. Keep your responses
async def run_bot(webrtc_connection):
pipecat_transport = SmallWebRTCTransport(
webrtc_connection=webrtc_connection,
params=TransportParams(
pipecat_transport = PipecatCloudTransport(
session_args=SessionArguments(
webrtc_connection=webrtc_connection,
),
params=PipecatCloudParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_enabled=True,

View File

@@ -0,0 +1,157 @@
# Simple Chatbot for Pipecat Cloud
This project demonstrates how to build a complete Pipecat AI agent application with both client and server components. It includes a Next.js client for interacting with a Pipecat AI bot server through Daily.co's WebRTC transport.
<img src="image.png" width="420px">
## Project Overview
- **Server**: Python-based Pipecat bot with video/audio processing capabilities
- **Client**: Next.js TypeScript web application using the Pipecat React & JS SDKs
- **Infrastructure**: Deployable to Pipecat Cloud (server) and Vercel (client)
> See the [simple-chatbot example](https://github.com/pipecat-ai/pipecat/tree/main/examples/simple-chatbot) with different client and server implementations.
## Quick Start
### 1. Server Setup
Navigate to the server directory:
```bash
cd server
```
Create and activate a virtual environment:
```bash
python3 -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
```
Install requirements:
```bash
pip install -r requirements.txt
```
Copy env.example to .env and add your API keys:
```bash
cp env.example .env
# Edit .env to add OPENAI_API_KEY and CARTESIA_API_KEY
```
Run the server locally to test before deploying:
```bash
LOCAL_RUN=1 python bot.py
```
This will open a browser window with a Daily.co room where you can test your bot directly.
### 2. Client Setup
In a separate terminal, navigate to the client directory:
```bash
cd client-react
```
Install dependencies:
```bash
npm install
```
Create `.env.local` file with your Pipecat Cloud API key:
```bash
cp env.local.example .env.local
```
> Create a Pipecat Cloud API key using the dashboard
Start the development server:
```bash
npm run dev
```
Open [http://localhost:3000](http://localhost:3000) to interact with your agent through the Next.js client.
## Deployment
> See the [Pipecat Cloud Quickstart](https://docs.pipecat.daily.co/quickstart) for a complete walkthrough.
### Deploy Server to Pipecat Cloud
1. Install the Pipecat Cloud CLI:
```bash
pip install pipecatcloud
```
2. Authenticate:
```bash
pcc auth login
```
3. Build and push your Docker image:
```bash
cd server
chmod +x build.sh
./build.sh
```
> IMPORTANT: Before running this build script, you need to add your DOCKER_USERNAME
4. Create a secret set for your API keys:
```bash
pcc secrets set simple-chatbot-secrets --file .env
```
5. Deploy to Pipecat Cloud:
```bash
pcc deploy
```
> IMPORTANT: Before deploying, you need to add your Docker Hub username
### Deploy Client to Vercel
1. Push your Next.js client to GitHub
2. Connect your GitHub repository to Vercel
3. Add your `PIPECAT_CLOUD_API_KEY` environment variable in Vercel
4. Deploy with the Vercel dashboard or CLI
## Project Structure
```
simple-chatbot/
├── client-next/ # Next.js client application
│ ├── src/
│ │ ├── app/ # Next.js app routes
│ │ │ └── api/
│ │ │ └── connect/ # API endpoint for Daily.co connection
│ │ ├── components/ # React components
│ │ └── providers/ # React providers including RTVIProvider
│ ├── package.json
│ └── README.md # Client-specific documentation
└── server/ # Pipecat bot server
├── assets/ # Robot animation frames
├── bot.py # The Pipecat pipeline implementation
├── Dockerfile # For building the container image
├── build.sh # Script for building and pushing Docker image
├── requirements.txt # Python dependencies
├── pcc-deploy.toml # Pipecat Cloud deployment config
└── README.md # Server-specific documentation
```

View File

@@ -0,0 +1,41 @@
# See https://help.github.com/articles/ignoring-files/ for more about ignoring files.
# dependencies
/node_modules
/.pnp
.pnp.*
.yarn/*
!.yarn/patches
!.yarn/plugins
!.yarn/releases
!.yarn/versions
# testing
/coverage
# next.js
/.next/
/out/
# production
/build
# misc
.DS_Store
*.pem
# debug
npm-debug.log*
yarn-debug.log*
yarn-error.log*
.pnpm-debug.log*
# env files (can opt-in for committing if needed)
.env*
# vercel
.vercel
# typescript
*.tsbuildinfo
next-env.d.ts

View File

@@ -0,0 +1,33 @@
# Simple Chatbot Client
A Next.js application using TypeScript and the Pipecat React SDK to connect to a Pipecat AI agent.
## Features
- Next.js App Router architecture
- TypeScript for type safety
- RTVI client integration for real-time voice and video
- Daily.co WebRTC transport
- Custom API endpoint for Daily room creation
## Getting Started
1. Install dependencies:
```bash
npm install
```
2. Create `.env.local` file with your Pipecat Cloud API key:
```
PIPECAT_CLOUD_API_KEY=your_pipecat_cloud_key
```
3. Start the development server:
```bash
npm run dev
```
4. Open [http://localhost:3000](http://localhost:3000) in your browser

View File

@@ -0,0 +1 @@
PIPECAT_CLOUD_API_KEY=your_api_key_here

View File

@@ -0,0 +1,16 @@
import { dirname } from "path";
import { fileURLToPath } from "url";
import { FlatCompat } from "@eslint/eslintrc";
const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);
const compat = new FlatCompat({
baseDirectory: __dirname,
});
const eslintConfig = [
...compat.extends("next/core-web-vitals", "next/typescript"),
];
export default eslintConfig;

View File

@@ -0,0 +1,7 @@
import type { NextConfig } from "next";
const nextConfig: NextConfig = {
/* config options here */
};
export default nextConfig;

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,28 @@
{
"name": "my-nextjs-app",
"version": "0.1.0",
"private": true,
"scripts": {
"dev": "next dev",
"build": "next build",
"start": "next start",
"lint": "next lint"
},
"dependencies": {
"@pipecat-ai/client-js": "^0.3.5",
"@pipecat-ai/client-react": "^0.3.5",
"@pipecat-ai/daily-transport": "^0.3.7",
"next": "15.2.3",
"react": "^19.0.0",
"react-dom": "^19.0.0"
},
"devDependencies": {
"@eslint/eslintrc": "^3",
"@types/node": "^20",
"@types/react": "^19",
"@types/react-dom": "^19",
"eslint": "^9",
"eslint-config-next": "15.2.3",
"typescript": "^5"
}
}

View File

@@ -0,0 +1,44 @@
import { NextResponse, NextRequest } from 'next/server';
export async function POST(request: NextRequest) {
try {
const { MY_CUSTOM_DATA } = await request.json();
const response = await fetch(
'https://api.pipecat.daily.co/v1/public/simple-chatbot/start',
{
method: 'POST',
headers: {
Authorization: `Bearer ${process.env.PIPECAT_CLOUD_API_KEY}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({
// Create Daily room
createDailyRoom: true,
// Optionally set Daily room properties
dailyRoomProperties: { start_video_off: true },
// Optionally pass custom data to the bot
body: { MY_CUSTOM_DATA },
}),
}
);
if (!response.ok) {
throw new Error(`API responded with status: ${response.status}`);
}
const data = await response.json();
// Transform the response to match what RTVI client expects
return NextResponse.json({
room_url: data.dailyRoom,
token: data.dailyToken,
});
} catch (error) {
console.error('API error:', error);
return NextResponse.json(
{ error: 'Failed to start agent' },
{ status: 500 }
);
}
}

View File

@@ -0,0 +1,82 @@
body {
margin: 0;
padding: 20px;
font-family: Arial, sans-serif;
background-color: #f0f0f0;
}
.app {
max-width: 1200px;
margin: 0 auto;
}
.status-bar {
display: flex;
justify-content: space-between;
align-items: center;
padding: 10px;
background-color: #fff;
border-radius: 8px;
margin-bottom: 20px;
}
.controls button {
padding: 8px 16px;
margin-left: 10px;
border: none;
border-radius: 4px;
cursor: pointer;
}
button:disabled {
opacity: 0.5;
cursor: not-allowed;
}
.connect-btn {
background-color: #4caf50;
color: white;
}
.disconnect-btn {
background-color: #f44336;
color: white;
}
.main-content {
background-color: #fff;
border-radius: 8px;
padding: 20px;
margin-bottom: 20px;
}
.bot-container {
display: flex;
flex-direction: column;
align-items: center;
}
.video-container {
width: 640px;
height: 360px;
background-color: #ddd;
margin-bottom: 20px;
border-radius: 8px;
overflow: hidden;
}
.video-container video {
width: 100%;
height: 100%;
object-fit: cover;
}
.mic-enabled {
background-color: #4caf50;
color: white;
}
.mic-disabled {
background-color: #f44336;
color: white;
}

View File

@@ -0,0 +1,21 @@
import './globals.css';
import { RTVIProvider } from '@/providers/RTVIProvider';
export const metadata = {
title: 'Pipecat React Client',
description: 'Pipecat RTVI Client using Next.js',
};
export default function RootLayout({
children,
}: {
children: React.ReactNode;
}) {
return (
<html lang="en">
<body>
<RTVIProvider>{children}</RTVIProvider>
</body>
</html>
);
}

View File

@@ -0,0 +1,41 @@
'use client';
import {
RTVIClientAudio,
RTVIClientVideo,
useRTVIClientTransportState,
} from '@pipecat-ai/client-react';
import { ConnectButton } from '../components/ConnectButton';
import { StatusDisplay } from '../components/StatusDisplay';
import { DebugDisplay } from '../components/DebugDisplay';
function BotVideo() {
const transportState = useRTVIClientTransportState();
const isConnected = transportState !== 'disconnected';
return (
<div className="bot-container">
<div className="video-container">
{isConnected && <RTVIClientVideo participant="bot" fit="cover" />}
</div>
</div>
);
}
export default function Home() {
return (
<div className="app">
<div className="status-bar">
<StatusDisplay />
<ConnectButton />
</div>
<div className="main-content">
<BotVideo />
</div>
<DebugDisplay />
<RTVIClientAudio />
</div>
);
}

View File

@@ -0,0 +1,40 @@
import {
useRTVIClient,
useRTVIClientTransportState,
} from '@pipecat-ai/client-react';
export function ConnectButton() {
const client = useRTVIClient();
const transportState = useRTVIClientTransportState();
const isConnected = ['connected', 'ready'].includes(transportState);
const handleClick = async () => {
if (!client) {
console.error('RTVI client is not initialized');
return;
}
try {
if (isConnected) {
await client.disconnect();
} else {
await client.connect();
}
} catch (error) {
console.error('Connection error:', error);
}
};
return (
<div className="controls">
<button
className={isConnected ? 'disconnect-btn' : 'connect-btn'}
onClick={handleClick}
disabled={
!client || ['connecting', 'disconnecting'].includes(transportState)
}>
{isConnected ? 'Disconnect' : 'Connect'}
</button>
</div>
);
}

View File

@@ -0,0 +1,26 @@
.debug-panel {
background-color: #fff;
border-radius: 8px;
padding: 20px;
}
.debug-panel h3 {
margin: 0 0 10px 0;
font-size: 16px;
font-weight: bold;
}
.debug-log {
height: 200px;
overflow-y: auto;
background-color: #f8f8f8;
padding: 10px;
border-radius: 4px;
font-family: monospace;
font-size: 12px;
line-height: 1.4;
}
.debug-log div {
margin-bottom: 4px;
}

View File

@@ -0,0 +1,144 @@
import { useRef, useCallback } from 'react';
import {
Participant,
RTVIEvent,
TransportState,
TranscriptData,
BotLLMTextData,
} from '@pipecat-ai/client-js';
import { useRTVIClient, useRTVIClientEvent } from '@pipecat-ai/client-react';
import './DebugDisplay.css';
export function DebugDisplay() {
const debugLogRef = useRef<HTMLDivElement>(null);
const client = useRTVIClient();
const log = useCallback((message: string) => {
if (!debugLogRef.current) return;
const entry = document.createElement('div');
entry.textContent = `${new Date().toISOString()} - ${message}`;
// Add styling based on message type
if (message.startsWith('User: ')) {
entry.style.color = '#2196F3'; // blue for user
} else if (message.startsWith('Bot: ')) {
entry.style.color = '#4CAF50'; // green for bot
}
debugLogRef.current.appendChild(entry);
debugLogRef.current.scrollTop = debugLogRef.current.scrollHeight;
}, []);
// Log transport state changes
useRTVIClientEvent(
RTVIEvent.TransportStateChanged,
useCallback(
(state: TransportState) => {
log(`Transport state changed: ${state}`);
},
[log]
)
);
// Log bot connection events
useRTVIClientEvent(
RTVIEvent.BotConnected,
useCallback(
(participant?: Participant) => {
log(`Bot connected: ${JSON.stringify(participant)}`);
},
[log]
)
);
useRTVIClientEvent(
RTVIEvent.BotDisconnected,
useCallback(
(participant?: Participant) => {
log(`Bot disconnected: ${JSON.stringify(participant)}`);
},
[log]
)
);
// Log track events
useRTVIClientEvent(
RTVIEvent.TrackStarted,
useCallback(
(track: MediaStreamTrack, participant?: Participant) => {
log(
`Track started: ${track.kind} from ${participant?.name || 'unknown'}`
);
},
[log]
)
);
useRTVIClientEvent(
RTVIEvent.TrackStopped,
useCallback(
(track: MediaStreamTrack, participant?: Participant) => {
log(
`Track stopped: ${track.kind} from ${participant?.name || 'unknown'}`
);
},
[log]
)
);
// Log bot ready state and check tracks
useRTVIClientEvent(
RTVIEvent.BotReady,
useCallback(() => {
log(`Bot ready`);
if (!client) return;
const tracks = client.tracks();
log(
`Available tracks: ${JSON.stringify({
local: {
audio: !!tracks.local.audio,
video: !!tracks.local.video,
},
bot: {
audio: !!tracks.bot?.audio,
video: !!tracks.bot?.video,
},
})}`
);
}, [client, log])
);
// Log transcripts
useRTVIClientEvent(
RTVIEvent.UserTranscript,
useCallback(
(data: TranscriptData) => {
// Only log final transcripts
if (data.final) {
log(`User: ${data.text}`);
}
},
[log]
)
);
useRTVIClientEvent(
RTVIEvent.BotTranscript,
useCallback(
(data: BotLLMTextData) => {
log(`Bot: ${data.text}`);
},
[log]
)
);
return (
<div className="debug-panel">
<h3>Debug Info</h3>
<div ref={debugLogRef} className="debug-log" />
</div>
);
}

View File

@@ -0,0 +1,11 @@
import { useRTVIClientTransportState } from '@pipecat-ai/client-react';
export function StatusDisplay() {
const transportState = useRTVIClientTransportState();
return (
<div className="status">
Status: <span>{transportState}</span>
</div>
);
}

View File

@@ -0,0 +1,38 @@
'use client';
import { RTVIClient } from '@pipecat-ai/client-js';
import { DailyTransport } from '@pipecat-ai/daily-transport';
import { RTVIClientProvider } from '@pipecat-ai/client-react';
import { PropsWithChildren, useEffect, useState } from 'react';
const MY_CUSTOM_DATA = { foo: 'bar' };
export function RTVIProvider({ children }: PropsWithChildren) {
const [client, setClient] = useState<RTVIClient | null>(null);
useEffect(() => {
console.log('Setting up Transport and Client');
const transport = new DailyTransport();
const rtviClient = new RTVIClient({
transport,
params: {
baseUrl: '/api',
endpoints: {
connect: '/connect',
},
requestData: { MY_CUSTOM_DATA },
},
enableMic: true,
enableCam: false,
});
setClient(rtviClient);
}, []);
if (!client) {
return null;
}
return <RTVIClientProvider client={client}>{children}</RTVIClientProvider>;
}

View File

@@ -0,0 +1,28 @@
{
"compilerOptions": {
"target": "ES2017",
"lib": ["dom", "dom.iterable", "esnext"],
"allowJs": true,
"skipLibCheck": true,
"strict": true,
"noEmit": true,
"esModuleInterop": true,
"module": "esnext",
"moduleResolution": "bundler",
"resolveJsonModule": true,
"isolatedModules": true,
"jsx": "preserve",
"incremental": true,
"plugins": [
{
"name": "next"
}
],
"paths": {
"@/components/*": ["./src/components/*"],
"@/providers/*": ["./src/providers/*"]
}
},
"include": ["next-env.d.ts", "**/*.ts", "**/*.tsx", ".next/types/**/*.ts"],
"exclude": ["node_modules"]
}

Binary file not shown.

After

Width:  |  Height:  |  Size: 733 KiB

View File

@@ -0,0 +1,51 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
dist/
*.egg-info/
.installed.cfg
*.egg
.pytest_cache/
.coverage
.coverage.*
.env
.venv
env/
venv/
ENV/
.mypy_cache/
.dmypy.json
dmypy.json
# JavaScript/Node.js
node_modules/
dist/
dist-ssr/
*.local
.env.local
.env.development.local
.env.test.local
.env.production.local
# Logs
logs/
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*
pnpm-debug.log*
# Editor/IDE
.vscode/*
!.vscode/extensions.json
.idea/
*.swp
*.swo
.DS_Store
# Project specific
runpod.toml

View File

@@ -0,0 +1,12 @@
FROM dailyco/pipecat-base:latest
RUN apt-get update && apt-get install ffmpeg -y
COPY ./pipecat pipecat
COPY ./requirements.txt requirements.txt
COPY ./assets assets
RUN pip install --no-cache-dir --upgrade -r requirements.txt
COPY ./bot.py bot.py

View File

@@ -0,0 +1,33 @@
# Simple Chatbot Server
A Pipecat bot.py file that is built to be deployed to Pipecat Cloud.
## Environment Variables
Copy `env.example` to `.env` and configure:
```ini
OPENAI_API_KEY= # Your OpenAI API key (required for OpenAI bot)
CARTESIA_API_KEY= # Your Cartesia API key
```
## Running the server locally
Set up and activate your virtual environment:
```bash
python3 -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
```
Install dependencies:
```bash
pip install -r requirements.txt
```
Run the server:
```bash
LOCAL_RUN=1 python bot.py
```

Binary file not shown.

After

Width:  |  Height:  |  Size: 759 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 884 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 876 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 881 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 866 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 874 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 882 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 885 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 888 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 890 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 898 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 836 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 903 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 908 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 908 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 905 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 903 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 866 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 849 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 866 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 866 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 864 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 858 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 875 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 881 KiB

View File

@@ -0,0 +1,330 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""OpenAI Bot Implementation.
This module implements a chatbot using OpenAI's GPT-4 model for natural language
processing. It includes:
- Real-time audio/video interaction through Daily
- Animated robot avatar
- Text-to-speech using ElevenLabs
- Support for both English and Spanish
The bot runs as part of a pipeline that processes audio/video frames and manages
the conversation flow.
"""
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from PIL import Image
from pipecatcloud.agent import (
DailySessionArguments,
)
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
Frame,
OutputImageRawFrame,
SpriteFrame,
TTSSpeakFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.gladia.stt import GladiaSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.services.daily import DailyTransport
from pipecat.transports.services.pipecat_cloud import (
PipecatCloudParams,
PipecatCloudSessionArguments,
PipecatCloudTransport,
)
load_dotenv(override=True)
logger.add(sys.stderr, level="DEBUG")
print(f"DailyTransport: {DailyTransport}")
# Check if we're in local development mode
LOCAL_RUN = os.getenv("LOCAL_RUN")
if LOCAL_RUN:
import asyncio
import webbrowser
try:
from local_runner import configure
except ImportError:
logger.error("Could not import local_runner module. Local development mode may not work.")
# Logger for local dev
logger.add(sys.stderr, level="DEBUG")
sprites = []
script_dir = os.path.dirname(__file__)
# Load sequential animation frames
for i in range(1, 26):
# Build the full path to the image file
full_path = os.path.join(script_dir, f"assets/robot0{i}.png")
# Get the filename without the extension to use as the dictionary key
# Open the image and convert it to bytes
with Image.open(full_path) as img:
sprites.append(OutputImageRawFrame(image=img.tobytes(), size=img.size, format=img.format))
# Create a smooth animation by adding reversed frames
flipped = sprites[::-1]
sprites.extend(flipped)
# Define static and animated states
quiet_frame = sprites[0] # Static frame for when bot is listening
talking_frame = SpriteFrame(images=sprites) # Animation sequence for when bot is talking
class TalkingAnimation(FrameProcessor):
"""Manages the bot's visual animation states.
Switches between static (listening) and animated (talking) states based on
the bot's current speaking status.
"""
def __init__(self):
super().__init__()
self._is_talking = False
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process incoming frames and update animation state.
Args:
frame: The incoming frame to process
direction: The direction of frame flow in the pipeline
"""
await super().process_frame(frame, direction)
# Switch to talking animation when bot starts speaking
if isinstance(frame, BotStartedSpeakingFrame):
if not self._is_talking:
await self.push_frame(talking_frame)
self._is_talking = True
# Return to static frame when bot stops speaking
elif isinstance(frame, BotStoppedSpeakingFrame):
await self.push_frame(quiet_frame)
self._is_talking = False
await self.push_frame(frame, direction)
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
"""Fetch weather data dummy function.
This function simulates fetching weather data from an external API.
It demonstrates how to call an external service from the language model.
"""
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
await result_callback({"conditions": "nice", "temperature": "75"})
async def main(session_args: PipecatCloudSessionArguments):
"""Main bot execution function.
Sets up and runs the bot pipeline including:
- Daily video transport
- Speech-to-text and text-to-speech services
- Language model integration
- Animation processing
- RTVI event handling
"""
logger.info(f"session args: {session_args}")
# Set up Daily transport with video/audio parameters
transport = PipecatCloudTransport(
session_args=session_args,
params=PipecatCloudParams(
audio_out_enabled=True, # Enable output audio for the bot
camera_out_enabled=True, # Enable the camera output for the bot
camera_out_width=1024, # Set the camera output width
camera_out_height=576, # Set the camera output height
transcription_enabled=True, # Enable transcription for the user
vad_enabled=True, # Enable VAD to handle user speech
vad_analyzer=SileroVADAnalyzer(), # Use the Silero VAD analyzer
vad_audio_passthrough=True, # Pass audio through VAD for user speech to the rest of the pipeline
),
)
# Initialize text-to-speech service
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="c45bc5ec-dc68-4feb-8829-6e6b2748095d", # Movieman
)
stt = GladiaSTTService(api_key=os.getenv("GLADIA_API_KEY"))
# Initialize LLM service
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
# Register your function call providing the function name and callback
llm.register_function("get_current_weather", fetch_weather_from_api)
# Define your function call using the FunctionSchema
# Learn more about function calling in Pipecat:
# https://docs.pipecat.ai/guides/features/function-calling
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
# Set up the tools schema with your weather function call
tools = ToolsSchema(standard_tools=[weather_function])
# Set up initial messages for the bot
messages = [
{
"role": "system",
"content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by introducing yourself.",
},
]
# Set up conversation context and management
# The context_aggregator will automatically collect conversation context
# Pass your initial messages and tools to the context to initialize the context
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
ta = TalkingAnimation()
# RTVI events for Pipecat client UI
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
# Add your processors to the pipeline
pipeline = Pipeline(
[
transport.input(),
stt,
rtvi,
context_aggregator.user(),
llm,
tts,
ta,
transport.output(),
context_aggregator.assistant(),
]
)
# Create a PipelineTask to manage the pipeline
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
),
observers=[RTVIObserver(rtvi)],
)
@rtvi.event_handler("on_client_ready")
async def on_client_ready(rtvi):
# Notify the client that the bot is ready
await rtvi.set_bot_ready()
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, participant):
# Push a static frame to show the bot is listening
await task.queue_frame(quiet_frame)
# Capture the first participant's transcription
# await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation by pushing a context frame to the pipeline
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, participant):
logger.debug(f"Participant left: {participant}")
# Cancel the PipelineTask to stop processing
await task.cancel()
runner = PipelineRunner()
await runner.run(task)
async def bot(args: DailySessionArguments):
"""Main bot entry point compatible with Pipecat Cloud.
Args:
room_url: The Daily room URL
token: The Daily room token
body: The configuration object from the request body
session_id: The session ID for logging
"""
pcc_args = PipecatCloudSessionArguments(
room_url=args.room.url,
token=args.token,
body=args.body,
session_id=args.session_id,
)
logger.info(f"Bot process initialized {pcc_args.room_url} {pcc_args.token}")
try:
await main(pcc_args)
logger.info("Bot process completed")
except Exception as e:
logger.exception(f"Error in bot process: {str(e)}")
raise
# Local development
async def local_daily():
# TODO-CB: This becomes SmallWebRTCTransport
"""Function for local development testing."""
try:
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
args = PipecatCloudSessionArguments(
room_url=room_url, token=token, body={}, session_id=None
)
logger.warning("_")
logger.warning("_")
logger.warning(f"Talk to your voice agent here: {room_url}")
logger.warning("_")
logger.warning("_")
webbrowser.open(room_url)
await main(args)
except Exception as e:
logger.exception(f"Error in local development mode: {e}")
async def local_webrtc(webrtc_connection):
await main(PipecatCloudSessionArguments(webrtc_connection=webrtc_connection))
# Local development entry point
if LOCAL_RUN and __name__ == "__main__":
try:
asyncio.run(local_daily())
except Exception as e:
logger.exception(f"Failed to run in local mode: {e}")

View File

@@ -0,0 +1,19 @@
#!/bin/bash
set -e
VERSION="0.2"
DOCKER_USERNAME="chadbailey59"
AGENT_NAME="pcc-transport-chatbot"
# Build the Docker image with the correct context
echo "Building Docker image..."
docker build --platform=linux/arm64 -t "$DOCKER_USERNAME/$AGENT_NAME:$VERSION" -t "$DOCKER_USERNAME/$AGENT_NAME:latest" .
# Push the Docker images
echo "Pushing Docker image $DOCKER_USERNAME/$AGENT_NAME:$VERSION..."
docker push "$DOCKER_USERNAME/$AGENT_NAME:$VERSION"
echo "Pushing Docker image $DOCKER_USERNAME/$AGENT_NAME:latest..."
docker push "$DOCKER_USERNAME/$AGENT_NAME:latest"
echo "Successfully built and pushed $DOCKER_USERNAME/$AGENT_NAME:$VERSION and $DOCKER_USERNAME/$AGENT_NAME:latest"

View File

@@ -0,0 +1,2 @@
OPENAI_API_KEY=sk-PL...
CARTESIA_API_KEY=aeb...

View File

@@ -0,0 +1,100 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>WebRTC Voice Agent</title>
<style>
body { font-family: Arial, sans-serif; text-align: center; margin-top: 50px; }
#status { font-size: 20px; margin: 20px; }
button { padding: 10px 20px; font-size: 16px; }
</style>
</head>
<body>
<h1>WebRTC Voice Agent</h1>
<p id="status">Disconnected</p>
<button id="connect-btn">Connect</button>
<audio id="audio-el" autoplay></audio>
<script>
const statusEl = document.getElementById("status")
const buttonEl = document.getElementById("connect-btn")
const audioEl = document.getElementById("audio-el")
let connected = false
let peerConnection = null
/*const waitForIceGatheringComplete = async (pc) => {
if (pc.iceGatheringState === 'complete') return;
return new Promise((resolve) => {
const checkState = () => {
if (pc.iceGatheringState === 'complete') {
pc.removeEventListener('icegatheringstatechange', checkState);
resolve();
}
};
pc.addEventListener('icegatheringstatechange', checkState);
});
}*/
const createSmallWebRTCConnection = async (audioTrack) => {
const pc = new RTCPeerConnection()
pc.ontrack = e => audioEl.srcObject = e.streams[0]
pc.addTransceiver(audioTrack, { direction: 'sendrecv' })
await pc.setLocalDescription(await pc.createOffer())
//await waitForIceGatheringComplete(pc)
const offer = pc.localDescription
const response = await fetch('/api/offer', {
body: JSON.stringify({ sdp: offer.sdp, type: offer.type}),
headers: { 'Content-Type': 'application/json' },
method: 'POST',
});
const answer = await response.json()
await pc.setRemoteDescription(answer)
return pc
}
const connect = async () => {
const audioStream = await navigator.mediaDevices.getUserMedia({audio: true})
peerConnection= await createSmallWebRTCConnection(audioStream.getAudioTracks()[0])
peerConnection.onconnectionstatechange = () => {
let connectionState = peerConnection?.connectionState
if (connectionState === 'connected') {
_onConnected()
} else if (connectionState === 'disconnected') {
_onDisconnected()
}
}
}
const _onConnected = () => {
statusEl.textContent = "Connected"
buttonEl.textContent = "Disconnect"
connected = true
}
const _onDisconnected = () => {
statusEl.textContent = "Disconnected"
buttonEl.textContent = "Connect"
connected = false
}
const disconnect = () => {
if (!peerConnection) {
return
}
peerConnection.close()
peerConnection = null
_onDisconnected()
}
buttonEl.addEventListener("click", async () => {
if (!connected) {
await connect()
} else {
disconnect()
}
});
</script>
</body>
</html>

View File

@@ -0,0 +1,46 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
import aiohttp
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomParams
async def configure(aiohttp_session: aiohttp.ClientSession):
(url, token) = await configure_with_args(aiohttp_session)
return (url, token)
async def configure_with_args(aiohttp_session: aiohttp.ClientSession = None):
key = os.getenv("DAILY_API_KEY")
if not key:
raise Exception(
"No Daily API key specified. set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
)
daily_rest_helper = DailyRESTHelper(
daily_api_key=key,
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
room = await daily_rest_helper.create_room(
DailyRoomParams(properties={"enable_prejoin_ui": False})
)
if not room.url:
raise HTTPException(status_code=500, detail="Failed to create room")
url = room.url
# Create a meeting token for the given room with an expiration 1 hour in
# the future.
expiry_time: float = 60 * 60
token = await daily_rest_helper.get_token(url, expiry_time)
return (url, token)

View File

@@ -0,0 +1,7 @@
agent_name = "pcc-transport-chatbot"
image = "chadbailey59/pcc-transport-chatbot:0.2"
secret_set = "pcc-transport-chatbot-secrets"
[scaling]
min_instances = 0
max_instances = 2

View File

@@ -0,0 +1,5 @@
python-dotenv
fastapi[all]
uvicorn
-e ./pipecat[daily,cartesia,openai,silero,gladia,webrtc]
pipecatcloud

View File

@@ -0,0 +1,81 @@
import argparse
import asyncio
import logging
from contextlib import asynccontextmanager
from typing import Dict
import uvicorn
from bot import local_webrtc
from dotenv import load_dotenv
from fastapi import BackgroundTasks, FastAPI
from fastapi.responses import FileResponse
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
# Load environment variables
load_dotenv(override=True)
logger = logging.getLogger("pc")
app = FastAPI()
# Store connections by pc_id
pcs_map: Dict[str, SmallWebRTCConnection] = {}
@app.post("/api/offer")
async def offer(request: dict, background_tasks: BackgroundTasks):
pc_id = request.get("pc_id")
if pc_id and pc_id in pcs_map:
pipecat_connection = pcs_map[pc_id]
logger.info(f"Reusing existing connection for pc_id: {pc_id}")
await pipecat_connection.renegotiate(sdp=request["sdp"], type=request["type"])
else:
pipecat_connection = SmallWebRTCConnection()
await pipecat_connection.initialize(sdp=request["sdp"], type=request["type"])
@pipecat_connection.event_handler("closed")
async def handle_disconnected(webrtc_connection: SmallWebRTCConnection):
logger.info(f"Discarding peer connection for pc_id: {webrtc_connection.pc_id}")
pcs_map.pop(webrtc_connection.pc_id, None)
background_tasks.add_task(local_webrtc, pipecat_connection)
answer = pipecat_connection.get_answer()
# Updating the peer connection inside the map
pcs_map[answer["pc_id"]] = pipecat_connection
return answer
@app.get("/")
async def serve_index():
return FileResponse("index.html")
@asynccontextmanager
async def lifespan(app: FastAPI):
yield # Run app
coros = [pc.close() for pc in pcs_map.values()]
await asyncio.gather(*coros)
pcs_map.clear()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="WebRTC demo")
parser.add_argument(
"--host", default="localhost", help="Host for HTTP server (default: localhost)"
)
parser.add_argument(
"--port", type=int, default=7860, help="Port for HTTP server (default: 7860)"
)
parser.add_argument("--verbose", "-v", action="count")
args = parser.parse_args()
if args.verbose:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
uvicorn.run(app, host=args.host, port=args.port)

View File

@@ -74,6 +74,7 @@ openai = [ "websockets~=13.1" ]
openpipe = [ "openpipe~=4.48.0" ]
openrouter = []
perplexity = []
pipecatcloud = ["pipecatcloud" ]
playht = [ "pyht~=0.1.12", "websockets~=13.1" ]
qwen = []
rime = [ "websockets~=13.1" ]

View File

@@ -0,0 +1,313 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import logging
from dataclasses import dataclass
from functools import wraps
from typing import Any, Callable, Optional, Union
from fastapi import WebSocket
from pipecatcloud.agent import DailySessionArguments, SessionArguments, WebSocketSessionArguments
from pydantic import BaseModel, ConfigDict
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.network.fastapi_websocket import (
FastAPIWebsocketParams,
FastAPIWebsocketTransport,
FrameSerializer,
)
from pipecat.transports.network.small_webrtc import (
SmallWebRTCConnection,
SmallWebRTCTransport,
)
from pipecat.transports.services.daily import (
DailyDialinSettings,
DailyParams,
DailyTranscriptionSettings,
DailyTransport,
)
logger = logging.getLogger(__name__)
@dataclass
class WebRTCSessionArguments(SessionArguments):
"""WebRTC based agent session arguments. The arguments are received by the
bot() entry point.
"""
webrtc_connection: SmallWebRTCConnection
class PipecatCloudParams(TransportParams):
"""Parameters for PipecatCloudTransport.
This class combines parameters from all transport types:
- TransportParams (inherited)
- FastAPIWebsocketParams
- DailyParams
"""
# FastAPIWebsocketParams fields
add_wav_header: bool = False
serializer: Optional[FrameSerializer] = None
session_timeout: Optional[int] = None
# DailyParams fields
api_url: str = "https://api.daily.co/v1"
api_key: str = ""
dialin_settings: Optional[DailyDialinSettings] = None
transcription_enabled: bool = False
transcription_settings: DailyTranscriptionSettings = DailyTranscriptionSettings()
def to_transport_params(self) -> TransportParams:
"""Convert to TransportParams."""
return self
def to_websocket_params(self) -> FastAPIWebsocketParams:
"""Convert to FastAPIWebsocketParams."""
base_params = self.model_dump()
# Remove WebSocket-specific fields since we'll add them explicitly
websocket_fields = ["add_wav_header", "serializer", "session_timeout"]
for field in websocket_fields:
base_params.pop(field, None)
return FastAPIWebsocketParams(
**base_params,
add_wav_header=self.add_wav_header,
serializer=self.serializer,
session_timeout=self.session_timeout,
)
def to_daily_params(self) -> DailyParams:
"""Convert to DailyParams."""
base_params = self.model_dump()
# Remove Daily-specific fields since we'll add them explicitly
daily_fields = [
"api_url",
"api_key",
"dialin_settings",
"transcription_enabled",
"transcription_settings",
]
for field in daily_fields:
base_params.pop(field, None)
return DailyParams(
**base_params,
api_url=self.api_url,
api_key=self.api_key,
dialin_settings=self.dialin_settings,
transcription_enabled=self.transcription_enabled,
transcription_settings=self.transcription_settings,
)
class PipecatCloudSessionArguments:
"""Arguments for creating a PipecatCloudTransport session.
This class can be initialized with arguments for any of the supported transport types:
- WebSocket: Pass websocket=WebSocket
- Daily: Pass room_url=str, token=str, bot_name=str
- WebRTC: Pass webrtc_connection=SmallWebRTCConnection
"""
def __init__(
self,
*,
websocket: Optional[WebSocket] = None,
room_url: Optional[str] = None,
token: Optional[str] = None,
webrtc_connection: Optional[SmallWebRTCConnection] = None,
session_id: Optional[str] = None,
body: Optional[dict] = None,
):
"""Initialize session arguments for any supported transport type."""
if websocket is not None:
self._args = WebSocketSessionArguments(websocket=websocket, session_id=session_id)
elif room_url is not None:
self._args = DailySessionArguments(
# TODO-CB: bot_name is missing from DailySessionArguments?
room_url=room_url,
token=token,
session_id=session_id,
body=body,
)
elif webrtc_connection is not None:
self._args = WebRTCSessionArguments(
webrtc_connection=webrtc_connection, session_id=session_id
)
else:
raise ValueError(
"Must provide either websocket, (room_url, token, bot_name), or webrtc_connection"
)
@property
def args(self):
"""Get the underlying session arguments."""
return self._args
class PipecatCloudTransport(BaseTransport):
"""A transport that wraps FastAPIWebsocketTransport, SmallWebRTCTransport, and DailyTransport.
This transport will instantiate one of the three underlying transports based on the
session arguments provided to the constructor.
Event handlers:
@event_handler("on_client_connected"): Called when a client connects. Maps to:
- FastAPIWebsocketTransport.event_handler("on_client_connected")
- SmallWebRTCTransport.event_handler("on_client_connected")
- DailyTransport.event_handler("on_first_participant_joined")
@event_handler("on_client_disconnected"): Called when a client disconnects. Maps to:
- FastAPIWebsocketTransport.event_handler("on_client_disconnected")
- SmallWebRTCTransport.event_handler("on_client_disconnected")
- DailyTransport.event_handler("on_participant_left")
Other event handlers are passed through directly to the underlying transport.
Args:
session_args: Arguments for creating the session. The type of arguments determines
which transport will be used.
params: Configuration parameters for the transport. Parameters will be extracted
based on the session arguments type.
input_name: Optional name for the input transport.
output_name: Optional name for the output transport.
"""
# Event name mappings for each transport type
# Only include events that need to be mapped differently
_EVENT_MAPPINGS = {
DailyTransport: {
"on_client_connected": "on_first_participant_joined",
"on_client_disconnected": "on_participant_left",
},
}
def __init__(
self,
session_args: PipecatCloudSessionArguments,
params: Optional[Union[PipecatCloudParams, TransportParams]] = None,
*,
input_name: Optional[str] = None,
output_name: Optional[str] = None,
):
super().__init__(input_name=input_name, output_name=output_name)
# Convert TransportParams to PipecatCloudParams if needed
if isinstance(params, TransportParams):
cloud_params = PipecatCloudParams()
for field_name, field_value in params.model_dump().items():
setattr(cloud_params, field_name, field_value)
params = cloud_params
else:
params = params or PipecatCloudParams()
self._pending_handlers = {}
# Create the appropriate transport based on session arguments type
args = session_args.args
if isinstance(args, WebSocketSessionArguments):
logger.info("Using FastAPIWebsocketTransport")
websocket_params = params.to_websocket_params()
self._transport = FastAPIWebsocketTransport(
args.websocket,
websocket_params,
input_name=input_name,
output_name=output_name,
)
elif isinstance(args, DailySessionArguments):
logger.info("Using DailyTransport")
daily_params = params.to_daily_params()
self._transport = DailyTransport(
args.room_url,
args.token,
# TODO-CB: Bot name is missing from DailySessionArguments
"Bot",
params=daily_params,
input_name=input_name,
output_name=output_name,
)
elif isinstance(args, WebRTCSessionArguments):
logger.info("Using SmallWebRTCTransport")
transport_params = params.to_transport_params()
self._transport = SmallWebRTCTransport(
args.webrtc_connection,
transport_params,
input_name=input_name,
output_name=output_name,
)
else:
raise ValueError(f"Unsupported session arguments type: {type(args)}")
# Register any handlers that were added before transport creation
for event_name, handlers in self._pending_handlers.items():
for handler in handlers:
self._register_handler(event_name, handler)
def _register_handler(self, event_name: str, handler: Callable[..., Any]) -> None:
"""Register a handler with the appropriate transport method."""
transport_type = type(self._transport)
# If the transport type has mappings and the event needs to be mapped
if (
transport_type in self._EVENT_MAPPINGS
and event_name in self._EVENT_MAPPINGS[transport_type]
):
mapped_event = self._EVENT_MAPPINGS[transport_type][event_name]
else:
# Pass through the event name directly if no mapping exists
mapped_event = event_name
self._transport.event_handler(mapped_event)(handler)
def event_handler(self, event_name: str) -> Callable[..., Any]:
"""Register an event handler.
Args:
event_name: The name of the event to handle. Common events:
- "on_client_connected": Called when a client connects
- "on_client_disconnected": Called when a client disconnects
Other event names are passed through to the underlying transport.
Returns:
A decorator that registers the handler function.
"""
def decorator(handler: Callable[..., Any]) -> Callable[..., Any]:
if not hasattr(self, "_transport"):
# Store the handler to be registered when the transport is created
if event_name not in self._pending_handlers:
self._pending_handlers[event_name] = []
self._pending_handlers[event_name].append(handler)
else:
self._register_handler(event_name, handler)
return handler
return decorator
async def start(self, frame):
"""Start the transport."""
await self._transport.start(frame)
async def stop(self, frame):
"""Stop the transport."""
await self._transport.stop(frame)
async def cancel(self, frame):
"""Cancel the transport."""
await self._transport.cancel(frame)
@property
def input(self):
"""Get the input transport."""
return self._transport.input
@property
def output(self):
"""Get the output transport."""
return self._transport.output