wip - pcc-transport example

This commit is contained in:
Chad Bailey
2025-04-01 18:24:46 +00:00
parent 943c75c622
commit 0fede6bb48
37 changed files with 669 additions and 1 deletions

View File

@@ -0,0 +1,51 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
dist/
*.egg-info/
.installed.cfg
*.egg
.pytest_cache/
.coverage
.coverage.*
.env
.venv
env/
venv/
ENV/
.mypy_cache/
.dmypy.json
dmypy.json
# JavaScript/Node.js
node_modules/
dist/
dist-ssr/
*.local
.env.local
.env.development.local
.env.test.local
.env.production.local
# Logs
logs/
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*
pnpm-debug.log*
# Editor/IDE
.vscode/*
!.vscode/extensions.json
.idea/
*.swp
*.swo
.DS_Store
# Project specific
runpod.toml

View File

@@ -0,0 +1,9 @@
# Image for the bot: starts from the Pipecat base image and adds this
# project's dependencies, assets and bot entry point.
FROM dailyco/pipecat-base:latest
# Dependency list and sprite assets are copied before the install step.
COPY ./requirements.txt requirements.txt
COPY ./assets assets
RUN pip install --no-cache-dir --upgrade -r requirements.txt
# bot.py is copied last, after dependencies are installed.
COPY ./bot.py bot.py

View File

@@ -0,0 +1,33 @@
# Simple Chatbot Server
A Pipecat bot.py file that is built to be deployed to Pipecat Cloud.
## Environment Variables
Copy `env.example` to `.env` and configure:
```ini
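OPENAI_API_KEY=   # Your OpenAI API key (required, used by the LLM)
CARTESIA_API_KEY= # Your Cartesia API key (required, used for text-to-speech)
GLADIA_API_KEY=   # Your Gladia API key (required, used for speech-to-text)
DAILY_API_KEY=    # Your Daily API key (only needed when running with LOCAL_RUN=1)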
```
## Running the server locally
Set up and activate your virtual environment:
```bash
python3 -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
```
Install dependencies:
```bash
pip install -r requirements.txt
```
Run the server:
```bash
LOCAL_RUN=1 python bot.py
```

Binary file not shown.

After

Width:  |  Height:  |  Size: 759 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 884 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 876 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 881 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 866 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 874 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 882 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 885 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 888 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 890 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 898 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 836 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 903 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 908 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 908 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 905 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 903 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 866 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 849 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 866 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 866 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 864 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 858 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 875 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 881 KiB

View File

@@ -0,0 +1,316 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""OpenAI Bot Implementation.
This module implements a chatbot using OpenAI's GPT-4 model for natural language
processing. It includes:
- Real-time audio/video interaction through Daily
- Animated robot avatar
- Text-to-speech using Cartesia
- Support for both English and Spanish
The bot runs as part of a pipeline that processes audio/video frames and manages
the conversation flow.
"""
import os
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from PIL import Image
from pipecatcloud.agent import DailySessionArguments
from pipecatcloud.agent import SessionArguments as PCCSessionArguments
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
Frame,
OutputImageRawFrame,
SpriteFrame,
TTSSpeakFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.gladia import GladiaSTTService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.transports.services.pipecat_cloud import (
PipecatCloudParams,
PipecatCloudTransport,
SessionArguments,
)
load_dotenv(override=True)

# Local development mode is opted into by setting LOCAL_RUN to any non-empty
# value in the environment.
LOCAL_RUN = os.environ.get("LOCAL_RUN")

if LOCAL_RUN:
    # These modules are only used by the local entry point at the bottom of
    # this file, so they are imported only in local mode.
    import asyncio
    import webbrowser

    try:
        from local_runner import configure
    except ImportError:
        # Logged rather than raised: `configure` stays undefined in this case.
        logger.error("Could not import local_runner module. Local development mode may not work.")

# Logger for local dev
# logger.add(sys.stderr, level="DEBUG")
script_dir = os.path.dirname(__file__)


def _load_sprite(index):
    """Read assets/robot0<index>.png next to this file and wrap it as a raw image frame."""
    full_path = os.path.join(script_dir, f"assets/robot0{index}.png")
    # The image is converted to raw bytes while the file is still open.
    with Image.open(full_path) as img:
        return OutputImageRawFrame(image=img.tobytes(), size=img.size, format=img.format)


# Forward pass of the animation: frames 1 through 25, in order.
sprites = [_load_sprite(index) for index in range(1, 26)]

# Append the same frames in reverse order so the animation plays forward and
# then back again instead of jumping from the last frame to the first.
flipped = sprites[::-1]
sprites.extend(flipped)

# Static frame shown while the bot is listening.
quiet_frame = sprites[0]
# Full animation sequence shown while the bot is talking.
talking_frame = SpriteFrame(images=sprites)
class TalkingAnimation(FrameProcessor):
    """Frame processor that swaps the avatar between idle and talking visuals.

    Watches for bot started/stopped speaking frames and pushes either the
    animated sprite sequence or the single static frame in response. Every
    incoming frame is forwarded unchanged afterwards.
    """

    def __init__(self):
        super().__init__()
        # True while the talking animation has been pushed and not yet replaced.
        self._is_talking = False

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Update the animation state for ``frame`` and pass the frame along.

        Args:
            frame: The incoming frame to inspect.
            direction: The direction the frame is travelling in the pipeline.
        """
        await super().process_frame(frame, direction)

        bot_started = isinstance(frame, BotStartedSpeakingFrame)

        if bot_started and not self._is_talking:
            # First "started speaking" since the last stop: begin the animation.
            await self.push_frame(talking_frame)
            self._is_talking = True
        elif not bot_started and isinstance(frame, BotStoppedSpeakingFrame):
            # Bot went quiet: fall back to the static frame.
            await self.push_frame(quiet_frame)
            self._is_talking = False

        # The triggering frame itself always continues in its original direction.
        await self.push_frame(frame, direction)
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
    """Placeholder weather lookup used as the LLM function-call handler.

    No external service is contacted: the handler first has the bot say a
    short filler sentence, then reports a fixed weather result through
    ``result_callback``. Only ``llm`` and ``result_callback`` are used; the
    remaining parameters are accepted but ignored.
    """
    filler_speech = TTSSpeakFrame("Let me check on that.")
    await llm.push_frame(filler_speech)

    canned_weather = {"conditions": "nice", "temperature": "75"}
    await result_callback(canned_weather)
async def main(session_args: SessionArguments):
    """Build the bot pipeline for one session and run it to completion.

    Sets up and runs the bot pipeline including:
    - A PipecatCloudTransport built from ``session_args``
    - Speech-to-text (Gladia) and text-to-speech (Cartesia) services
    - Language model integration (OpenAI) with one registered function call
    - Animation processing
    - RTVI event handling

    API keys are read from the CARTESIA_API_KEY, GLADIA_API_KEY and
    OPENAI_API_KEY environment variables.

    Args:
        session_args: Session description passed straight through to
            ``PipecatCloudTransport``.
    """
    logger.info(f"session args: {session_args}")
    # Set up the Pipecat Cloud transport with video/audio parameters
    transport = PipecatCloudTransport(
        session_args=session_args,
        params=PipecatCloudParams(
            audio_out_enabled=True,  # Enable output audio for the bot
            camera_out_enabled=True,  # Enable the camera output for the bot
            camera_out_width=1024,  # Set the camera output width
            camera_out_height=576,  # Set the camera output height
            transcription_enabled=True,  # Enable transcription for the user
            vad_enabled=True,  # Enable VAD to handle user speech
            vad_analyzer=SileroVADAnalyzer(),  # Use the Silero VAD analyzer
            vad_audio_passthrough=True,  # Pass audio through VAD for user speech to the rest of the pipeline
        ),
    )
    # Initialize text-to-speech service
    tts = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY"),
        voice_id="c45bc5ec-dc68-4feb-8829-6e6b2748095d",  # Movieman
    )
    # Initialize speech-to-text service
    stt = GladiaSTTService(api_key=os.getenv("GLADIA_API_KEY"))
    # Initialize LLM service
    llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
    # Register your function call providing the function name and callback
    llm.register_function("get_current_weather", fetch_weather_from_api)
    # Define your function call using the FunctionSchema
    # Learn more about function calling in Pipecat:
    # https://docs.pipecat.ai/guides/features/function-calling
    weather_function = FunctionSchema(
        name="get_current_weather",
        description="Get the current weather",
        properties={
            "location": {
                "type": "string",
                "description": "The city and state, e.g. San Francisco, CA",
            },
            "format": {
                "type": "string",
                "enum": ["celsius", "fahrenheit"],
                "description": "The temperature unit to use. Infer this from the user's location.",
            },
        },
        required=["location", "format"],
    )
    # Set up the tools schema with your weather function call
    tools = ToolsSchema(standard_tools=[weather_function])
    # Set up initial messages for the bot
    messages = [
        {
            "role": "system",
            "content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by introducing yourself.",
        },
    ]
    # Set up conversation context and management
    # The context_aggregator will automatically collect conversation context
    # Pass your initial messages and tools to the context to initialize the context
    context = OpenAILLMContext(messages, tools)
    context_aggregator = llm.create_context_aggregator(context)
    # Processor that swaps the avatar between the static and animated sprites
    ta = TalkingAnimation()
    # RTVI events for Pipecat client UI
    rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
    # Add your processors to the pipeline, listed in the order they are chained:
    # transport input -> STT -> RTVI -> user context -> LLM -> TTS ->
    # animation -> transport output -> assistant context
    pipeline = Pipeline(
        [
            transport.input(),
            stt,
            rtvi,
            context_aggregator.user(),
            llm,
            tts,
            ta,
            transport.output(),
            context_aggregator.assistant(),
        ]
    )
    # Create a PipelineTask to manage the pipeline
    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            allow_interruptions=True,
            enable_metrics=True,
            enable_usage_metrics=True,
        ),
        observers=[RTVIObserver(rtvi)],
    )

    @rtvi.event_handler("on_client_ready")
    async def on_client_ready(rtvi):
        # Notify the client that the bot is ready
        await rtvi.set_bot_ready()

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, participant):
        # Push a static frame to show the bot is listening
        await task.queue_frame(quiet_frame)
        # Capture the first participant's transcription
        # await transport.capture_participant_transcription(participant["id"])
        # Kick off the conversation by pushing a context frame to the pipeline
        await task.queue_frames([context_aggregator.user().get_context_frame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, participant):
        logger.debug(f"Participant left: {participant}")
        # Cancel the PipelineTask to stop processing
        await task.cancel()

    # Run the task; this await returns once the runner is done with it
    runner = PipelineRunner()
    await runner.run(task)
async def bot(args: DailySessionArguments):
    """Main bot entry point compatible with Pipecat Cloud.

    Logs the start of the session, delegates all work to ``main`` and logs
    completion. Any exception raised by ``main`` is logged with its traceback
    and then re-raised to the caller.

    Args:
        args: Session arguments for this run. Only ``args.room_url`` is read
            here (for logging); the whole object is passed on to ``main``.

    Raises:
        Exception: Whatever ``main`` raised, after it has been logged.
    """
    # The room token is a credential, so it is deliberately kept out of the
    # log line; the room URL is enough to identify the session.
    logger.info(f"Bot process initialized {args.room_url}")
    try:
        await main(args)
        logger.info("Bot process completed")
    except Exception as e:
        logger.exception(f"Error in bot process: {str(e)}")
        raise
# Local development
async def local_daily():
    """Function for local development testing.

    Creates a room and token through ``configure``, prints and opens the room
    URL in the default web browser, then runs the bot in that room. Any
    exception is logged with its traceback and not re-raised.
    """
    # TODO-CB: This becomes SmallWebRTCTransport
    try:
        async with aiohttp.ClientSession() as session:
            (room_url, token) = await configure(session)
            logger.warning("_")
            logger.warning("_")
            logger.warning(f"Talk to your voice agent here: {room_url}")
            logger.warning("_")
            logger.warning("_")
            webbrowser.open(room_url)
            # main() accepts a single session-arguments object, so the room
            # details are wrapped the same way local_webrtc() wraps its
            # connection instead of being passed as separate arguments.
            # NOTE(review): keyword names follow the SessionArguments
            # constructor shown elsewhere in this change; bot_name is set so
            # the Daily branch is selected whether it requires all or any of
            # room_url/token/bot_name. Confirm against that class.
            await main(SessionArguments(room_url=room_url, token=token, bot_name="Chatbot"))
    except Exception as e:
        logger.exception(f"Error in local development mode: {e}")
async def local_webrtc(webrtc_connection):
    """Run the bot over an already-established WebRTC connection.

    Args:
        webrtc_connection: The connection object to wrap in
            ``SessionArguments`` and hand to ``main``.
    """
    session_args = SessionArguments(webrtc_connection=webrtc_connection)
    await main(session_args)
# Local development entry point
if __name__ == "__main__" and LOCAL_RUN:
    # Only start the local Daily flow when the file is executed directly with
    # LOCAL_RUN set; on import nothing is started.
    try:
        asyncio.run(local_daily())
    except Exception as err:
        logger.exception(f"Failed to run in local mode: {err}")

View File

@@ -0,0 +1,19 @@
#!/bin/bash
# Build the agent image for linux/arm64, tag it with both the version and
# "latest", and push both tags. Stops at the first failing command.
set -e

VERSION="0.1"
DOCKER_USERNAME="your-docker-hub-username"
AGENT_NAME="simple-chatbot"

# Full image references, derived once from the settings above.
IMAGE_REPO="$DOCKER_USERNAME/$AGENT_NAME"
VERSIONED_IMAGE="$IMAGE_REPO:$VERSION"
LATEST_IMAGE="$IMAGE_REPO:latest"

# Build the Docker image with the correct context
echo "Building Docker image..."
docker build --platform=linux/arm64 -t "$VERSIONED_IMAGE" -t "$LATEST_IMAGE" .

# Push the Docker images: versioned tag first, then latest
for image in "$VERSIONED_IMAGE" "$LATEST_IMAGE"; do
    echo "Pushing Docker image $image..."
    docker push "$image"
done

echo "Successfully built and pushed $VERSIONED_IMAGE and $LATEST_IMAGE"

View File

@@ -0,0 +1,2 @@
OPENAI_API_KEY=sk-PL...
CARTESIA_API_KEY=aeb...

View File

@@ -0,0 +1,100 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>WebRTC Voice Agent</title>
<style>
body { font-family: Arial, sans-serif; text-align: center; margin-top: 50px; }
#status { font-size: 20px; margin: 20px; }
button { padding: 10px 20px; font-size: 16px; }
</style>
</head>
<body>
<h1>WebRTC Voice Agent</h1>
<p id="status">Disconnected</p>
<button id="connect-btn">Connect</button>
<audio id="audio-el" autoplay></audio>
<script>
// Page elements the script reads and updates.
const statusEl = document.getElementById("status")
const buttonEl = document.getElementById("connect-btn")
const audioEl = document.getElementById("audio-el")
// UI-level connection flag; decides whether a button click connects or disconnects.
let connected = false
// The active RTCPeerConnection, or null when there is none.
let peerConnection = null
/*const waitForIceGatheringComplete = async (pc) => {
if (pc.iceGatheringState === 'complete') return;
return new Promise((resolve) => {
const checkState = () => {
if (pc.iceGatheringState === 'complete') {
pc.removeEventListener('icegatheringstatechange', checkState);
resolve();
}
};
pc.addEventListener('icegatheringstatechange', checkState);
});
}*/
// Create a peer connection that sends `audioTrack` and plays the remote
// audio in `audioEl`, negotiate it with the server through POST /api/offer,
// and resolve to the connection once the remote description has been applied.
// Rejects (after closing the half-built connection) if any step fails,
// including a non-2xx response from the offer endpoint.
const createSmallWebRTCConnection = async (audioTrack) => {
    const pc = new RTCPeerConnection()
    pc.ontrack = e => audioEl.srcObject = e.streams[0]
    pc.addTransceiver(audioTrack, { direction: 'sendrecv' })
    try {
        await pc.setLocalDescription(await pc.createOffer())
        //await waitForIceGatheringComplete(pc)
        const offer = pc.localDescription
        const response = await fetch('/api/offer', {
            body: JSON.stringify({ sdp: offer.sdp, type: offer.type}),
            headers: { 'Content-Type': 'application/json' },
            method: 'POST',
        });
        // fetch() only rejects on network errors; an HTTP error status must be
        // checked explicitly, otherwise the error body would be handed to
        // setRemoteDescription as if it were an SDP answer.
        if (!response.ok) {
            throw new Error(`Offer request failed with HTTP status ${response.status}`)
        }
        const answer = await response.json()
        await pc.setRemoteDescription(answer)
        return pc
    } catch (err) {
        // Do not leave a dangling peer connection behind on failure.
        pc.close()
        throw err
    }
}
// Ask for microphone access, open the WebRTC connection with the first audio
// track, and keep the page in sync with the connection state from then on.
// Rejects if microphone access or the negotiation fails.
const connect = async () => {
    const audioStream = await navigator.mediaDevices.getUserMedia({audio: true})
    try {
        peerConnection = await createSmallWebRTCConnection(audioStream.getAudioTracks()[0])
    } catch (err) {
        // Negotiation failed: release the microphone before reporting the error.
        audioStream.getTracks().forEach(track => track.stop())
        throw err
    }
    peerConnection.onconnectionstatechange = () => {
        const connectionState = peerConnection?.connectionState
        if (connectionState === 'connected') {
            _onConnected()
        } else if (connectionState === 'disconnected') {
            _onDisconnected()
        } else if (connectionState === 'failed') {
            // A failed connection will not recover: close it and reset the UI
            // so the button offers "Connect" again.
            disconnect()
        }
    }
}
// Reflect the given connection state in the page: the status text, the button
// label, and the `connected` flag read by the click handler.
const _applyConnectionState = (isConnected) => {
    statusEl.textContent = isConnected ? "Connected" : "Disconnected"
    buttonEl.textContent = isConnected ? "Disconnect" : "Connect"
    connected = isConnected
}
// Switch the page to its connected state.
const _onConnected = () => _applyConnectionState(true)
// Switch the page back to its disconnected state.
const _onDisconnected = () => _applyConnectionState(false)
// Close and forget the active peer connection, then reset the page.
// Does nothing when there is no active connection.
const disconnect = () => {
    if (peerConnection) {
        peerConnection.close()
        peerConnection = null
        _onDisconnected()
    }
}
// The single button toggles: it disconnects when connected, otherwise connects.
buttonEl.addEventListener("click", async () => {
    if (connected) {
        disconnect()
        return
    }
    await connect()
});
</script>
</body>
</html>

View File

@@ -0,0 +1,46 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
import aiohttp
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomParams
async def configure(aiohttp_session: aiohttp.ClientSession):
    """Create a Daily room and token using the given HTTP session.

    Thin wrapper around ``configure_with_args``.

    Args:
        aiohttp_session: Session passed through to ``configure_with_args``.

    Returns:
        A ``(url, token)`` tuple.
    """
    room_credentials = await configure_with_args(aiohttp_session)
    url, token = room_credentials
    return (url, token)
async def configure_with_args(aiohttp_session: "aiohttp.ClientSession | None" = None):
    """Create a Daily room and a one-hour meeting token for it.

    The Daily API key is read from the DAILY_API_KEY environment variable and
    the API base URL from DAILY_API_URL (default ``https://api.daily.co/v1``).

    Args:
        aiohttp_session: HTTP session handed to ``DailyRESTHelper``.

    Returns:
        A ``(url, token)`` tuple for the newly created room.

    Raises:
        Exception: If DAILY_API_KEY is not set, or if the created room has no URL.
    """
    key = os.getenv("DAILY_API_KEY")
    if not key:
        raise Exception(
            "No Daily API key specified. set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
        )
    daily_rest_helper = DailyRESTHelper(
        daily_api_key=key,
        daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
        aiohttp_session=aiohttp_session,
    )
    room = await daily_rest_helper.create_room(
        DailyRoomParams(properties={"enable_prejoin_ui": False})
    )
    if not room.url:
        # This module does not import any HTTP framework, so the failure is
        # reported with a plain exception, matching the missing-key error above.
        raise Exception("Failed to create room")
    url = room.url
    # Create a meeting token for the given room with an expiration 1 hour in
    # the future.
    expiry_time: float = 60 * 60
    token = await daily_rest_helper.get_token(url, expiry_time)
    return (url, token)

View File

@@ -0,0 +1,6 @@
agent_name = "simple-chatbot"
image = "your-docker-hub-username/simple-chatbot:0.1"
secret_set = "simple-chatbot-secrets"
[scaling]
min_instances = 0

View File

@@ -0,0 +1,5 @@
python-dotenv
fastapi[all]
uvicorn
pipecat-ai[daily,cartesia,openai,silero]
pipecatcloud

View File

@@ -0,0 +1,81 @@
import argparse
import asyncio
import logging
from contextlib import asynccontextmanager
from typing import Dict
import uvicorn
from bot import local_webrtc
from dotenv import load_dotenv
from fastapi import BackgroundTasks, FastAPI
from fastapi.responses import FileResponse
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
# Load environment variables
load_dotenv(override=True)
# Logger used by this server module
logger = logging.getLogger("pc")
# The FastAPI application that the routes below are registered on
app = FastAPI()
# Store connections by pc_id
pcs_map: Dict[str, SmallWebRTCConnection] = {}
@app.post("/api/offer")
async def offer(request: dict, background_tasks: BackgroundTasks):
    """Handle an SDP offer from the browser and return the SDP answer.

    When the request carries a ``pc_id`` that is already tracked, the existing
    connection is renegotiated. Otherwise a new connection is created, a
    "closed" handler is attached to drop it from the map later, and the bot is
    scheduled as a background task for it.

    Args:
        request: JSON body; ``sdp`` and ``type`` are read, ``pc_id`` is optional.
        background_tasks: Used to schedule the bot for a new connection.

    Returns:
        The answer produced by the connection.
    """
    pc_id = request.get("pc_id")
    is_new_connection = not (pc_id and pc_id in pcs_map)

    if is_new_connection:
        pipecat_connection = SmallWebRTCConnection()
        await pipecat_connection.initialize(sdp=request["sdp"], type=request["type"])

        @pipecat_connection.event_handler("closed")
        async def handle_disconnected(webrtc_connection: SmallWebRTCConnection):
            logger.info(f"Discarding peer connection for pc_id: {webrtc_connection.pc_id}")
            pcs_map.pop(webrtc_connection.pc_id, None)

        # The bot is started only once per connection, not on renegotiation.
        background_tasks.add_task(local_webrtc, pipecat_connection)
    else:
        pipecat_connection = pcs_map[pc_id]
        logger.info(f"Reusing existing connection for pc_id: {pc_id}")
        await pipecat_connection.renegotiate(sdp=request["sdp"], type=request["type"])

    answer = pipecat_connection.get_answer()
    # Track the connection under the pc_id reported in the answer.
    pcs_map[answer["pc_id"]] = pipecat_connection
    return answer
@app.get("/")
async def serve_index():
    """Serve the demo page; the relative path "index.html" is passed to FileResponse."""
    index_page = FileResponse("index.html")
    return index_page
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: on shutdown, close every tracked peer connection.

    Nothing happens on startup. After the application stops serving, all
    connections in ``pcs_map`` are closed concurrently and the map is emptied.

    Args:
        app: The FastAPI application this lifespan is attached to.
    """
    yield  # Run app
    coros = [pc.close() for pc in pcs_map.values()]
    await asyncio.gather(*coros)
    pcs_map.clear()


# `app` is created before this function exists, so the lifespan could not be
# passed to the FastAPI() constructor; without this assignment the shutdown
# cleanup above would never run.
app.router.lifespan_context = lifespan
if __name__ == "__main__":
    # Command-line options: where to listen, and how chatty logging should be.
    parser = argparse.ArgumentParser(description="WebRTC demo")
    parser.add_argument(
        "--host",
        default="localhost",
        help="Host for HTTP server (default: localhost)",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=7860,
        help="Port for HTTP server (default: 7860)",
    )
    parser.add_argument("--verbose", "-v", action="count")
    args = parser.parse_args()

    # Any number of -v flags switches logging from INFO to DEBUG.
    log_level = logging.DEBUG if args.verbose else logging.INFO
    logging.basicConfig(level=log_level)

    uvicorn.run(app, host=args.host, port=args.port)

View File

@@ -105,7 +105,7 @@ class SessionArguments:
"""Initialize session arguments for any supported transport type."""
if websocket is not None:
self._args = WebSocketSessionArguments(websocket=websocket, session_id=session_id)
elif all(x is not None for x in (room_url, token, bot_name)):
elif any(x is not None for x in (room_url, token, bot_name)):
self._args = DailySessionArguments(
room_url=room_url, token=token, bot_name=bot_name, session_id=session_id
)