Renaming files

This commit is contained in:
Mark Backman
2025-07-25 20:23:55 -04:00
parent ae6698429f
commit df211fb5a3
7 changed files with 112 additions and 240 deletions

View File

@@ -21,7 +21,7 @@ from pipecat.services.openai.llm import OpenAILLMService
load_dotenv(override=True)
async def run_bot_logic(transport, handle_sigint: bool = True):
async def run_bot_logic(transport):
"""Main bot logic that works with any transport."""
logger.info(f"Starting bot")
@@ -69,16 +69,13 @@ async def run_bot_logic(transport, handle_sigint: bool = True):
logger.info("Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=handle_sigint)
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)
async def bot(session_args):
"""Main bot entry point compatible with Pipecat Cloud."""
# Get handle_sigint from session_args, default to True for Daily
handle_sigint = getattr(session_args, "handle_sigint", True)
if hasattr(session_args, "room_url"):
# Daily session arguments (cloud or local)
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -108,7 +105,7 @@ async def bot(session_args):
webrtc_connection=session_args.webrtc_connection,
)
await run_bot_logic(transport, handle_sigint)
await run_bot_logic(transport)
if __name__ == "__main__":

View File

@@ -4,43 +4,33 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Cloud-compatible development server that uses subprocess to run bots."""
"""Cloud-compatible development server - simplified without subprocesses."""
import argparse
import asyncio
import os
import subprocess
import sys
from contextlib import asynccontextmanager
from typing import Dict
import uvicorn
from dotenv import load_dotenv
from fastapi import BackgroundTasks, FastAPI
from fastapi.concurrency import asynccontextmanager
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import RedirectResponse
from loguru import logger
# Import the common transport utility functions
from .transport_utilities import setup_webrtc_routes, setup_websocket_routes
from .transport_utilities import setup_websocket_routes
load_dotenv(override=True)
os.environ["LOCAL_RUN"] = "1"
# Track bot processes
bot_procs = {}
def cleanup():
"""Cleanup function to terminate all bot processes."""
for entry in bot_procs.values():
proc = entry[0]
proc.terminate()
proc.wait()
def get_bot_module():
"""Get the bot module from the calling script."""
import importlib.util
# Get the main module (the file that was executed)
main_module = sys.modules["__main__"]
@@ -54,49 +44,35 @@ def get_bot_module():
return bot
except ImportError:
raise ImportError(
"Could not find 'bot' function. Make sure your script has a 'bot' function or there's a 'bot.py' file in the current directory."
)
pass
# Look for any .py file in current directory that has a bot function
cwd = os.getcwd()
for filename in os.listdir(cwd):
if filename.endswith(".py") and filename != "server.py":
try:
module_name = filename[:-3] # Remove .py extension
spec = importlib.util.spec_from_file_location(
module_name, os.path.join(cwd, filename)
)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
if hasattr(module, "bot"):
return module
except Exception:
continue
raise ImportError(
"Could not find 'bot' function. Make sure your bot file has a 'bot' function."
)
async def run_subprocess_bot(transport_type: str, **kwargs):
"""Run a bot via subprocess - used by transport handlers."""
if transport_type == "daily":
import aiohttp
from .daily_runner import configure
async with aiohttp.ClientSession() as session:
room_url, token = await configure(session)
proc = subprocess.Popen(
[
f"LOCAL_RUN=1 python3 -m pipecat.runner.runner -u {room_url} -t {token} --transport daily"
],
shell=True,
bufsize=1,
cwd=os.getcwd(),
) # Run from current working directory
bot_procs[proc.pid] = (proc, room_url)
return room_url, token
elif transport_type == "livekit":
from .livekit_runner import configure
url, token, room_name = await configure()
proc = subprocess.Popen(
[
f"LOCAL_RUN=1 python3 -m pipecat.runner.runner --transport livekit --url {url} --token {token} --room {room_name}"
],
shell=True,
bufsize=1,
cwd=os.getcwd(),
) # Run from current working directory
bot_procs[proc.pid] = (proc, url)
return url, token, room_name
elif transport_type == "webrtc":
async def run_bot_directly(transport_type: str, **kwargs):
"""Run a bot directly in the same process - no subprocess needed."""
if transport_type == "webrtc":
if "webrtc_connection" in kwargs:
# Direct connection mode - run bot directly
# Direct WebRTC connection
bot_module = get_bot_module()
class WebRTCSessionArgs:
@@ -108,19 +84,10 @@ async def run_subprocess_bot(transport_type: str, **kwargs):
session_args = WebRTCSessionArgs(kwargs["webrtc_connection"])
await bot_module.bot(session_args)
else:
# Subprocess mode (rarely used for WebRTC)
proc = subprocess.Popen(
[f"LOCAL_RUN=1 python3 -m pipecat.runner.runner --transport webrtc"],
shell=True,
bufsize=1,
cwd=os.getcwd(),
) # Run from current working directory
bot_procs[proc.pid] = (proc, "webrtc")
elif transport_type in ["twilio", "telnyx", "plivo"]:
if "websocket" in kwargs:
# Direct WebSocket mode - run bot directly
# Direct WebSocket connection
bot_module = get_bot_module()
class WebSocketSessionArgs:
@@ -135,15 +102,6 @@ async def run_subprocess_bot(transport_type: str, **kwargs):
transport_type, kwargs["websocket"], kwargs["call_info"]
)
await bot_module.bot(session_args)
else:
# Subprocess mode (rarely used for telephony)
proc = subprocess.Popen(
[f"LOCAL_RUN=1 python3 -m pipecat.runner.runner --transport {transport_type}"],
shell=True,
bufsize=1,
cwd=os.getcwd(),
) # Run from current working directory
bot_procs[proc.pid] = (proc, transport_type)
def create_server_app(transport_type: str, host: str = "0.0.0.0", proxy: str = None):
@@ -160,7 +118,7 @@ def create_server_app(transport_type: str, host: str = "0.0.0.0", proxy: str = N
# Add transport-specific routes
if transport_type == "webrtc":
# Direct WebRTC setup (like the working run.py version)
# Direct WebRTC setup
try:
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
@@ -203,9 +161,9 @@ def create_server_app(transport_type: str, host: str = "0.0.0.0", proxy: str = N
logger.info(f"Discarding peer connection for pc_id: {webrtc_connection.pc_id}")
pcs_map.pop(webrtc_connection.pc_id, None)
# Run bot directly instead of through run_subprocess_bot
# Run bot directly
background_tasks.add_task(
run_subprocess_bot, "webrtc", webrtc_connection=pipecat_connection
run_bot_directly, "webrtc", webrtc_connection=pipecat_connection
)
answer = pipecat_connection.get_answer()
@@ -231,7 +189,7 @@ def create_server_app(transport_type: str, host: str = "0.0.0.0", proxy: str = N
app.router.lifespan_context = lifespan
elif transport_type in ["twilio", "telnyx", "plivo"]:
setup_websocket_routes(app, run_subprocess_bot, transport_type, proxy)
setup_websocket_routes(app, run_bot_directly, transport_type, proxy)
# Add general routes
@app.get("/")
@@ -240,18 +198,60 @@ def create_server_app(transport_type: str, host: str = "0.0.0.0", proxy: str = N
print(f"Starting bot with {transport_type} transport")
if transport_type == "daily":
result = await run_subprocess_bot("daily")
room_url, token = result
return RedirectResponse(room_url)
# Create Daily room and start bot
import aiohttp
from .daily_runner import configure
async with aiohttp.ClientSession() as session:
room_url, token = await configure(session)
# Start the bot in the background to join the room
bot_module = get_bot_module()
class DailySessionArgs:
def __init__(self, room_url, token):
self.room_url = room_url
self.token = token
self.body = {}
self.handle_sigint = False
session_args = DailySessionArgs(room_url, token)
# Run bot in background task
asyncio.create_task(bot_module.bot(session_args))
# Redirect user to the room
return RedirectResponse(room_url)
elif transport_type == "livekit":
result = await run_subprocess_bot("livekit")
url, token, room_name = result
# Create LiveKit room and start bot
from .livekit_runner import configure
url, token, room_name = await configure()
# Start the bot in the background to join the room
bot_module = get_bot_module()
class LiveKitSessionArgs:
def __init__(self, url, token, room_name):
self.url = url
self.token = token
self.room_name = room_name
self.body = {}
self.handle_sigint = False
session_args = LiveKitSessionArgs(url, token, room_name)
# Run bot in background task
asyncio.create_task(bot_module.bot(session_args))
# Redirect user to the room
return RedirectResponse(url)
elif transport_type == "webrtc":
await run_subprocess_bot("webrtc")
return RedirectResponse("/client/")
else:
await run_subprocess_bot(transport_type)
return {"status": f"Bot started with {transport_type}"}
@app.post("/connect")
@@ -260,11 +260,29 @@ def create_server_app(transport_type: str, host: str = "0.0.0.0", proxy: str = N
print(f"Starting bot with {transport_type} transport")
if transport_type == "daily":
result = await run_subprocess_bot("daily")
room_url, token = result
return {"transport": "daily", "room_url": room_url, "token": token}
import aiohttp
from .daily_runner import configure
async with aiohttp.ClientSession() as session:
room_url, token = await configure(session)
# Start the bot in the background
bot_module = get_bot_module()
class DailySessionArgs:
def __init__(self, room_url, token):
self.room_url = room_url
self.token = token
self.body = {}
self.handle_sigint = False
session_args = DailySessionArgs(room_url, token)
asyncio.create_task(bot_module.bot(session_args))
return {"transport": "daily", "room_url": room_url, "token": token}
elif transport_type == "webrtc":
await run_subprocess_bot("webrtc")
return {"transport": "webrtc", "client_url": "/client/"}
else:
# RTVI only supports Daily and WebRTC

View File

@@ -250,7 +250,7 @@ def run_daily(
):
"""Run using Daily.co transport."""
try:
from pipecat.runner.daily_runner import configure
from pipecat.runner.daily import configure
from pipecat.transports.services.daily import DailyParams, DailyTransport
except ImportError as e:
logger.error(f"Daily transport dependencies not installed.")
@@ -274,7 +274,7 @@ def run_livekit(
):
"""Run using LiveKit transport."""
try:
from pipecat.runner.livekit_runner import configure
from pipecat.runner.livekit import configure
from pipecat.transports.services.livekit import LiveKitParams, LiveKitTransport
except ImportError as e:
logger.error(f"LiveKit transport dependencies not installed.")

View File

@@ -1,143 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import asyncio
import importlib.util
import os
import sys
def find_and_import_bot():
"""Find and import the bot function from the current working directory."""
cwd = os.getcwd()
# Add current working directory to Python path if not already there
if cwd not in sys.path:
sys.path.insert(0, cwd)
# Try to find bot.py in current directory
bot_file = os.path.join(cwd, "bot.py")
if os.path.exists(bot_file):
spec = importlib.util.spec_from_file_location("bot", bot_file)
bot_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(bot_module)
return bot_module
# Try to import bot module directly
try:
import bot
return bot
except ImportError:
pass
# Look for any .py file in current directory that has a bot function
for filename in os.listdir(cwd):
if filename.endswith(".py") and filename != "runner.py":
try:
module_name = filename[:-3] # Remove .py extension
spec = importlib.util.spec_from_file_location(
module_name, os.path.join(cwd, filename)
)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
if hasattr(module, "bot"):
return module
except Exception:
continue
raise ImportError(
"Could not find 'bot' function. Make sure your bot file has a 'bot' function."
)
def main():
"""Parse args and launch the bot with specified transport."""
parser = argparse.ArgumentParser(description="Pipecat Bot Runner")
parser.add_argument("-u", "--url", type=str, required=False, help="Daily room URL")
parser.add_argument("-t", "--token", type=str, required=False, help="Daily room token")
parser.add_argument(
"--transport",
type=str,
choices=["daily", "livekit", "webrtc"],
default="daily",
help="Transport type",
)
parser.add_argument("--room", type=str, required=False, help="LiveKit room name")
args, unknown = parser.parse_known_args()
# Find and import the bot function
try:
bot_module = find_and_import_bot()
bot_function = bot_module.bot
except ImportError as e:
print(f"Error: {e}")
sys.exit(1)
if args.transport == "daily":
url = args.url or os.getenv("DAILY_SAMPLE_ROOM_URL")
token = args.token or os.getenv("DAILY_SAMPLE_ROOM_TOKEN")
if not url or not token:
raise Exception("Daily room URL and token are required.")
# Create Daily session arguments
try:
from pipecatcloud.agent import DailySessionArguments
session_args = DailySessionArguments(
room_url=url,
token=token,
body={},
session_id=None,
)
except ImportError:
# Fallback for local development
class LocalDailySessionArgs:
def __init__(self, room_url, token, body=None):
self.room_url = room_url
self.token = token
self.body = body or {}
session_args = LocalDailySessionArgs(url, token)
elif args.transport == "livekit":
url = args.url or os.getenv("LIVEKIT_URL")
token = args.token or os.getenv("LIVEKIT_TOKEN")
room_name = args.room or os.getenv("LIVEKIT_ROOM_NAME")
if not url or not token or not room_name:
raise Exception("LiveKit URL, token, and room name are required.")
class LiveKitSessionArgs:
def __init__(self, url, token, room_name):
self.url = url
self.token = token
self.room_name = room_name
self.body = {}
session_args = LiveKitSessionArgs(url, token, room_name)
elif args.transport == "webrtc":
# For WebRTC subprocess mode (not typically used)
class WebRTCSessionArgs:
def __init__(self):
self.transport_type = "webrtc"
self.body = {}
session_args = WebRTCSessionArgs()
else:
raise Exception(f"Unsupported transport: {args.transport}")
asyncio.run(bot_function(session_args))
if __name__ == "__main__":
main()