Compare commits
6 Commits
pk/decoupl
...
cb/multi-t
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
052b2439a5 | ||
|
|
d39e46a9e1 | ||
|
|
031879423a | ||
|
|
df1aac0cc8 | ||
|
|
676f0a7b64 | ||
|
|
f948a144f8 |
8
examples/multi-transport-chatbot/Dockerfile
Normal file
8
examples/multi-transport-chatbot/Dockerfile
Normal file
@@ -0,0 +1,8 @@
|
||||
FROM dailyco/pipecat-base:latest
|
||||
RUN apt-get update && apt-get install ffmpeg -y
|
||||
|
||||
COPY ./requirements.txt requirements.txt
|
||||
|
||||
RUN pip install --no-cache-dir --upgrade -r requirements.txt
|
||||
|
||||
COPY ./bot.py bot.py
|
||||
65
examples/multi-transport-chatbot/README.md
Normal file
65
examples/multi-transport-chatbot/README.md
Normal file
@@ -0,0 +1,65 @@
|
||||
# Multi-Transport Chatbot for Pipecat and Pipecat Cloud
|
||||
|
||||
This project demonstrates a bot architecture that allows you to use different transports with the same bot, depending on how you run the botfile. This can be really useful for starting with one transport for early development and then transitioning to a different transport in production.
|
||||
|
||||
Here's how to use this bot with each of the supported transports.
|
||||
|
||||
## Step 1: Local development with SmallWebRTCTransport
|
||||
|
||||
To get started, let's run the bot with SmallWebRTCTransport, which makes a direct peer-to-peer WebRTC connection between your browser and the bot.
|
||||
|
||||
```bash
|
||||
# Start with the standard venv setup:
|
||||
python3 -m venv venv
|
||||
source venv/bin/activate
|
||||
pip install -r requirements.txt
|
||||
|
||||
# Rename the env example and add your keys:
|
||||
mv example.env .env
|
||||
|
||||
# Now run the included webserver:
|
||||
python server.py
|
||||
```
|
||||
|
||||
Open a browser pointed at `http://localhost:7860` and click the **Connect** button to talk to the bot.
|
||||
|
||||
`server.py` helps set up the WebRTC connection, and then calls the `local_webrtc` function in bot.py with this line of code:
|
||||
|
||||
```python
|
||||
background_tasks.add_task(local_webrtc, pipecat_connection)
|
||||
```
|
||||
|
||||
In `bot.py`, you can see that the `local_webrtc` function creates a `SmallWebRTCTransport` instance and passes it to the `main()` function.
|
||||
|
||||
## Step 2: Local development with Daily
|
||||
|
||||
After step 1, you can run the same bot using the Daily transport. Add a `DAILY_API_KEY` to your .env file. If you have a Daily account already, you can get your API key from https://dashboard.daily.co/developers. If you have a Pipecat Cloud account, you have a Daily API key available at https://pipecat.daily.co/<your-org-slug>/settings/daily.
|
||||
|
||||
Run the bot using a different entrypoint:
|
||||
|
||||
```bash
|
||||
LOCAL_RUN=1 python bot.py
|
||||
```
|
||||
|
||||
This uses the `local_daily()` function in `bot.py`, which creates a `DailyTransport`.
|
||||
|
||||
### Step 3: Deploy to Pipecat Cloud
|
||||
|
||||
This repo already includes a Dockerfile you can use to build an image that works with Pipecat Cloud. You can do it in a few steps. First, edit `build.sh` and `pcc-deploy.toml` and replace `your-dockerhub-username` with, well, your DockerHub username. Then:
|
||||
|
||||
```bash
|
||||
#
|
||||
./build.sh
|
||||
pcc deploy
|
||||
|
||||
# Then start a session with your bot
|
||||
pcc agent start multi-transport-chatbot --use-daily
|
||||
```
|
||||
|
||||
This will give you a URL you can open in your browser to talk to the bot using Daily Prebuilt.
|
||||
|
||||
Behind the scenes, Pipecat Cloud loads your botfile and calls its `bot()` function. Since you used the `--use-daily` option, the `args` argument is a `DailySessionArguments` instance that includes the Daily room URL and token, so the bot uses a `DailyTransport`.
|
||||
|
||||
## Step 4: Use a Twilio phone number and websocket
|
||||
|
||||
Follow the [Pipecat Cloud Twilio docs](https://docs.pipecat.daily.co/pipecat-in-production/twilio-mediastreams) to configure a TwiML Bin that points one of your phone numbers to Pipecat Cloud. When you dial that number, Pipecat Cloud will start a session with your bot that includes a `WebsocketArguments` object, so the `bot()` function will start your bot with a `FastAPIWebsocketTransport`.
|
||||
288
examples/multi-transport-chatbot/bot.py
Normal file
288
examples/multi-transport-chatbot/bot.py
Normal file
@@ -0,0 +1,288 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""OpenAI Bot Implementation.
|
||||
|
||||
This module implements a chatbot using OpenAI's GPT-4 model for natural language
|
||||
processing. It includes:
|
||||
- Real-time audio/video interaction through Daily
|
||||
- Animated robot avatar
|
||||
- Text-to-speech using ElevenLabs
|
||||
- Support for both English and Spanish
|
||||
|
||||
The bot runs as part of a pipeline that processes audio/video frames and manages
|
||||
the conversation flow.
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from PIL import Image
|
||||
from pipecatcloud.agent import DailySessionArguments, SessionArguments, WebSocketSessionArguments
|
||||
|
||||
from pipecat.adapters.schemas.function_schema import FunctionSchema
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import (
|
||||
BotStartedSpeakingFrame,
|
||||
BotStoppedSpeakingFrame,
|
||||
Frame,
|
||||
OutputImageRawFrame,
|
||||
SpriteFrame,
|
||||
TTSSpeakFrame,
|
||||
)
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.services.gladia.stt import GladiaSTTService
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.network.fastapi_websocket import (
|
||||
FastAPIWebsocketParams,
|
||||
FastAPIWebsocketTransport,
|
||||
)
|
||||
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
# Check if we're in local development mode
|
||||
LOCAL_RUN = os.getenv("LOCAL_RUN")
|
||||
if LOCAL_RUN:
|
||||
import asyncio
|
||||
import webbrowser
|
||||
|
||||
try:
|
||||
from local_runner import configure
|
||||
except ImportError:
|
||||
logger.error("Could not import local_runner module. Local development mode may not work.")
|
||||
|
||||
# Logger for local dev
|
||||
# logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
|
||||
"""Fetch weather data dummy function.
|
||||
|
||||
This function simulates fetching weather data from an external API.
|
||||
It demonstrates how to call an external service from the language model.
|
||||
"""
|
||||
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
|
||||
await result_callback({"conditions": "nice", "temperature": "75"})
|
||||
|
||||
|
||||
async def main(transport: BaseTransport):
|
||||
"""Main bot execution function.
|
||||
|
||||
Sets up and runs the bot pipeline including:
|
||||
- Speech-to-text and text-to-speech services
|
||||
- Language model integration
|
||||
- Animation processing
|
||||
- RTVI event handling
|
||||
|
||||
Uses the transport defined by the calling function.
|
||||
See below for various ways to start the bot with different transports.
|
||||
"""
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="c45bc5ec-dc68-4feb-8829-6e6b2748095d", # Movieman
|
||||
)
|
||||
|
||||
stt = GladiaSTTService(api_key=os.getenv("GLADIA_API_KEY"))
|
||||
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
|
||||
|
||||
# Register your function call providing the function name and callback
|
||||
llm.register_function("get_current_weather", fetch_weather_from_api)
|
||||
|
||||
# Define your function call using the FunctionSchema
|
||||
# Learn more about function calling in Pipecat:
|
||||
# https://docs.pipecat.ai/guides/features/function-calling
|
||||
weather_function = FunctionSchema(
|
||||
name="get_current_weather",
|
||||
description="Get the current weather",
|
||||
properties={
|
||||
"location": {
|
||||
"type": "string",
|
||||
"description": "The city and state, e.g. San Francisco, CA",
|
||||
},
|
||||
"format": {
|
||||
"type": "string",
|
||||
"enum": ["celsius", "fahrenheit"],
|
||||
"description": "The temperature unit to use. Infer this from the user's location.",
|
||||
},
|
||||
},
|
||||
required=["location", "format"],
|
||||
)
|
||||
|
||||
# Set up the tools schema with your weather function call
|
||||
tools = ToolsSchema(standard_tools=[weather_function])
|
||||
|
||||
# Set up initial messages for the bot
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by introducing yourself.",
|
||||
},
|
||||
]
|
||||
|
||||
# Set up conversation context and management
|
||||
# The context_aggregator will automatically collect conversation context
|
||||
# Pass your initial messages and tools to the context to initialize the context
|
||||
context = OpenAILLMContext(messages, tools)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
# RTVI events for Pipecat client UI
|
||||
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
|
||||
|
||||
# Add your processors to the pipeline
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(),
|
||||
stt,
|
||||
rtvi,
|
||||
context_aggregator.user(),
|
||||
llm,
|
||||
tts,
|
||||
transport.output(),
|
||||
context_aggregator.assistant(),
|
||||
]
|
||||
)
|
||||
|
||||
# Create a PipelineTask to manage the pipeline
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
observers=[RTVIObserver(rtvi)],
|
||||
)
|
||||
|
||||
@rtvi.event_handler("on_client_ready")
|
||||
async def on_client_ready(rtvi):
|
||||
# Notify the client that the bot is ready
|
||||
await rtvi.set_bot_ready()
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
# Kick off the conversation by pushing a context frame to the pipeline
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
# Cancel the PipelineTask to stop processing
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
shared_params = {
|
||||
"audio_in_enabled": True,
|
||||
"audio_out_enabled": True,
|
||||
"video_in_enabled": False,
|
||||
"video_out_enabled": False,
|
||||
"vad_enabled": True,
|
||||
"vad_analyzer": SileroVADAnalyzer(),
|
||||
"vad_audio_passthrough": True,
|
||||
}
|
||||
|
||||
|
||||
async def bot(args: SessionArguments):
|
||||
"""Bot entry point compatible with Pipecat Cloud. SessionArguments
|
||||
will be a different subclass depending on how the session is started.
|
||||
|
||||
args: either DailySessionArguments or WebsocketSessionArguments
|
||||
DailySessionArguments:
|
||||
room_url: The Daily room URL
|
||||
token: The Daily room token
|
||||
body: The configuration object from the request body
|
||||
session_id: The session ID for logging
|
||||
|
||||
WebsocketSessionArguments:
|
||||
websocket: The websocket for connecting to Twilio
|
||||
"""
|
||||
logger.info(f"Starting PCC bot. args: {args}")
|
||||
|
||||
if isinstance(args, WebSocketSessionArguments):
|
||||
logger.debug("Starting WebSocket bot")
|
||||
|
||||
start_data = args.websocket.iter_text()
|
||||
await start_data.__anext__()
|
||||
call_data = json.loads(await start_data.__anext__())
|
||||
stream_sid = call_data["start"]["streamSid"]
|
||||
transport = FastAPIWebsocketTransport(
|
||||
websocket=args.websocket,
|
||||
params=FastAPIWebsocketParams(
|
||||
**shared_params,
|
||||
serializer=TwilioFrameSerializer(stream_sid),
|
||||
),
|
||||
)
|
||||
elif isinstance(args, DailySessionArguments):
|
||||
logger.debug("Starting Daily bot")
|
||||
transport = DailyTransport(
|
||||
args.room_url,
|
||||
args.token,
|
||||
"Respond bot",
|
||||
DailyParams(**shared_params, transcription_enabled=False),
|
||||
)
|
||||
try:
|
||||
await main(transport)
|
||||
logger.info("Bot process completed")
|
||||
except Exception as e:
|
||||
logger.exception(f"Error in bot process: {str(e)}")
|
||||
raise
|
||||
|
||||
|
||||
# Local development
|
||||
async def local_daily():
|
||||
"""This is an entrypoint for running your bot locally but using Daily
|
||||
for the transport. To use this, you'll need to have DAILY_API_KEY set in your .env file.
|
||||
"""
|
||||
try:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
(room_url, token) = await configure(session)
|
||||
logger.warning(f"Talk to your voice agent here: {room_url}")
|
||||
webbrowser.open(room_url)
|
||||
transport = DailyTransport(
|
||||
room_url=room_url,
|
||||
token=token,
|
||||
bot_name="Bot",
|
||||
params=DailyParams(**shared_params, transcription_enabled=False),
|
||||
)
|
||||
await main(transport)
|
||||
except Exception as e:
|
||||
logger.exception(f"Error in local development mode: {e}")
|
||||
|
||||
|
||||
async def local_webrtc(webrtc_connection):
|
||||
"""An entrypoint for using the SmallWebRTCTransport, which doesn't require a Daily
|
||||
account or API key. You'll need to run the web client and small API server included
|
||||
with this example to use this transport. Run `python server.py` to use it.
|
||||
"""
|
||||
transport = SmallWebRTCTransport(
|
||||
webrtc_connection=webrtc_connection, params=TransportParams(**shared_params)
|
||||
)
|
||||
await main(transport)
|
||||
|
||||
|
||||
# Local development entry point
|
||||
if LOCAL_RUN and __name__ == "__main__":
|
||||
try:
|
||||
# Change this line to run whichever entrypoint you want to use for your bot.
|
||||
asyncio.run(local_daily())
|
||||
except Exception as e:
|
||||
logger.exception(f"Failed to run in local mode: {e}")
|
||||
19
examples/multi-transport-chatbot/build.sh
Executable file
19
examples/multi-transport-chatbot/build.sh
Executable file
@@ -0,0 +1,19 @@
|
||||
#!/bin/bash
|
||||
set -e
|
||||
|
||||
VERSION="0.1"
|
||||
DOCKER_USERNAME="your-dockerhub-username"
|
||||
AGENT_NAME="multi-transport-chatbot"
|
||||
|
||||
# Build the Docker image with the correct context
|
||||
echo "Building Docker image..."
|
||||
docker build --platform=linux/arm64 -t "$DOCKER_USERNAME/$AGENT_NAME:$VERSION" -t "$DOCKER_USERNAME/$AGENT_NAME:latest" .
|
||||
|
||||
# Push the Docker images
|
||||
echo "Pushing Docker image $DOCKER_USERNAME/$AGENT_NAME:$VERSION..."
|
||||
docker push "$DOCKER_USERNAME/$AGENT_NAME:$VERSION"
|
||||
|
||||
echo "Pushing Docker image $DOCKER_USERNAME/$AGENT_NAME:latest..."
|
||||
docker push "$DOCKER_USERNAME/$AGENT_NAME:latest"
|
||||
|
||||
echo "Successfully built and pushed $DOCKER_USERNAME/$AGENT_NAME:$VERSION and $DOCKER_USERNAME/$AGENT_NAME:latest"
|
||||
3
examples/multi-transport-chatbot/env.example
Normal file
3
examples/multi-transport-chatbot/env.example
Normal file
@@ -0,0 +1,3 @@
|
||||
OPENAI_API_KEY=sk-PL...
|
||||
CARTESIA_API_KEY=aeb...
|
||||
GLADIA_API_KEY=54e...
|
||||
100
examples/multi-transport-chatbot/index.html
Normal file
100
examples/multi-transport-chatbot/index.html
Normal file
@@ -0,0 +1,100 @@
|
||||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||||
<title>WebRTC Voice Agent</title>
|
||||
<style>
|
||||
body { font-family: Arial, sans-serif; text-align: center; margin-top: 50px; }
|
||||
#status { font-size: 20px; margin: 20px; }
|
||||
button { padding: 10px 20px; font-size: 16px; }
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<h1>WebRTC Voice Agent</h1>
|
||||
<p id="status">Disconnected</p>
|
||||
<button id="connect-btn">Connect</button>
|
||||
<audio id="audio-el" autoplay></audio>
|
||||
|
||||
<script>
|
||||
const statusEl = document.getElementById("status")
|
||||
const buttonEl = document.getElementById("connect-btn")
|
||||
const audioEl = document.getElementById("audio-el")
|
||||
|
||||
let connected = false
|
||||
let peerConnection = null
|
||||
|
||||
/*const waitForIceGatheringComplete = async (pc) => {
|
||||
if (pc.iceGatheringState === 'complete') return;
|
||||
return new Promise((resolve) => {
|
||||
const checkState = () => {
|
||||
if (pc.iceGatheringState === 'complete') {
|
||||
pc.removeEventListener('icegatheringstatechange', checkState);
|
||||
resolve();
|
||||
}
|
||||
};
|
||||
pc.addEventListener('icegatheringstatechange', checkState);
|
||||
});
|
||||
}*/
|
||||
|
||||
const createSmallWebRTCConnection = async (audioTrack) => {
|
||||
const pc = new RTCPeerConnection()
|
||||
pc.ontrack = e => audioEl.srcObject = e.streams[0]
|
||||
pc.addTransceiver(audioTrack, { direction: 'sendrecv' })
|
||||
await pc.setLocalDescription(await pc.createOffer())
|
||||
//await waitForIceGatheringComplete(pc)
|
||||
const offer = pc.localDescription
|
||||
const response = await fetch('/api/offer', {
|
||||
body: JSON.stringify({ sdp: offer.sdp, type: offer.type}),
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
method: 'POST',
|
||||
});
|
||||
const answer = await response.json()
|
||||
await pc.setRemoteDescription(answer)
|
||||
return pc
|
||||
}
|
||||
|
||||
const connect = async () => {
|
||||
const audioStream = await navigator.mediaDevices.getUserMedia({audio: true})
|
||||
peerConnection= await createSmallWebRTCConnection(audioStream.getAudioTracks()[0])
|
||||
peerConnection.onconnectionstatechange = () => {
|
||||
let connectionState = peerConnection?.connectionState
|
||||
if (connectionState === 'connected') {
|
||||
_onConnected()
|
||||
} else if (connectionState === 'disconnected') {
|
||||
_onDisconnected()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const _onConnected = () => {
|
||||
statusEl.textContent = "Connected"
|
||||
buttonEl.textContent = "Disconnect"
|
||||
connected = true
|
||||
}
|
||||
|
||||
const _onDisconnected = () => {
|
||||
statusEl.textContent = "Disconnected"
|
||||
buttonEl.textContent = "Connect"
|
||||
connected = false
|
||||
}
|
||||
|
||||
const disconnect = () => {
|
||||
if (!peerConnection) {
|
||||
return
|
||||
}
|
||||
peerConnection.close()
|
||||
peerConnection = null
|
||||
_onDisconnected()
|
||||
}
|
||||
|
||||
buttonEl.addEventListener("click", async () => {
|
||||
if (!connected) {
|
||||
await connect()
|
||||
} else {
|
||||
disconnect()
|
||||
}
|
||||
});
|
||||
</script>
|
||||
</body>
|
||||
</html>
|
||||
46
examples/multi-transport-chatbot/local_runner.py
Normal file
46
examples/multi-transport-chatbot/local_runner.py
Normal file
@@ -0,0 +1,46 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import os
|
||||
|
||||
import aiohttp
|
||||
|
||||
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomParams
|
||||
|
||||
|
||||
async def configure(aiohttp_session: aiohttp.ClientSession):
|
||||
(url, token) = await configure_with_args(aiohttp_session)
|
||||
return (url, token)
|
||||
|
||||
|
||||
async def configure_with_args(aiohttp_session: aiohttp.ClientSession = None):
|
||||
key = os.getenv("DAILY_API_KEY")
|
||||
if not key:
|
||||
raise Exception(
|
||||
"No Daily API key specified. set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
|
||||
)
|
||||
|
||||
daily_rest_helper = DailyRESTHelper(
|
||||
daily_api_key=key,
|
||||
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
|
||||
aiohttp_session=aiohttp_session,
|
||||
)
|
||||
|
||||
room = await daily_rest_helper.create_room(
|
||||
DailyRoomParams(properties={"enable_prejoin_ui": False})
|
||||
)
|
||||
if not room.url:
|
||||
raise HTTPException(status_code=500, detail="Failed to create room")
|
||||
|
||||
url = room.url
|
||||
|
||||
# Create a meeting token for the given room with an expiration 1 hour in
|
||||
# the future.
|
||||
expiry_time: float = 60 * 60
|
||||
|
||||
token = await daily_rest_helper.get_token(url, expiry_time)
|
||||
|
||||
return (url, token)
|
||||
7
examples/multi-transport-chatbot/pcc-deploy.toml
Normal file
7
examples/multi-transport-chatbot/pcc-deploy.toml
Normal file
@@ -0,0 +1,7 @@
|
||||
agent_name = "multi-transport-chatbot"
|
||||
image = "your-dockerhub-username/multi-transport-chatbot:0.1"
|
||||
secret_set = "pcc-transport-chatbot-secrets"
|
||||
|
||||
[scaling]
|
||||
min_instances = 0
|
||||
max_instances = 2
|
||||
5
examples/multi-transport-chatbot/requirements.txt
Normal file
5
examples/multi-transport-chatbot/requirements.txt
Normal file
@@ -0,0 +1,5 @@
|
||||
python-dotenv
|
||||
fastapi[all]
|
||||
uvicorn
|
||||
pipecat-ai[cartesia,daily,gladia,openai,silero,webrtc]
|
||||
pipecatcloud
|
||||
81
examples/multi-transport-chatbot/server.py
Normal file
81
examples/multi-transport-chatbot/server.py
Normal file
@@ -0,0 +1,81 @@
|
||||
import argparse
|
||||
import asyncio
|
||||
import logging
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import Dict
|
||||
|
||||
import uvicorn
|
||||
from bot import local_webrtc
|
||||
from dotenv import load_dotenv
|
||||
from fastapi import BackgroundTasks, FastAPI
|
||||
from fastapi.responses import FileResponse
|
||||
|
||||
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
|
||||
|
||||
# Load environment variables
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger = logging.getLogger("pc")
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
# Store connections by pc_id
|
||||
pcs_map: Dict[str, SmallWebRTCConnection] = {}
|
||||
|
||||
|
||||
@app.post("/api/offer")
|
||||
async def offer(request: dict, background_tasks: BackgroundTasks):
|
||||
pc_id = request.get("pc_id")
|
||||
|
||||
if pc_id and pc_id in pcs_map:
|
||||
pipecat_connection = pcs_map[pc_id]
|
||||
logger.info(f"Reusing existing connection for pc_id: {pc_id}")
|
||||
await pipecat_connection.renegotiate(sdp=request["sdp"], type=request["type"])
|
||||
else:
|
||||
pipecat_connection = SmallWebRTCConnection()
|
||||
await pipecat_connection.initialize(sdp=request["sdp"], type=request["type"])
|
||||
|
||||
@pipecat_connection.event_handler("closed")
|
||||
async def handle_disconnected(webrtc_connection: SmallWebRTCConnection):
|
||||
logger.info(f"Discarding peer connection for pc_id: {webrtc_connection.pc_id}")
|
||||
pcs_map.pop(webrtc_connection.pc_id, None)
|
||||
|
||||
background_tasks.add_task(local_webrtc, pipecat_connection)
|
||||
|
||||
answer = pipecat_connection.get_answer()
|
||||
# Updating the peer connection inside the map
|
||||
pcs_map[answer["pc_id"]] = pipecat_connection
|
||||
|
||||
return answer
|
||||
|
||||
|
||||
@app.get("/")
|
||||
async def serve_index():
|
||||
return FileResponse("index.html")
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
yield # Run app
|
||||
coros = [pc.close() for pc in pcs_map.values()]
|
||||
await asyncio.gather(*coros)
|
||||
pcs_map.clear()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="WebRTC demo")
|
||||
parser.add_argument(
|
||||
"--host", default="localhost", help="Host for HTTP server (default: localhost)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--port", type=int, default=7860, help="Port for HTTP server (default: 7860)"
|
||||
)
|
||||
parser.add_argument("--verbose", "-v", action="count")
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.verbose:
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
else:
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
uvicorn.run(app, host=args.host, port=args.port)
|
||||
@@ -169,6 +169,8 @@ class DailyCallbacks(BaseModel):
|
||||
on_error: Called when an error occurs.
|
||||
on_app_message: Called when receiving an app message.
|
||||
on_call_state_updated: Called when call state changes.
|
||||
on_client_connected: Called when a client (participant) connects.
|
||||
on_client_disconnected: Called when a client (participant) disconnects.
|
||||
on_dialin_connected: Called when dial-in is connected.
|
||||
on_dialin_ready: Called when dial-in is ready.
|
||||
on_dialin_stopped: Called when dial-in is stopped.
|
||||
@@ -193,6 +195,8 @@ class DailyCallbacks(BaseModel):
|
||||
on_error: Callable[[str], Awaitable[None]]
|
||||
on_app_message: Callable[[Any, str], Awaitable[None]]
|
||||
on_call_state_updated: Callable[[str], Awaitable[None]]
|
||||
on_client_connected: Callable[[Mapping[str, Any]], Awaitable[None]]
|
||||
on_client_disconnected: Callable[[Mapping[str, Any]], Awaitable[None]]
|
||||
on_dialin_connected: Callable[[Any], Awaitable[None]]
|
||||
on_dialin_ready: Callable[[str], Awaitable[None]]
|
||||
on_dialin_stopped: Callable[[Any], Awaitable[None]]
|
||||
@@ -1070,6 +1074,8 @@ class DailyTransport(BaseTransport):
|
||||
on_error=self._on_error,
|
||||
on_app_message=self._on_app_message,
|
||||
on_call_state_updated=self._on_call_state_updated,
|
||||
on_client_connected=self._on_client_connected,
|
||||
on_client_disconnected=self._on_client_disconnected,
|
||||
on_dialin_connected=self._on_dialin_connected,
|
||||
on_dialin_ready=self._on_dialin_ready,
|
||||
on_dialin_stopped=self._on_dialin_stopped,
|
||||
@@ -1103,6 +1109,8 @@ class DailyTransport(BaseTransport):
|
||||
self._register_event_handler("on_error")
|
||||
self._register_event_handler("on_app_message")
|
||||
self._register_event_handler("on_call_state_updated")
|
||||
self._register_event_handler("on_client_connected")
|
||||
self._register_event_handler("on_client_disconnected")
|
||||
self._register_event_handler("on_dialin_connected")
|
||||
self._register_event_handler("on_dialin_ready")
|
||||
self._register_event_handler("on_dialin_stopped")
|
||||
@@ -1246,6 +1254,12 @@ class DailyTransport(BaseTransport):
|
||||
async def _on_call_state_updated(self, state: str):
|
||||
await self._call_event_handler("on_call_state_updated", state)
|
||||
|
||||
async def _on_client_connected(self, participant: Any):
|
||||
await self._call_event_handler("on_client_connected", participant)
|
||||
|
||||
async def _on_client_disconnected(self, participant: Any):
|
||||
await self._call_event_handler("on_client_disconnected", participant)
|
||||
|
||||
async def _handle_dialin_ready(self, sip_endpoint: str):
|
||||
if not self._params.dialin_settings:
|
||||
return
|
||||
@@ -1321,11 +1335,15 @@ class DailyTransport(BaseTransport):
|
||||
await self._call_event_handler("on_first_participant_joined", participant)
|
||||
|
||||
await self._call_event_handler("on_participant_joined", participant)
|
||||
# Also call on_client_connected for compatibility with other transports
|
||||
await self._call_event_handler("on_client_connected", participant)
|
||||
|
||||
async def _on_participant_left(self, participant, reason):
|
||||
id = participant["id"]
|
||||
logger.info(f"Participant left {id}")
|
||||
await self._call_event_handler("on_participant_left", participant, reason)
|
||||
# Also call on_client_disconnected for compatibility with other transports
|
||||
await self._call_event_handler("on_client_disconnected", participant)
|
||||
|
||||
async def _on_participant_updated(self, participant):
|
||||
await self._call_event_handler("on_participant_updated", participant)
|
||||
|
||||
Reference in New Issue
Block a user