import os import argparse import subprocess import requests from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomObject, DailyRoomProperties, DailyRoomParams from fastapi import FastAPI, Request, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse from dotenv import load_dotenv load_dotenv(override=True) # ------------ Configuration ------------ # MAX_SESSION_TIME = 5 * 60 # 5 minutes REQUIRED_ENV_VARS = [ 'DAILY_API_KEY', 'OPENAI_API_KEY', 'ELEVENLABS_API_KEY', 'ELEVENLABS_VOICE_ID', 'FLY_API_KEY', 'FLY_APP_NAME',] FLY_API_HOST = os.getenv("FLY_API_HOST", "https://api.machines.dev/v1") FLY_APP_NAME = os.getenv("FLY_APP_NAME", "pipecat-fly-example") FLY_API_KEY = os.getenv("FLY_API_KEY", "") FLY_HEADERS = { 'Authorization': f"Bearer {FLY_API_KEY}", 'Content-Type': 'application/json' } daily_rest_helper = DailyRESTHelper( os.getenv("DAILY_API_KEY", ""), os.getenv("DAILY_API_URL", 'https://api.daily.co/v1')) # ----------------- API ----------------- # app = FastAPI() app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"] ) # ----------------- Main ----------------- # def spawn_fly_machine(room_url: str, token: str): # Use the same image as the bot runner res = requests.get(f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS) if res.status_code != 200: raise Exception(f"Unable to get machine info from Fly: {res.text}") image = res.json()[0]['config']['image'] # Machine configuration cmd = f"python3 bot.py -u {room_url} -t {token}" cmd = cmd.split() worker_props = { "config": { "image": image, "auto_destroy": True, "init": { "cmd": cmd }, "restart": { "policy": "no" }, "guest": { "cpu_kind": "shared", "cpus": 1, "memory_mb": 1024 } }, } # Spawn a new machine instance res = requests.post( f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS, json=worker_props) if res.status_code != 200: raise Exception(f"Problem starting a bot worker: {res.text}") # Wait for the machine to enter the started state vm_id = res.json()['id'] res = requests.get( f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines/{vm_id}/wait?state=started", headers=FLY_HEADERS) if res.status_code != 200: raise Exception(f"Bot was unable to enter started state: {res.text}") print(f"Machine joined room: {room_url}") @app.post("/start_bot") async def start_bot(request: Request) -> JSONResponse: try: data = await request.json() # Is this a webhook creation request? if "test" in data: return JSONResponse({"test": True}) except Exception as e: pass # Use specified room URL, or create a new one if not specified room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", "") if not room_url: params = DailyRoomParams( properties=DailyRoomProperties() ) try: room: DailyRoomObject = daily_rest_helper.create_room(params=params) except Exception as e: raise HTTPException( status_code=500, detail=f"Unable to provision room {e}") else: # Check passed room URL exists, we should assume that it already has a sip set up try: room: DailyRoomObject = daily_rest_helper.get_room_from_url(room_url) except Exception: raise HTTPException( status_code=500, detail=f"Room not found: {room_url}") # Give the agent a token to join the session token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME) if not room or not token: raise HTTPException( status_code=500, detail=f"Failed to get token for room: {room_url}") # Launch a new fly.io machine, or run as a shell process (not recommended) run_as_process = os.getenv("RUN_AS_PROCESS", False) if run_as_process: try: subprocess.Popen( [f"python3 -m bot -u {room.url} -t {token}"], shell=True, bufsize=1, cwd=os.path.dirname(os.path.abspath(__file__))) except Exception as e: raise HTTPException( status_code=500, detail=f"Failed to start subprocess: {e}") else: try: spawn_fly_machine(room.url, token) except Exception as e: raise HTTPException( status_code=500, detail=f"Failed to spawn VM: {e}") # Grab a token for the user to join with user_token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME) return JSONResponse({ "room_url": room.url, "token": user_token, }) if __name__ == "__main__": # Check environment variables for env_var in REQUIRED_ENV_VARS: if env_var not in os.environ: raise Exception(f"Missing environment variable: {env_var}.") parser = argparse.ArgumentParser(description="Pipecat Bot Runner") parser.add_argument("--host", type=str, default=os.getenv("HOST", "0.0.0.0"), help="Host address") parser.add_argument("--port", type=int, default=os.getenv("PORT", 7860), help="Port number") parser.add_argument("--reload", action="store_true", default=False, help="Reload code on change") config = parser.parse_args() try: import uvicorn uvicorn.run( "bot_runner:app", host=config.host, port=config.port, reload=config.reload ) except KeyboardInterrupt: print("Pipecat runner shutting down...")