Compare commits

...

6 Commits

Author SHA1 Message Date
Chad Bailey
052b2439a5 changed data type of callback signature 2025-04-11 15:47:04 +00:00
Chad Bailey
d39e46a9e1 cleanup 2025-04-08 21:10:49 +00:00
Chad Bailey
031879423a removed image 2025-04-08 16:58:00 +00:00
Chad Bailey
df1aac0cc8 restructured example and added readme 2025-04-08 15:29:27 +00:00
Chad Bailey
676f0a7b64 added working example 2025-04-08 14:55:57 +00:00
Chad Bailey
f948a144f8 added multi transport example 2025-04-07 22:13:27 +00:00
11 changed files with 640 additions and 0 deletions

View File

@@ -0,0 +1,8 @@
FROM dailyco/pipecat-base:latest
RUN apt-get update && apt-get install ffmpeg -y
COPY ./requirements.txt requirements.txt
RUN pip install --no-cache-dir --upgrade -r requirements.txt
COPY ./bot.py bot.py

View File

@@ -0,0 +1,65 @@
# Multi-Transport Chatbot for Pipecat and Pipecat Cloud
This project demonstrates a bot architecture that allows you to use different transports with the same bot, depending on how you run the botfile. This can be really useful for starting with one transport for early development and then transitioning to a different transport in production.
Here's how to use this bot with each of the supported transports.
## Step 1: Local development with SmallWebRTCTransport
To get started, let's run the bot with SmallWebRTCTransport, which makes a direct peer-to-peer WebRTC connection between your browser and the bot.
```bash
# Start with the standard venv setup:
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
# Rename the env example and add your keys:
mv example.env .env
# Now run the included webserver:
python server.py
```
Open a browser pointed at `http://localhost:7860` and click the **Connect** button to talk to the bot.
`server.py` helps set up the WebRTC connection, and then calls the `local_webrtc` function in bot.py with this line of code:
```python
background_tasks.add_task(local_webrtc, pipecat_connection)
```
In `bot.py`, you can see that the `local_webrtc` function creates a `SmallWebRTCTransport` instance and passes it to the `main()` function.
## Step 2: Local development with Daily
After step 1, you can run the same bot using the Daily transport. Add a `DAILY_API_KEY` to your .env file. If you have a Daily account already, you can get your API key from https://dashboard.daily.co/developers. If you have a Pipecat Cloud account, you have a Daily API key available at https://pipecat.daily.co/<your-org-slug>/settings/daily.
Run the bot using a different entrypoint:
```bash
LOCAL_RUN=1 python bot.py
```
This uses the `local_daily()` function in `bot.py`, which creates a `DailyTransport`.
### Step 3: Deploy to Pipecat Cloud
This repo already includes a Dockerfile you can use to build an image that works with Pipecat Cloud. You can do it in a few steps. First, edit `build.sh` and `pcc-deploy.toml` and replace `your-dockerhub-username` with, well, your DockerHub username. Then:
```bash
#
./build.sh
pcc deploy
# Then start a session with your bot
pcc agent start multi-transport-chatbot --use-daily
```
This will give you a URL you can open in your browser to talk to the bot using Daily Prebuilt.
Behind the scenes, Pipecat Cloud loads your botfile and calls its `bot()` function. Since you used the `--use-daily` option, the `args` argument is a `DailySessionArguments` instance that includes the Daily room URL and token, so the bot uses a `DailyTransport`.
## Step 4: Use a Twilio phone number and websocket
Follow the [Pipecat Cloud Twilio docs](https://docs.pipecat.daily.co/pipecat-in-production/twilio-mediastreams) to configure a TwiML Bin that points one of your phone numbers to Pipecat Cloud. When you dial that number, Pipecat Cloud will start a session with your bot that includes a `WebsocketArguments` object, so the `bot()` function will start your bot with a `FastAPIWebsocketTransport`.

View File

@@ -0,0 +1,288 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""OpenAI Bot Implementation.
This module implements a chatbot using OpenAI's GPT-4 model for natural language
processing. It includes:
- Real-time audio/video interaction through Daily
- Animated robot avatar
- Text-to-speech using ElevenLabs
- Support for both English and Spanish
The bot runs as part of a pipeline that processes audio/video frames and manages
the conversation flow.
"""
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from PIL import Image
from pipecatcloud.agent import DailySessionArguments, SessionArguments, WebSocketSessionArguments
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
Frame,
OutputImageRawFrame,
SpriteFrame,
TTSSpeakFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.gladia.stt import GladiaSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.network.fastapi_websocket import (
FastAPIWebsocketParams,
FastAPIWebsocketTransport,
)
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
# Check if we're in local development mode
LOCAL_RUN = os.getenv("LOCAL_RUN")
if LOCAL_RUN:
import asyncio
import webbrowser
try:
from local_runner import configure
except ImportError:
logger.error("Could not import local_runner module. Local development mode may not work.")
# Logger for local dev
# logger.add(sys.stderr, level="DEBUG")
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
"""Fetch weather data dummy function.
This function simulates fetching weather data from an external API.
It demonstrates how to call an external service from the language model.
"""
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
await result_callback({"conditions": "nice", "temperature": "75"})
async def main(transport: BaseTransport):
"""Main bot execution function.
Sets up and runs the bot pipeline including:
- Speech-to-text and text-to-speech services
- Language model integration
- Animation processing
- RTVI event handling
Uses the transport defined by the calling function.
See below for various ways to start the bot with different transports.
"""
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="c45bc5ec-dc68-4feb-8829-6e6b2748095d", # Movieman
)
stt = GladiaSTTService(api_key=os.getenv("GLADIA_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
# Register your function call providing the function name and callback
llm.register_function("get_current_weather", fetch_weather_from_api)
# Define your function call using the FunctionSchema
# Learn more about function calling in Pipecat:
# https://docs.pipecat.ai/guides/features/function-calling
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
# Set up the tools schema with your weather function call
tools = ToolsSchema(standard_tools=[weather_function])
# Set up initial messages for the bot
messages = [
{
"role": "system",
"content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by introducing yourself.",
},
]
# Set up conversation context and management
# The context_aggregator will automatically collect conversation context
# Pass your initial messages and tools to the context to initialize the context
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
# RTVI events for Pipecat client UI
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
# Add your processors to the pipeline
pipeline = Pipeline(
[
transport.input(),
stt,
rtvi,
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
# Create a PipelineTask to manage the pipeline
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
),
observers=[RTVIObserver(rtvi)],
)
@rtvi.event_handler("on_client_ready")
async def on_client_ready(rtvi):
# Notify the client that the bot is ready
await rtvi.set_bot_ready()
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
# Kick off the conversation by pushing a context frame to the pipeline
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
# Cancel the PipelineTask to stop processing
await task.cancel()
runner = PipelineRunner()
await runner.run(task)
shared_params = {
"audio_in_enabled": True,
"audio_out_enabled": True,
"video_in_enabled": False,
"video_out_enabled": False,
"vad_enabled": True,
"vad_analyzer": SileroVADAnalyzer(),
"vad_audio_passthrough": True,
}
async def bot(args: SessionArguments):
"""Bot entry point compatible with Pipecat Cloud. SessionArguments
will be a different subclass depending on how the session is started.
args: either DailySessionArguments or WebsocketSessionArguments
DailySessionArguments:
room_url: The Daily room URL
token: The Daily room token
body: The configuration object from the request body
session_id: The session ID for logging
WebsocketSessionArguments:
websocket: The websocket for connecting to Twilio
"""
logger.info(f"Starting PCC bot. args: {args}")
if isinstance(args, WebSocketSessionArguments):
logger.debug("Starting WebSocket bot")
start_data = args.websocket.iter_text()
await start_data.__anext__()
call_data = json.loads(await start_data.__anext__())
stream_sid = call_data["start"]["streamSid"]
transport = FastAPIWebsocketTransport(
websocket=args.websocket,
params=FastAPIWebsocketParams(
**shared_params,
serializer=TwilioFrameSerializer(stream_sid),
),
)
elif isinstance(args, DailySessionArguments):
logger.debug("Starting Daily bot")
transport = DailyTransport(
args.room_url,
args.token,
"Respond bot",
DailyParams(**shared_params, transcription_enabled=False),
)
try:
await main(transport)
logger.info("Bot process completed")
except Exception as e:
logger.exception(f"Error in bot process: {str(e)}")
raise
# Local development
async def local_daily():
"""This is an entrypoint for running your bot locally but using Daily
for the transport. To use this, you'll need to have DAILY_API_KEY set in your .env file.
"""
try:
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
logger.warning(f"Talk to your voice agent here: {room_url}")
webbrowser.open(room_url)
transport = DailyTransport(
room_url=room_url,
token=token,
bot_name="Bot",
params=DailyParams(**shared_params, transcription_enabled=False),
)
await main(transport)
except Exception as e:
logger.exception(f"Error in local development mode: {e}")
async def local_webrtc(webrtc_connection):
"""An entrypoint for using the SmallWebRTCTransport, which doesn't require a Daily
account or API key. You'll need to run the web client and small API server included
with this example to use this transport. Run `python server.py` to use it.
"""
transport = SmallWebRTCTransport(
webrtc_connection=webrtc_connection, params=TransportParams(**shared_params)
)
await main(transport)
# Local development entry point
if LOCAL_RUN and __name__ == "__main__":
try:
# Change this line to run whichever entrypoint you want to use for your bot.
asyncio.run(local_daily())
except Exception as e:
logger.exception(f"Failed to run in local mode: {e}")

View File

@@ -0,0 +1,19 @@
#!/bin/bash
set -e
VERSION="0.1"
DOCKER_USERNAME="your-dockerhub-username"
AGENT_NAME="multi-transport-chatbot"
# Build the Docker image with the correct context
echo "Building Docker image..."
docker build --platform=linux/arm64 -t "$DOCKER_USERNAME/$AGENT_NAME:$VERSION" -t "$DOCKER_USERNAME/$AGENT_NAME:latest" .
# Push the Docker images
echo "Pushing Docker image $DOCKER_USERNAME/$AGENT_NAME:$VERSION..."
docker push "$DOCKER_USERNAME/$AGENT_NAME:$VERSION"
echo "Pushing Docker image $DOCKER_USERNAME/$AGENT_NAME:latest..."
docker push "$DOCKER_USERNAME/$AGENT_NAME:latest"
echo "Successfully built and pushed $DOCKER_USERNAME/$AGENT_NAME:$VERSION and $DOCKER_USERNAME/$AGENT_NAME:latest"

View File

@@ -0,0 +1,3 @@
OPENAI_API_KEY=sk-PL...
CARTESIA_API_KEY=aeb...
GLADIA_API_KEY=54e...

View File

@@ -0,0 +1,100 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>WebRTC Voice Agent</title>
<style>
body { font-family: Arial, sans-serif; text-align: center; margin-top: 50px; }
#status { font-size: 20px; margin: 20px; }
button { padding: 10px 20px; font-size: 16px; }
</style>
</head>
<body>
<h1>WebRTC Voice Agent</h1>
<p id="status">Disconnected</p>
<button id="connect-btn">Connect</button>
<audio id="audio-el" autoplay></audio>
<script>
const statusEl = document.getElementById("status")
const buttonEl = document.getElementById("connect-btn")
const audioEl = document.getElementById("audio-el")
let connected = false
let peerConnection = null
/*const waitForIceGatheringComplete = async (pc) => {
if (pc.iceGatheringState === 'complete') return;
return new Promise((resolve) => {
const checkState = () => {
if (pc.iceGatheringState === 'complete') {
pc.removeEventListener('icegatheringstatechange', checkState);
resolve();
}
};
pc.addEventListener('icegatheringstatechange', checkState);
});
}*/
const createSmallWebRTCConnection = async (audioTrack) => {
const pc = new RTCPeerConnection()
pc.ontrack = e => audioEl.srcObject = e.streams[0]
pc.addTransceiver(audioTrack, { direction: 'sendrecv' })
await pc.setLocalDescription(await pc.createOffer())
//await waitForIceGatheringComplete(pc)
const offer = pc.localDescription
const response = await fetch('/api/offer', {
body: JSON.stringify({ sdp: offer.sdp, type: offer.type}),
headers: { 'Content-Type': 'application/json' },
method: 'POST',
});
const answer = await response.json()
await pc.setRemoteDescription(answer)
return pc
}
const connect = async () => {
const audioStream = await navigator.mediaDevices.getUserMedia({audio: true})
peerConnection= await createSmallWebRTCConnection(audioStream.getAudioTracks()[0])
peerConnection.onconnectionstatechange = () => {
let connectionState = peerConnection?.connectionState
if (connectionState === 'connected') {
_onConnected()
} else if (connectionState === 'disconnected') {
_onDisconnected()
}
}
}
const _onConnected = () => {
statusEl.textContent = "Connected"
buttonEl.textContent = "Disconnect"
connected = true
}
const _onDisconnected = () => {
statusEl.textContent = "Disconnected"
buttonEl.textContent = "Connect"
connected = false
}
const disconnect = () => {
if (!peerConnection) {
return
}
peerConnection.close()
peerConnection = null
_onDisconnected()
}
buttonEl.addEventListener("click", async () => {
if (!connected) {
await connect()
} else {
disconnect()
}
});
</script>
</body>
</html>

View File

@@ -0,0 +1,46 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
import aiohttp
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomParams
async def configure(aiohttp_session: aiohttp.ClientSession):
(url, token) = await configure_with_args(aiohttp_session)
return (url, token)
async def configure_with_args(aiohttp_session: aiohttp.ClientSession = None):
key = os.getenv("DAILY_API_KEY")
if not key:
raise Exception(
"No Daily API key specified. set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
)
daily_rest_helper = DailyRESTHelper(
daily_api_key=key,
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
room = await daily_rest_helper.create_room(
DailyRoomParams(properties={"enable_prejoin_ui": False})
)
if not room.url:
raise HTTPException(status_code=500, detail="Failed to create room")
url = room.url
# Create a meeting token for the given room with an expiration 1 hour in
# the future.
expiry_time: float = 60 * 60
token = await daily_rest_helper.get_token(url, expiry_time)
return (url, token)

View File

@@ -0,0 +1,7 @@
agent_name = "multi-transport-chatbot"
image = "your-dockerhub-username/multi-transport-chatbot:0.1"
secret_set = "pcc-transport-chatbot-secrets"
[scaling]
min_instances = 0
max_instances = 2

View File

@@ -0,0 +1,5 @@
python-dotenv
fastapi[all]
uvicorn
pipecat-ai[cartesia,daily,gladia,openai,silero,webrtc]
pipecatcloud

View File

@@ -0,0 +1,81 @@
import argparse
import asyncio
import logging
from contextlib import asynccontextmanager
from typing import Dict
import uvicorn
from bot import local_webrtc
from dotenv import load_dotenv
from fastapi import BackgroundTasks, FastAPI
from fastapi.responses import FileResponse
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
# Load environment variables
load_dotenv(override=True)
logger = logging.getLogger("pc")
app = FastAPI()
# Store connections by pc_id
pcs_map: Dict[str, SmallWebRTCConnection] = {}
@app.post("/api/offer")
async def offer(request: dict, background_tasks: BackgroundTasks):
pc_id = request.get("pc_id")
if pc_id and pc_id in pcs_map:
pipecat_connection = pcs_map[pc_id]
logger.info(f"Reusing existing connection for pc_id: {pc_id}")
await pipecat_connection.renegotiate(sdp=request["sdp"], type=request["type"])
else:
pipecat_connection = SmallWebRTCConnection()
await pipecat_connection.initialize(sdp=request["sdp"], type=request["type"])
@pipecat_connection.event_handler("closed")
async def handle_disconnected(webrtc_connection: SmallWebRTCConnection):
logger.info(f"Discarding peer connection for pc_id: {webrtc_connection.pc_id}")
pcs_map.pop(webrtc_connection.pc_id, None)
background_tasks.add_task(local_webrtc, pipecat_connection)
answer = pipecat_connection.get_answer()
# Updating the peer connection inside the map
pcs_map[answer["pc_id"]] = pipecat_connection
return answer
@app.get("/")
async def serve_index():
return FileResponse("index.html")
@asynccontextmanager
async def lifespan(app: FastAPI):
yield # Run app
coros = [pc.close() for pc in pcs_map.values()]
await asyncio.gather(*coros)
pcs_map.clear()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="WebRTC demo")
parser.add_argument(
"--host", default="localhost", help="Host for HTTP server (default: localhost)"
)
parser.add_argument(
"--port", type=int, default=7860, help="Port for HTTP server (default: 7860)"
)
parser.add_argument("--verbose", "-v", action="count")
args = parser.parse_args()
if args.verbose:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
uvicorn.run(app, host=args.host, port=args.port)

View File

@@ -169,6 +169,8 @@ class DailyCallbacks(BaseModel):
on_error: Called when an error occurs.
on_app_message: Called when receiving an app message.
on_call_state_updated: Called when call state changes.
on_client_connected: Called when a client (participant) connects.
on_client_disconnected: Called when a client (participant) disconnects.
on_dialin_connected: Called when dial-in is connected.
on_dialin_ready: Called when dial-in is ready.
on_dialin_stopped: Called when dial-in is stopped.
@@ -193,6 +195,8 @@ class DailyCallbacks(BaseModel):
on_error: Callable[[str], Awaitable[None]]
on_app_message: Callable[[Any, str], Awaitable[None]]
on_call_state_updated: Callable[[str], Awaitable[None]]
on_client_connected: Callable[[Mapping[str, Any]], Awaitable[None]]
on_client_disconnected: Callable[[Mapping[str, Any]], Awaitable[None]]
on_dialin_connected: Callable[[Any], Awaitable[None]]
on_dialin_ready: Callable[[str], Awaitable[None]]
on_dialin_stopped: Callable[[Any], Awaitable[None]]
@@ -1070,6 +1074,8 @@ class DailyTransport(BaseTransport):
on_error=self._on_error,
on_app_message=self._on_app_message,
on_call_state_updated=self._on_call_state_updated,
on_client_connected=self._on_client_connected,
on_client_disconnected=self._on_client_disconnected,
on_dialin_connected=self._on_dialin_connected,
on_dialin_ready=self._on_dialin_ready,
on_dialin_stopped=self._on_dialin_stopped,
@@ -1103,6 +1109,8 @@ class DailyTransport(BaseTransport):
self._register_event_handler("on_error")
self._register_event_handler("on_app_message")
self._register_event_handler("on_call_state_updated")
self._register_event_handler("on_client_connected")
self._register_event_handler("on_client_disconnected")
self._register_event_handler("on_dialin_connected")
self._register_event_handler("on_dialin_ready")
self._register_event_handler("on_dialin_stopped")
@@ -1246,6 +1254,12 @@ class DailyTransport(BaseTransport):
async def _on_call_state_updated(self, state: str):
await self._call_event_handler("on_call_state_updated", state)
async def _on_client_connected(self, participant: Any):
await self._call_event_handler("on_client_connected", participant)
async def _on_client_disconnected(self, participant: Any):
await self._call_event_handler("on_client_disconnected", participant)
async def _handle_dialin_ready(self, sip_endpoint: str):
if not self._params.dialin_settings:
return
@@ -1321,11 +1335,15 @@ class DailyTransport(BaseTransport):
await self._call_event_handler("on_first_participant_joined", participant)
await self._call_event_handler("on_participant_joined", participant)
# Also call on_client_connected for compatibility with other transports
await self._call_event_handler("on_client_connected", participant)
async def _on_participant_left(self, participant, reason):
id = participant["id"]
logger.info(f"Participant left {id}")
await self._call_event_handler("on_participant_left", participant, reason)
# Also call on_client_disconnected for compatibility with other transports
await self._call_event_handler("on_client_disconnected", participant)
async def _on_participant_updated(self, participant):
await self._call_event_handler("on_participant_updated", participant)