Files
pipecat/examples/deployment/flyio-example/bot_runner.py
2025-01-12 11:34:00 -08:00

210 lines
6.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import os
import subprocess
from contextlib import asynccontextmanager
import aiohttp
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pipecat.transports.services.helpers.daily_rest import (
DailyRESTHelper,
DailyRoomObject,
DailyRoomParams,
DailyRoomProperties,
)
load_dotenv(override=True)
# ------------ Configuration ------------ #
MAX_SESSION_TIME = 5 * 60 # 5 minutes
REQUIRED_ENV_VARS = [
"DAILY_API_KEY",
"OPENAI_API_KEY",
"ELEVENLABS_API_KEY",
"ELEVENLABS_VOICE_ID",
"FLY_API_KEY",
"FLY_APP_NAME",
]
FLY_API_HOST = os.getenv("FLY_API_HOST", "https://api.machines.dev/v1")
FLY_APP_NAME = os.getenv("FLY_APP_NAME", "pipecat-fly-example")
FLY_API_KEY = os.getenv("FLY_API_KEY", "")
FLY_HEADERS = {"Authorization": f"Bearer {FLY_API_KEY}", "Content-Type": "application/json"}
daily_helpers = {}
# ----------------- API ----------------- #
@asynccontextmanager
async def lifespan(app: FastAPI):
aiohttp_session = aiohttp.ClientSession()
daily_helpers["rest"] = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
yield
await aiohttp_session.close()
app = FastAPI(lifespan=lifespan)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ----------------- Main ----------------- #
async def spawn_fly_machine(room_url: str, token: str):
async with aiohttp.ClientSession() as session:
# Use the same image as the bot runner
async with session.get(
f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS
) as r:
if r.status != 200:
text = await r.text()
raise Exception(f"Unable to get machine info from Fly: {text}")
data = await r.json()
image = data[0]["config"]["image"]
# Machine configuration
cmd = f"python3 bot.py -u {room_url} -t {token}"
cmd = cmd.split()
worker_props = {
"config": {
"image": image,
"auto_destroy": True,
"init": {"cmd": cmd},
"restart": {"policy": "no"},
"guest": {"cpu_kind": "shared", "cpus": 1, "memory_mb": 1024},
},
}
# Spawn a new machine instance
async with session.post(
f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS, json=worker_props
) as r:
if r.status != 200:
text = await r.text()
raise Exception(f"Problem starting a bot worker: {text}")
data = await r.json()
# Wait for the machine to enter the started state
vm_id = data["id"]
async with session.get(
f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines/{vm_id}/wait?state=started",
headers=FLY_HEADERS,
) as r:
if r.status != 200:
text = await r.text()
raise Exception(f"Bot was unable to enter started state: {text}")
print(f"Machine joined room: {room_url}")
@app.post("/")
async def start_bot(request: Request) -> JSONResponse:
try:
data = await request.json()
# Is this a webhook creation request?
if "test" in data:
return JSONResponse({"test": True})
except Exception as e:
pass
# Use specified room URL, or create a new one if not specified
room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", "")
if not room_url:
params = DailyRoomParams(properties=DailyRoomProperties())
try:
room: DailyRoomObject = await daily_helpers["rest"].create_room(params=params)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Unable to provision room {e}")
else:
# Check passed room URL exists, we should assume that it already has a sip set up
try:
room: DailyRoomObject = await daily_helpers["rest"].get_room_from_url(room_url)
except Exception:
raise HTTPException(status_code=500, detail=f"Room not found: {room_url}")
# Give the agent a token to join the session
token = await daily_helpers["rest"].get_token(room.url, MAX_SESSION_TIME)
if not room or not token:
raise HTTPException(status_code=500, detail=f"Failed to get token for room: {room_url}")
# Launch a new fly.io machine, or run as a shell process (not recommended)
run_as_process = os.getenv("RUN_AS_PROCESS", False)
if run_as_process:
try:
subprocess.Popen(
[f"python3 -m bot -u {room.url} -t {token}"],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__)),
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}")
else:
try:
await spawn_fly_machine(room.url, token)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to spawn VM: {e}")
# Grab a token for the user to join with
user_token = await daily_helpers["rest"].get_token(room.url, MAX_SESSION_TIME)
return JSONResponse(
{
"room_url": room.url,
"token": user_token,
}
)
if __name__ == "__main__":
# Check environment variables
for env_var in REQUIRED_ENV_VARS:
if env_var not in os.environ:
raise Exception(f"Missing environment variable: {env_var}.")
parser = argparse.ArgumentParser(description="Pipecat Bot Runner")
parser.add_argument(
"--host", type=str, default=os.getenv("HOST", "0.0.0.0"), help="Host address"
)
parser.add_argument("--port", type=int, default=os.getenv("PORT", 7860), help="Port number")
parser.add_argument(
"--reload", action="store_true", default=False, help="Reload code on change"
)
config = parser.parse_args()
try:
import uvicorn
uvicorn.run("bot_runner:app", host=config.host, port=config.port, reload=config.reload)
except KeyboardInterrupt:
print("Pipecat runner shutting down...")