From df211fb5a3637bf172cf2d3641f048fae55c8f1a Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Fri, 25 Jul 2025 20:23:55 -0400 Subject: [PATCH] Renaming files --- examples/quickstart/pcc-simple-bot.py | 9 +- src/pipecat/runner/{server.py => cloud.py} | 196 ++++++++++-------- .../runner/{daily_runner.py => daily.py} | 0 .../runner/{livekit_runner.py => livekit.py} | 0 src/pipecat/runner/{run.py => local.py} | 4 +- src/pipecat/runner/runner.py | 143 ------------- .../{transport_utilities.py => utils.py} | 0 7 files changed, 112 insertions(+), 240 deletions(-) rename src/pipecat/runner/{server.py => cloud.py} (64%) rename src/pipecat/runner/{daily_runner.py => daily.py} (100%) rename src/pipecat/runner/{livekit_runner.py => livekit.py} (100%) rename src/pipecat/runner/{run.py => local.py} (99%) delete mode 100644 src/pipecat/runner/runner.py rename src/pipecat/runner/{transport_utilities.py => utils.py} (100%) diff --git a/examples/quickstart/pcc-simple-bot.py b/examples/quickstart/pcc-simple-bot.py index 3fc9f7a25..c714ae416 100644 --- a/examples/quickstart/pcc-simple-bot.py +++ b/examples/quickstart/pcc-simple-bot.py @@ -21,7 +21,7 @@ from pipecat.services.openai.llm import OpenAILLMService load_dotenv(override=True) -async def run_bot_logic(transport, handle_sigint: bool = True): +async def run_bot_logic(transport): """Main bot logic that works with any transport.""" logger.info(f"Starting bot") @@ -69,16 +69,13 @@ async def run_bot_logic(transport, handle_sigint: bool = True): logger.info("Client disconnected") await task.cancel() - runner = PipelineRunner(handle_sigint=handle_sigint) + runner = PipelineRunner(handle_sigint=False) await runner.run(task) async def bot(session_args): """Main bot entry point compatible with Pipecat Cloud.""" - # Get handle_sigint from session_args, default to True for Daily - handle_sigint = getattr(session_args, "handle_sigint", True) - if hasattr(session_args, "room_url"): # Daily session arguments (cloud or local) from pipecat.transports.services.daily import DailyParams, DailyTransport @@ -108,7 +105,7 @@ async def bot(session_args): webrtc_connection=session_args.webrtc_connection, ) - await run_bot_logic(transport, handle_sigint) + await run_bot_logic(transport) if __name__ == "__main__": diff --git a/src/pipecat/runner/server.py b/src/pipecat/runner/cloud.py similarity index 64% rename from src/pipecat/runner/server.py rename to src/pipecat/runner/cloud.py index de2736100..55dbe1916 100644 --- a/src/pipecat/runner/server.py +++ b/src/pipecat/runner/cloud.py @@ -4,43 +4,33 @@ # SPDX-License-Identifier: BSD 2-Clause License # -"""Cloud-compatible development server that uses subprocess to run bots.""" +"""Cloud-compatible development server - simplified without subprocesses.""" import argparse import asyncio import os -import subprocess import sys +from contextlib import asynccontextmanager from typing import Dict import uvicorn from dotenv import load_dotenv from fastapi import BackgroundTasks, FastAPI -from fastapi.concurrency import asynccontextmanager from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import RedirectResponse from loguru import logger # Import the common transport utility functions -from .transport_utilities import setup_webrtc_routes, setup_websocket_routes +from .transport_utilities import setup_websocket_routes load_dotenv(override=True) os.environ["LOCAL_RUN"] = "1" -# Track bot processes -bot_procs = {} - - -def cleanup(): - """Cleanup function to terminate all bot processes.""" - for entry in bot_procs.values(): - proc = entry[0] - proc.terminate() - proc.wait() - def get_bot_module(): """Get the bot module from the calling script.""" + import importlib.util + # Get the main module (the file that was executed) main_module = sys.modules["__main__"] @@ -54,49 +44,35 @@ def get_bot_module(): return bot except ImportError: - raise ImportError( - "Could not find 'bot' function. Make sure your script has a 'bot' function or there's a 'bot.py' file in the current directory." - ) + pass + + # Look for any .py file in current directory that has a bot function + cwd = os.getcwd() + for filename in os.listdir(cwd): + if filename.endswith(".py") and filename != "server.py": + try: + module_name = filename[:-3] # Remove .py extension + spec = importlib.util.spec_from_file_location( + module_name, os.path.join(cwd, filename) + ) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + + if hasattr(module, "bot"): + return module + except Exception: + continue + + raise ImportError( + "Could not find 'bot' function. Make sure your bot file has a 'bot' function." + ) -async def run_subprocess_bot(transport_type: str, **kwargs): - """Run a bot via subprocess - used by transport handlers.""" - if transport_type == "daily": - import aiohttp - - from .daily_runner import configure - - async with aiohttp.ClientSession() as session: - room_url, token = await configure(session) - proc = subprocess.Popen( - [ - f"LOCAL_RUN=1 python3 -m pipecat.runner.runner -u {room_url} -t {token} --transport daily" - ], - shell=True, - bufsize=1, - cwd=os.getcwd(), - ) # Run from current working directory - bot_procs[proc.pid] = (proc, room_url) - return room_url, token - - elif transport_type == "livekit": - from .livekit_runner import configure - - url, token, room_name = await configure() - proc = subprocess.Popen( - [ - f"LOCAL_RUN=1 python3 -m pipecat.runner.runner --transport livekit --url {url} --token {token} --room {room_name}" - ], - shell=True, - bufsize=1, - cwd=os.getcwd(), - ) # Run from current working directory - bot_procs[proc.pid] = (proc, url) - return url, token, room_name - - elif transport_type == "webrtc": +async def run_bot_directly(transport_type: str, **kwargs): + """Run a bot directly in the same process - no subprocess needed.""" + if transport_type == "webrtc": if "webrtc_connection" in kwargs: - # Direct connection mode - run bot directly + # Direct WebRTC connection bot_module = get_bot_module() class WebRTCSessionArgs: @@ -108,19 +84,10 @@ async def run_subprocess_bot(transport_type: str, **kwargs): session_args = WebRTCSessionArgs(kwargs["webrtc_connection"]) await bot_module.bot(session_args) - else: - # Subprocess mode (rarely used for WebRTC) - proc = subprocess.Popen( - [f"LOCAL_RUN=1 python3 -m pipecat.runner.runner --transport webrtc"], - shell=True, - bufsize=1, - cwd=os.getcwd(), - ) # Run from current working directory - bot_procs[proc.pid] = (proc, "webrtc") elif transport_type in ["twilio", "telnyx", "plivo"]: if "websocket" in kwargs: - # Direct WebSocket mode - run bot directly + # Direct WebSocket connection bot_module = get_bot_module() class WebSocketSessionArgs: @@ -135,15 +102,6 @@ async def run_subprocess_bot(transport_type: str, **kwargs): transport_type, kwargs["websocket"], kwargs["call_info"] ) await bot_module.bot(session_args) - else: - # Subprocess mode (rarely used for telephony) - proc = subprocess.Popen( - [f"LOCAL_RUN=1 python3 -m pipecat.runner.runner --transport {transport_type}"], - shell=True, - bufsize=1, - cwd=os.getcwd(), - ) # Run from current working directory - bot_procs[proc.pid] = (proc, transport_type) def create_server_app(transport_type: str, host: str = "0.0.0.0", proxy: str = None): @@ -160,7 +118,7 @@ def create_server_app(transport_type: str, host: str = "0.0.0.0", proxy: str = N # Add transport-specific routes if transport_type == "webrtc": - # Direct WebRTC setup (like the working run.py version) + # Direct WebRTC setup try: from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI @@ -203,9 +161,9 @@ def create_server_app(transport_type: str, host: str = "0.0.0.0", proxy: str = N logger.info(f"Discarding peer connection for pc_id: {webrtc_connection.pc_id}") pcs_map.pop(webrtc_connection.pc_id, None) - # Run bot directly instead of through run_subprocess_bot + # Run bot directly background_tasks.add_task( - run_subprocess_bot, "webrtc", webrtc_connection=pipecat_connection + run_bot_directly, "webrtc", webrtc_connection=pipecat_connection ) answer = pipecat_connection.get_answer() @@ -231,7 +189,7 @@ def create_server_app(transport_type: str, host: str = "0.0.0.0", proxy: str = N app.router.lifespan_context = lifespan elif transport_type in ["twilio", "telnyx", "plivo"]: - setup_websocket_routes(app, run_subprocess_bot, transport_type, proxy) + setup_websocket_routes(app, run_bot_directly, transport_type, proxy) # Add general routes @app.get("/") @@ -240,18 +198,60 @@ def create_server_app(transport_type: str, host: str = "0.0.0.0", proxy: str = N print(f"Starting bot with {transport_type} transport") if transport_type == "daily": - result = await run_subprocess_bot("daily") - room_url, token = result - return RedirectResponse(room_url) + # Create Daily room and start bot + import aiohttp + + from .daily_runner import configure + + async with aiohttp.ClientSession() as session: + room_url, token = await configure(session) + + # Start the bot in the background to join the room + bot_module = get_bot_module() + + class DailySessionArgs: + def __init__(self, room_url, token): + self.room_url = room_url + self.token = token + self.body = {} + self.handle_sigint = False + + session_args = DailySessionArgs(room_url, token) + + # Run bot in background task + asyncio.create_task(bot_module.bot(session_args)) + + # Redirect user to the room + return RedirectResponse(room_url) + elif transport_type == "livekit": - result = await run_subprocess_bot("livekit") - url, token, room_name = result + # Create LiveKit room and start bot + from .livekit_runner import configure + + url, token, room_name = await configure() + + # Start the bot in the background to join the room + bot_module = get_bot_module() + + class LiveKitSessionArgs: + def __init__(self, url, token, room_name): + self.url = url + self.token = token + self.room_name = room_name + self.body = {} + self.handle_sigint = False + + session_args = LiveKitSessionArgs(url, token, room_name) + + # Run bot in background task + asyncio.create_task(bot_module.bot(session_args)) + + # Redirect user to the room return RedirectResponse(url) + elif transport_type == "webrtc": - await run_subprocess_bot("webrtc") return RedirectResponse("/client/") else: - await run_subprocess_bot(transport_type) return {"status": f"Bot started with {transport_type}"} @app.post("/connect") @@ -260,11 +260,29 @@ def create_server_app(transport_type: str, host: str = "0.0.0.0", proxy: str = N print(f"Starting bot with {transport_type} transport") if transport_type == "daily": - result = await run_subprocess_bot("daily") - room_url, token = result - return {"transport": "daily", "room_url": room_url, "token": token} + import aiohttp + + from .daily_runner import configure + + async with aiohttp.ClientSession() as session: + room_url, token = await configure(session) + + # Start the bot in the background + bot_module = get_bot_module() + + class DailySessionArgs: + def __init__(self, room_url, token): + self.room_url = room_url + self.token = token + self.body = {} + self.handle_sigint = False + + session_args = DailySessionArgs(room_url, token) + asyncio.create_task(bot_module.bot(session_args)) + + return {"transport": "daily", "room_url": room_url, "token": token} + elif transport_type == "webrtc": - await run_subprocess_bot("webrtc") return {"transport": "webrtc", "client_url": "/client/"} else: # RTVI only supports Daily and WebRTC diff --git a/src/pipecat/runner/daily_runner.py b/src/pipecat/runner/daily.py similarity index 100% rename from src/pipecat/runner/daily_runner.py rename to src/pipecat/runner/daily.py diff --git a/src/pipecat/runner/livekit_runner.py b/src/pipecat/runner/livekit.py similarity index 100% rename from src/pipecat/runner/livekit_runner.py rename to src/pipecat/runner/livekit.py diff --git a/src/pipecat/runner/run.py b/src/pipecat/runner/local.py similarity index 99% rename from src/pipecat/runner/run.py rename to src/pipecat/runner/local.py index 80e706e2e..94df439a4 100644 --- a/src/pipecat/runner/run.py +++ b/src/pipecat/runner/local.py @@ -250,7 +250,7 @@ def run_daily( ): """Run using Daily.co transport.""" try: - from pipecat.runner.daily_runner import configure + from pipecat.runner.daily import configure from pipecat.transports.services.daily import DailyParams, DailyTransport except ImportError as e: logger.error(f"Daily transport dependencies not installed.") @@ -274,7 +274,7 @@ def run_livekit( ): """Run using LiveKit transport.""" try: - from pipecat.runner.livekit_runner import configure + from pipecat.runner.livekit import configure from pipecat.transports.services.livekit import LiveKitParams, LiveKitTransport except ImportError as e: logger.error(f"LiveKit transport dependencies not installed.") diff --git a/src/pipecat/runner/runner.py b/src/pipecat/runner/runner.py deleted file mode 100644 index fdfc74a57..000000000 --- a/src/pipecat/runner/runner.py +++ /dev/null @@ -1,143 +0,0 @@ -# -# Copyright (c) 2024–2025, Daily -# -# SPDX-License-Identifier: BSD 2-Clause License -# - -import argparse -import asyncio -import importlib.util -import os -import sys - - -def find_and_import_bot(): - """Find and import the bot function from the current working directory.""" - cwd = os.getcwd() - - # Add current working directory to Python path if not already there - if cwd not in sys.path: - sys.path.insert(0, cwd) - - # Try to find bot.py in current directory - bot_file = os.path.join(cwd, "bot.py") - if os.path.exists(bot_file): - spec = importlib.util.spec_from_file_location("bot", bot_file) - bot_module = importlib.util.module_from_spec(spec) - spec.loader.exec_module(bot_module) - return bot_module - - # Try to import bot module directly - try: - import bot - - return bot - except ImportError: - pass - - # Look for any .py file in current directory that has a bot function - for filename in os.listdir(cwd): - if filename.endswith(".py") and filename != "runner.py": - try: - module_name = filename[:-3] # Remove .py extension - spec = importlib.util.spec_from_file_location( - module_name, os.path.join(cwd, filename) - ) - module = importlib.util.module_from_spec(spec) - spec.loader.exec_module(module) - - if hasattr(module, "bot"): - return module - except Exception: - continue - - raise ImportError( - "Could not find 'bot' function. Make sure your bot file has a 'bot' function." - ) - - -def main(): - """Parse args and launch the bot with specified transport.""" - parser = argparse.ArgumentParser(description="Pipecat Bot Runner") - parser.add_argument("-u", "--url", type=str, required=False, help="Daily room URL") - parser.add_argument("-t", "--token", type=str, required=False, help="Daily room token") - parser.add_argument( - "--transport", - type=str, - choices=["daily", "livekit", "webrtc"], - default="daily", - help="Transport type", - ) - parser.add_argument("--room", type=str, required=False, help="LiveKit room name") - - args, unknown = parser.parse_known_args() - - # Find and import the bot function - try: - bot_module = find_and_import_bot() - bot_function = bot_module.bot - except ImportError as e: - print(f"Error: {e}") - sys.exit(1) - - if args.transport == "daily": - url = args.url or os.getenv("DAILY_SAMPLE_ROOM_URL") - token = args.token or os.getenv("DAILY_SAMPLE_ROOM_TOKEN") - - if not url or not token: - raise Exception("Daily room URL and token are required.") - - # Create Daily session arguments - try: - from pipecatcloud.agent import DailySessionArguments - - session_args = DailySessionArguments( - room_url=url, - token=token, - body={}, - session_id=None, - ) - except ImportError: - # Fallback for local development - class LocalDailySessionArgs: - def __init__(self, room_url, token, body=None): - self.room_url = room_url - self.token = token - self.body = body or {} - - session_args = LocalDailySessionArgs(url, token) - - elif args.transport == "livekit": - url = args.url or os.getenv("LIVEKIT_URL") - token = args.token or os.getenv("LIVEKIT_TOKEN") - room_name = args.room or os.getenv("LIVEKIT_ROOM_NAME") - - if not url or not token or not room_name: - raise Exception("LiveKit URL, token, and room name are required.") - - class LiveKitSessionArgs: - def __init__(self, url, token, room_name): - self.url = url - self.token = token - self.room_name = room_name - self.body = {} - - session_args = LiveKitSessionArgs(url, token, room_name) - - elif args.transport == "webrtc": - # For WebRTC subprocess mode (not typically used) - class WebRTCSessionArgs: - def __init__(self): - self.transport_type = "webrtc" - self.body = {} - - session_args = WebRTCSessionArgs() - - else: - raise Exception(f"Unsupported transport: {args.transport}") - - asyncio.run(bot_function(session_args)) - - -if __name__ == "__main__": - main() diff --git a/src/pipecat/runner/transport_utilities.py b/src/pipecat/runner/utils.py similarity index 100% rename from src/pipecat/runner/transport_utilities.py rename to src/pipecat/runner/utils.py