diff --git a/CHANGELOG.md b/CHANGELOG.md index 945503530..ee756ea60 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added the ability to switch voices to `RimeTTSService`. +- Added unified development runner for building voice AI bots across multiple transports + + - `pipecat.runner.run` – FastAPI-based development server with automatic bot discovery + - `pipecat.runner.types` – Runner session argument types (`DailyRunnerArguments`, `SmallWebRTCRunnerArguments`, `WebSocketRunnerArguments`) + - `pipecat.runner.utils.create_transport()` – Factory function for creating transports from session arguments + - `pipecat.runner.daily` and `pipecat.runner.livekit` – Configuration utilities for Daily and LiveKit setups + - Support for all transport types: Daily, WebRTC, Twilio, Telnyx, Plivo + - Automatic telephony provider detection and serializer configuration + - ESP32 WebRTC compatibility with SDP munging + - Environment detection (`ENV=local`) for conditional features + - Added Async.ai TTS integration (https://async.ai/) - `AsyncAITTSService` – WebSocket-based streaming TTS with interruption support diff --git a/docs/api/build-docs.sh b/docs/api/build-docs.sh index 46d025404..5260a3352 100755 --- a/docs/api/build-docs.sh +++ b/docs/api/build-docs.sh @@ -1,5 +1,12 @@ #!/bin/bash +# Check if sphinx-build is installed +if ! command -v sphinx-build &> /dev/null; then + echo "Error: sphinx-build is not installed or not in PATH" >&2 + echo "Please install Sphinx using: pip install -r requirements.txt" >&2 + exit 1 +fi + # Clean previous build rm -rf _build diff --git a/docs/api/conf.py b/docs/api/conf.py index aca7fd923..32d801b40 100644 --- a/docs/api/conf.py +++ b/docs/api/conf.py @@ -202,6 +202,7 @@ def import_core_modules(): "pipecat.clocks", "pipecat.metrics", "pipecat.observers", + "pipecat.runner", "pipecat.serializers", "pipecat.sync", "pipecat.transcriptions", diff --git a/docs/api/index.rst b/docs/api/index.rst index 344cf3ec6..d60a027a2 100644 --- a/docs/api/index.rst +++ b/docs/api/index.rst @@ -26,6 +26,7 @@ Quick Links Observers Pipeline Processors + Runner Serializers Services Sync diff --git a/docs/api/requirements.txt b/docs/api/requirements.txt index 95e8bc451..4807b07d6 100644 --- a/docs/api/requirements.txt +++ b/docs/api/requirements.txt @@ -44,6 +44,7 @@ pipecat-ai[openai] pipecat-ai[qwen] pipecat-ai[remote-smart-turn] # pipecat-ai[riva] # Mocked +pipecat-ai[runner] pipecat-ai[sambanova] pipecat-ai[silero] pipecat-ai[simli] diff --git a/examples/runner-examples/01-all-transport-bot.py b/examples/runner-examples/01-all-transport-bot.py new file mode 100644 index 000000000..982abc58d --- /dev/null +++ b/examples/runner-examples/01-all-transport-bot.py @@ -0,0 +1,218 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Pipecat Cloud-compatible bot example. + +Transports are: + +- Daily +- SmallWebRTC +- Twilio +- Telnyx +- Plivo +""" + +import os + +from dotenv import load_dotenv +from loguru import logger + +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor +from pipecat.runner.types import ( + DailyRunnerArguments, + RunnerArguments, + SmallWebRTCRunnerArguments, + WebSocketRunnerArguments, +) +from pipecat.services.cartesia.tts import CartesiaTTSService +from pipecat.services.deepgram.stt import DeepgramSTTService +from pipecat.services.openai.llm import OpenAILLMService +from pipecat.transports.base_transport import BaseTransport + +load_dotenv(override=True) + + +async def run_bot(transport: BaseTransport): + """Main bot logic that works with any transport.""" + logger.info(f"Starting bot") + + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + ) + + llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY")) + + messages = [ + { + "role": "system", + "content": "You are a friendly AI assistant. Respond naturally and keep your answers conversational.", + }, + ] + + context = OpenAILLMContext(messages) + context_aggregator = llm.create_context_aggregator(context) + + rtvi = RTVIProcessor(config=RTVIConfig(config=[])) + + pipeline = Pipeline( + [ + transport.input(), + rtvi, + stt, + context_aggregator.user(), + llm, + tts, + transport.output(), + context_aggregator.assistant(), + ] + ) + + task = PipelineTask( + pipeline, + params=PipelineParams( + enable_metrics=True, + enable_usage_metrics=True, + ), + observers=[RTVIObserver(rtvi)], + ) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info("Client connected") + messages.append({"role": "system", "content": "Say hello and briefly introduce yourself."}) + await task.queue_frames([context_aggregator.user().get_context_frame()]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info("Client disconnected") + await task.cancel() + + runner = PipelineRunner(handle_sigint=False) + await runner.run(task) + + +async def bot(runner_args: RunnerArguments): + """Main bot entry point compatible with Pipecat Cloud.""" + + transport = None + + if isinstance(runner_args, DailyRunnerArguments): + from pipecat.transports.services.daily import DailyParams, DailyTransport + + if os.environ.get("ENV") != "local": + from pipecat.audio.filters.krisp_filter import KrispFilter + + krisp_filter = KrispFilter() + else: + krisp_filter = None + + transport = DailyTransport( + runner_args.room_url, + runner_args.token, + "Pipecat Bot", + params=DailyParams( + audio_in_enabled=True, + audio_in_filter=krisp_filter, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), + ) + + elif isinstance(runner_args, SmallWebRTCRunnerArguments): + from pipecat.transports.base_transport import TransportParams + from pipecat.transports.network.small_webrtc import SmallWebRTCTransport + + transport = SmallWebRTCTransport( + params=TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), + webrtc_connection=runner_args.webrtc_connection, + ) + + elif isinstance(runner_args, WebSocketRunnerArguments): + # Use the utility to parse WebSocket data + from pipecat.runner.utils import parse_telephony_websocket + + transport_type, call_data = await parse_telephony_websocket(runner_args.websocket) + logger.info(f"Auto-detected transport: {transport_type}") + + # Create transport based on detected type + if transport_type == "twilio": + from pipecat.serializers.twilio import TwilioFrameSerializer + + serializer = TwilioFrameSerializer( + stream_sid=call_data["stream_id"], + call_sid=call_data["call_id"], + account_sid=os.getenv("TWILIO_ACCOUNT_SID", ""), + auth_token=os.getenv("TWILIO_AUTH_TOKEN", ""), + ) + + elif transport_type == "telnyx": + from pipecat.serializers.telnyx import TelnyxFrameSerializer + + serializer = TelnyxFrameSerializer( + stream_id=call_data["stream_id"], + call_control_id=call_data["call_control_id"], + outbound_encoding=call_data["outbound_encoding"], + inbound_encoding="PCMU", # Set manually + api_key=os.getenv("TELNYX_API_KEY", ""), + ) + + elif transport_type == "plivo": + from pipecat.serializers.plivo import PlivoFrameSerializer + + serializer = PlivoFrameSerializer( + stream_id=call_data["stream_id"], + call_id=call_data["call_id"], + auth_id=os.getenv("PLIVO_AUTH_ID", ""), + auth_token=os.getenv("PLIVO_AUTH_TOKEN", ""), + ) + + else: + # Generic fallback + serializer = None + + # Create the transport + from pipecat.transports.network.fastapi_websocket import ( + FastAPIWebsocketParams, + FastAPIWebsocketTransport, + ) + + transport = FastAPIWebsocketTransport( + websocket=runner_args.websocket, + params=FastAPIWebsocketParams( + audio_in_enabled=True, + audio_out_enabled=True, + add_wav_header=False, + vad_analyzer=SileroVADAnalyzer(), + serializer=serializer, + ), + ) + else: + logger.error(f"Unsupported runner arguments type: {type(runner_args)}") + return + + if transport is None: + logger.error("Failed to create transport") + return + + await run_bot(transport) + + +if __name__ == "__main__": + from pipecat.runner.run import main + + main() diff --git a/examples/runner-examples/01-all-transport-factory-bot.py b/examples/runner-examples/01-all-transport-factory-bot.py new file mode 100644 index 000000000..8ec1b7f27 --- /dev/null +++ b/examples/runner-examples/01-all-transport-factory-bot.py @@ -0,0 +1,144 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Pipecat Cloud-compatible bot example. + +Transports are: + +- Daily +- SmallWebRTC +- Twilio +- Telnyx +- Plivo +""" + +import os + +from dotenv import load_dotenv +from loguru import logger + +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor +from pipecat.runner.types import RunnerArguments +from pipecat.runner.utils import create_transport +from pipecat.services.cartesia.tts import CartesiaTTSService +from pipecat.services.deepgram.stt import DeepgramSTTService +from pipecat.services.openai.llm import OpenAILLMService +from pipecat.transports.base_transport import BaseTransport, TransportParams +from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams +from pipecat.transports.services.daily import DailyParams + +load_dotenv(override=True) + +# Define transport configurations using factory functions +transport_params = { + "daily": lambda: DailyParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), + "webrtc": lambda: TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), + "twilio": lambda: FastAPIWebsocketParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + # add_wav_header and serializer will be set automatically + ), + "telnyx": lambda: FastAPIWebsocketParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + # add_wav_header and serializer will be set automatically + ), + "plivo": lambda: FastAPIWebsocketParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + # add_wav_header and serializer will be set automatically + ), +} + + +async def run_bot(transport: BaseTransport): + """Main bot logic that works with any transport.""" + logger.info("Starting bot") + + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + ) + + llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY")) + + messages = [ + { + "role": "system", + "content": "You are a friendly AI assistant. Respond naturally and keep your answers conversational.", + }, + ] + + context = OpenAILLMContext(messages) + context_aggregator = llm.create_context_aggregator(context) + + rtvi = RTVIProcessor(config=RTVIConfig(config=[])) + + pipeline = Pipeline( + [ + transport.input(), + rtvi, + stt, + context_aggregator.user(), + llm, + tts, + transport.output(), + context_aggregator.assistant(), + ] + ) + + task = PipelineTask( + pipeline, + params=PipelineParams( + enable_metrics=True, + enable_usage_metrics=True, + ), + observers=[RTVIObserver(rtvi)], + ) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info("Client connected") + messages.append({"role": "system", "content": "Say hello and briefly introduce yourself."}) + await task.queue_frames([context_aggregator.user().get_context_frame()]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info("Client disconnected") + await task.cancel() + + runner = PipelineRunner(handle_sigint=False) + await runner.run(task) + + +async def bot(runner_args: RunnerArguments): + """Main bot entry point compatible with Pipecat Cloud.""" + transport = await create_transport(runner_args, transport_params) + await run_bot(transport) + + +if __name__ == "__main__": + from pipecat.runner.run import main + + main() diff --git a/examples/runner-examples/02-two-transport-bot.py b/examples/runner-examples/02-two-transport-bot.py new file mode 100644 index 000000000..31ec577e1 --- /dev/null +++ b/examples/runner-examples/02-two-transport-bot.py @@ -0,0 +1,157 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Pipecat Cloud-compatible bot example. + +Transports are Daily or SmallWebRTC. + +Run it with: + +- WebRTC transport:: + + python 02-two-transport-bot.py + +- Daily transport:: + + python 02-two-transport-bot.py --transport daily +""" + +import os + +from dotenv import load_dotenv +from loguru import logger + +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor +from pipecat.runner.types import DailyRunnerArguments, RunnerArguments, SmallWebRTCRunnerArguments +from pipecat.services.cartesia.tts import CartesiaTTSService +from pipecat.services.deepgram.stt import DeepgramSTTService +from pipecat.services.openai.llm import OpenAILLMService +from pipecat.transports.base_transport import BaseTransport + +load_dotenv(override=True) + + +async def run_bot(transport: BaseTransport): + """Main bot logic that works with any transport.""" + logger.info(f"Starting bot") + + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + ) + + llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY")) + + messages = [ + { + "role": "system", + "content": "You are a friendly AI assistant. Respond naturally and keep your answers conversational.", + }, + ] + + context = OpenAILLMContext(messages) + context_aggregator = llm.create_context_aggregator(context) + + rtvi = RTVIProcessor(config=RTVIConfig(config=[])) + + pipeline = Pipeline( + [ + transport.input(), + rtvi, + stt, + context_aggregator.user(), + llm, + tts, + transport.output(), + context_aggregator.assistant(), + ] + ) + + task = PipelineTask( + pipeline, + params=PipelineParams( + enable_metrics=True, + enable_usage_metrics=True, + ), + observers=[RTVIObserver(rtvi)], + ) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info("Client connected") + messages.append({"role": "system", "content": "Say hello and briefly introduce yourself."}) + await task.queue_frames([context_aggregator.user().get_context_frame()]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info("Client disconnected") + await task.cancel() + + runner = PipelineRunner(handle_sigint=False) + await runner.run(task) + + +async def bot(runner_args: RunnerArguments): + """Main bot entry point compatible with Pipecat Cloud.""" + + transport = None + + if isinstance(runner_args, DailyRunnerArguments): + from pipecat.transports.services.daily import DailyParams, DailyTransport + + if os.environ.get("ENV") != "local": + from pipecat.audio.filters.krisp_filter import KrispFilter + + krisp_filter = KrispFilter() + else: + krisp_filter = None + + transport = DailyTransport( + runner_args.room_url, + runner_args.token, + "Pipecat Bot", + params=DailyParams( + audio_in_enabled=True, + audio_in_filter=krisp_filter, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), + ) + + elif isinstance(runner_args, SmallWebRTCRunnerArguments): + from pipecat.transports.base_transport import TransportParams + from pipecat.transports.network.small_webrtc import SmallWebRTCTransport + + transport = SmallWebRTCTransport( + params=TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), + webrtc_connection=runner_args.webrtc_connection, + ) + else: + logger.error(f"Unsupported runner arguments type: {type(runner_args)}") + return + + if transport is None: + logger.error("Failed to create transport") + return + + await run_bot(transport) + + +if __name__ == "__main__": + from pipecat.runner.run import main + + main() diff --git a/examples/runner-examples/03-single-transport-bot.py b/examples/runner-examples/03-single-transport-bot.py new file mode 100644 index 000000000..bfa2ca3d3 --- /dev/null +++ b/examples/runner-examples/03-single-transport-bot.py @@ -0,0 +1,117 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Pipecat development runner example. + +This example has a single transport—SmallWebRTCTransport. + +Run it with:: + + python 03-single-transport-bot.py +""" + +import os + +from dotenv import load_dotenv +from loguru import logger + +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor +from pipecat.runner.types import RunnerArguments +from pipecat.services.cartesia.tts import CartesiaTTSService +from pipecat.services.deepgram.stt import DeepgramSTTService +from pipecat.services.openai.llm import OpenAILLMService +from pipecat.transports.base_transport import BaseTransport, TransportParams +from pipecat.transports.network.small_webrtc import SmallWebRTCTransport + +load_dotenv(override=True) + + +async def run_bot(transport: BaseTransport): + """Main bot logic that works with any transport.""" + logger.info(f"Starting bot") + + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + ) + + llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY")) + + messages = [ + { + "role": "system", + "content": "You are a friendly AI assistant. Respond naturally and keep your answers conversational.", + }, + ] + + context = OpenAILLMContext(messages) + context_aggregator = llm.create_context_aggregator(context) + + rtvi = RTVIProcessor(config=RTVIConfig(config=[])) + + pipeline = Pipeline( + [ + transport.input(), + rtvi, + stt, + context_aggregator.user(), + llm, + tts, + transport.output(), + context_aggregator.assistant(), + ] + ) + + task = PipelineTask( + pipeline, + params=PipelineParams( + enable_metrics=True, + enable_usage_metrics=True, + ), + observers=[RTVIObserver(rtvi)], + ) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info("Client connected") + messages.append({"role": "system", "content": "Say hello and briefly introduce yourself."}) + await task.queue_frames([context_aggregator.user().get_context_frame()]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info("Client disconnected") + await task.cancel() + + runner = PipelineRunner(handle_sigint=False) + await runner.run(task) + + +async def bot(runner_args: RunnerArguments): + """Main bot entry point compatible with Pipecat Cloud.""" + + transport = SmallWebRTCTransport( + params=TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), + webrtc_connection=runner_args.webrtc_connection, + ) + + await run_bot(transport) + + +if __name__ == "__main__": + from pipecat.runner.run import main + + main() diff --git a/examples/runner-examples/Dockerfile b/examples/runner-examples/Dockerfile new file mode 100644 index 000000000..01ca0b133 --- /dev/null +++ b/examples/runner-examples/Dockerfile @@ -0,0 +1,7 @@ +FROM dailyco/pipecat-base:latest + +COPY ./requirements.txt requirements.txt + +RUN pip install --no-cache-dir --upgrade -r requirements.txt + +COPY ./02-two-transport-bot.py bot.py diff --git a/examples/runner-examples/build.sh b/examples/runner-examples/build.sh new file mode 100755 index 000000000..d0adb46fb --- /dev/null +++ b/examples/runner-examples/build.sh @@ -0,0 +1,19 @@ +#!/bin/bash +set -e + +VERSION="0.1" +DOCKER_USERNAME="your_docker_username" +AGENT_NAME="cloud-simple-bot" + +# Build the Docker image with the correct context +echo "Building Docker image..." +docker build --platform=linux/arm64 -t "$DOCKER_USERNAME/$AGENT_NAME:$VERSION" -t "$DOCKER_USERNAME/$AGENT_NAME:latest" . + +# Push the Docker images +echo "Pushing Docker image $DOCKER_USERNAME/$AGENT_NAME:$VERSION..." +docker push "$DOCKER_USERNAME/$AGENT_NAME:$VERSION" + +echo "Pushing Docker image $DOCKER_USERNAME/$AGENT_NAME:latest..." +docker push "$DOCKER_USERNAME/$AGENT_NAME:latest" + +echo "Successfully built and pushed $DOCKER_USERNAME/$AGENT_NAME:$VERSION and $DOCKER_USERNAME/$AGENT_NAME:latest" \ No newline at end of file diff --git a/examples/runner-examples/env.example b/examples/runner-examples/env.example new file mode 100644 index 000000000..b3e882f5d --- /dev/null +++ b/examples/runner-examples/env.example @@ -0,0 +1,3 @@ +DEEPGRAM_API_KEY=your_deepgram_api_key +OPENAI_API_KEY=your_openai_api_key +CARTESIA_API_KEY=your_cartesia_api_key \ No newline at end of file diff --git a/examples/runner-examples/pcc-deploy.toml b/examples/runner-examples/pcc-deploy.toml new file mode 100644 index 000000000..1fef22e05 --- /dev/null +++ b/examples/runner-examples/pcc-deploy.toml @@ -0,0 +1,8 @@ +agent_name = "cloud-simple-bot" +image = "your_dockerhub_username/cloud-simple-bot:0.1" +image_credentials = "dockerhub-access" +secret_set = "cloud-simple-bot-secrets" +enable_krisp = true + +[scaling] + min_agents = 0 diff --git a/examples/runner-examples/requirements.txt b/examples/runner-examples/requirements.txt new file mode 100644 index 000000000..a38c9612a --- /dev/null +++ b/examples/runner-examples/requirements.txt @@ -0,0 +1 @@ +pipecat-ai[openai,daily,deepgram,cartesia,silero,webrtc,websocket,runner] \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 5867284db..a5275881e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -86,6 +86,7 @@ playht = [ "pyht>=0.1.6", "websockets>=13.1,<15.0" ] qwen = [] rime = [ "websockets>=13.1,<15.0" ] riva = [ "nvidia-riva-client~=2.21.1" ] +runner = [ "python-dotenv>=1.0.0,<2.0.0", "uvicorn>=0.32.0,<1.0.0", "fastapi>=0.115.6,<0.117.0", "pipecat-ai-small-webrtc-prebuilt>=1.0.0"] sambanova = [] sentry = [ "sentry-sdk~=2.23.1" ] local-smart-turn = [ "coremltools>=8.0", "transformers", "torch~=2.5.0", "torchaudio~=2.5.0" ] diff --git a/src/pipecat/runner/__init__.py b/src/pipecat/runner/__init__.py new file mode 100644 index 000000000..1bd2e75e9 --- /dev/null +++ b/src/pipecat/runner/__init__.py @@ -0,0 +1 @@ +"""Pipecat runner package for local and cloud bot execution.""" diff --git a/src/pipecat/runner/daily.py b/src/pipecat/runner/daily.py new file mode 100644 index 000000000..3fae393fe --- /dev/null +++ b/src/pipecat/runner/daily.py @@ -0,0 +1,112 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Daily room and token configuration utilities. + +This module provides helper functions for creating and configuring Daily rooms +and authentication tokens. It handles both command-line argument parsing and +environment variable configuration. + +The module supports creating temporary rooms for development or using existing +rooms specified via arguments or environment variables. + +Required environment variables: + +- DAILY_API_KEY - Daily API key for room/token creation +- DAILY_SAMPLE_ROOM_URL (optional) - Existing room URL to use +- DAILY_SAMPLE_ROOM_TOKEN (optional) - Existing token to use + +Example:: + + import aiohttp + from pipecat.runner.daily import configure + + async with aiohttp.ClientSession() as session: + room_url, token = await configure(session) + # Use room_url and token with DailyTransport +""" + +import argparse +import os +from typing import Optional + +import aiohttp + +from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper + + +async def configure(aiohttp_session: aiohttp.ClientSession): + """Configure Daily room URL and token from arguments or environment. + + Args: + aiohttp_session: HTTP session for making API requests. + + Returns: + Tuple containing the room URL and authentication token. + + Raises: + Exception: If room URL or API key are not provided. + """ + (url, token, _) = await configure_with_args(aiohttp_session) + return (url, token) + + +async def configure_with_args( + aiohttp_session: aiohttp.ClientSession, parser: Optional[argparse.ArgumentParser] = None +): + """Configure Daily room with command-line argument parsing. + + Args: + aiohttp_session: HTTP session for making API requests. + parser: Optional argument parser. If None, creates a default one. + + Returns: + Tuple containing room URL, authentication token, and parsed arguments. + + Raises: + Exception: If room URL or API key are not provided via arguments or environment. + """ + if not parser: + parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample") + parser.add_argument( + "-u", "--url", type=str, required=False, help="URL of the Daily room to join" + ) + parser.add_argument( + "-k", + "--apikey", + type=str, + required=False, + help="Daily API Key (needed to create an owner token for the room)", + ) + + args, unknown = parser.parse_known_args() + + url = args.url or os.getenv("DAILY_SAMPLE_ROOM_URL") + key = args.apikey or os.getenv("DAILY_API_KEY") + + if not url: + raise Exception( + "No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL." + ) + + if not key: + raise Exception( + "No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers." + ) + + daily_rest_helper = DailyRESTHelper( + daily_api_key=key, + daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"), + aiohttp_session=aiohttp_session, + ) + + # Create a meeting token for the given room with an expiration 2 hours in + # the future. + expiry_time: float = 2 * 60 * 60 + + token = await daily_rest_helper.get_token(url, expiry_time) + + return (url, token, args) diff --git a/src/pipecat/runner/livekit.py b/src/pipecat/runner/livekit.py new file mode 100644 index 000000000..f7639cd60 --- /dev/null +++ b/src/pipecat/runner/livekit.py @@ -0,0 +1,148 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""LiveKit room and token configuration utilities. + +This module provides helper functions for creating and configuring LiveKit +rooms and authentication tokens. It handles JWT token generation with +appropriate grants for both regular participants and AI agents. + +The module supports creating tokens for development and testing, with +automatic agent detection for proper room permissions. + +Required environment variables: + +- LIVEKIT_API_KEY - LiveKit API key +- LIVEKIT_API_SECRET - LiveKit API secret +- LIVEKIT_URL - LiveKit server URL +- LIVEKIT_ROOM_NAME - Room name to join + +Example:: + + from pipecat.runner.livekit import configure + + url, token, room_name = await configure() + # Use with LiveKitTransport +""" + +import argparse +import os +from typing import Optional + +from livekit import api +from loguru import logger + + +def generate_token(room_name: str, participant_name: str, api_key: str, api_secret: str) -> str: + """Generate a LiveKit access token for a participant. + + Args: + room_name: Name of the LiveKit room. + participant_name: Name of the participant. + api_key: LiveKit API key. + api_secret: LiveKit API secret. + + Returns: + JWT token string for room access. + """ + token = api.AccessToken(api_key, api_secret) + token.with_identity(participant_name).with_name(participant_name).with_grants( + api.VideoGrants( + room_join=True, + room=room_name, + ) + ) + + return token.to_jwt() + + +def generate_token_with_agent( + room_name: str, participant_name: str, api_key: str, api_secret: str +) -> str: + """Generate a LiveKit access token for an agent participant. + + Args: + room_name: Name of the LiveKit room. + participant_name: Name of the participant. + api_key: LiveKit API key. + api_secret: LiveKit API secret. + + Returns: + JWT token string for agent room access. + """ + token = api.AccessToken(api_key, api_secret) + token.with_identity(participant_name).with_name(participant_name).with_grants( + api.VideoGrants( + room_join=True, + room=room_name, + agent=True, # This makes LiveKit client know agent has joined + ) + ) + + return token.to_jwt() + + +async def configure(): + """Configure LiveKit room URL and token from arguments or environment. + + Returns: + Tuple containing the server URL, authentication token, and room name. + + Raises: + Exception: If required LiveKit configuration is not provided. + """ + (url, token, room_name, _) = await configure_with_args() + return (url, token, room_name) + + +async def configure_with_args(parser: Optional[argparse.ArgumentParser] = None): + """Configure LiveKit room with command-line argument parsing. + + Args: + parser: Optional argument parser. If None, creates a default one. + + Returns: + Tuple containing server URL, authentication token, room name, and parsed arguments. + + Raises: + Exception: If required LiveKit configuration is not provided via arguments or environment. + """ + if not parser: + parser = argparse.ArgumentParser(description="LiveKit AI SDK Bot Sample") + parser.add_argument( + "-r", "--room", type=str, required=False, help="Name of the LiveKit room to join" + ) + parser.add_argument("-u", "--url", type=str, required=False, help="URL of the LiveKit server") + + args, unknown = parser.parse_known_args() + + room_name = args.room or os.getenv("LIVEKIT_ROOM_NAME") + url = args.url or os.getenv("LIVEKIT_URL") + api_key = os.getenv("LIVEKIT_API_KEY") + api_secret = os.getenv("LIVEKIT_API_SECRET") + + if not room_name: + raise Exception( + "No LiveKit room specified. Use the -r/--room option from the command line, or set LIVEKIT_ROOM_NAME in your environment." + ) + + if not url: + raise Exception( + "No LiveKit server URL specified. Use the -u/--url option from the command line, or set LIVEKIT_URL in your environment." + ) + + if not api_key or not api_secret: + raise Exception( + "LIVEKIT_API_KEY and LIVEKIT_API_SECRET must be set in environment variables." + ) + + token = generate_token_with_agent(room_name, "Pipecat Agent", api_key, api_secret) + + # Generate user token for testing/debugging + user_token = generate_token(room_name, "User", api_key, api_secret) + logger.info(f"User token: {user_token}") + + return (url, token, room_name, args) diff --git a/src/pipecat/runner/run.py b/src/pipecat/runner/run.py new file mode 100644 index 000000000..3d75b563e --- /dev/null +++ b/src/pipecat/runner/run.py @@ -0,0 +1,462 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Pipecat development runner. + +This development runner executes Pipecat bots and provides the supporting +infrastructure they need - creating Daily rooms and tokens, managing WebRTC +connections, and setting up telephony webhook/WebSocket infrastructure. It +supports multiple transport types with a unified interface. + +Install with:: + + pip install pipecat-ai[runner] + +All bots must implement a `bot(runner_args)` async function as the entry point. +The server automatically discovers and executes this function when connections +are established. + +Single transport example:: + + async def bot(runner_args: RunnerArguments): + transport = DailyTransport( + runner_args.room_url, + runner_args.token, + "Bot", + DailyParams(...) + ) + # Your bot logic here + await run_pipeline(transport) + + if __name__ == "__main__": + from pipecat.runner.run import main + main() + +Multiple transport example:: + + async def bot(runner_args: RunnerArguments): + # Type-safe transport detection + if isinstance(runner_args, DailyRunnerArguments): + transport = setup_daily_transport(runner_args) # Your application code + elif isinstance(runner_args, SmallWebRTCRunnerArguments): + transport = setup_webrtc_transport(runner_args) # Your application code + elif isinstance(runner_args, WebSocketRunnerArguments): + transport = setup_telephony_transport(runner_args) # Your application code + + # Your bot implementation + await run_pipeline(transport) + +Supported transports: + +- Daily - Creates rooms and tokens, runs bot as participant +- WebRTC - Provides local WebRTC interface with prebuilt UI +- Telephony - Handles webhook and WebSocket connections for Twilio, Telnyx, Plivo + +To run locally: + +- WebRTC: `python bot.py -t webrtc` +- ESP32: `python bot.py -t webrtc --esp32 --host 192.168.1.100` +- Daily (server): `python bot.py -t daily` +- Daily (direct, testing only): `python bot.py -d` +- Telephony: `python bot.py -t twilio -x your_username.ngrok.io` +""" + +import argparse +import asyncio +import os +import sys +from contextlib import asynccontextmanager +from typing import Dict + +from loguru import logger + +from pipecat.runner.types import ( + DailyRunnerArguments, + SmallWebRTCRunnerArguments, + WebSocketRunnerArguments, +) + +try: + import uvicorn + from dotenv import load_dotenv + from fastapi import BackgroundTasks, FastAPI, WebSocket + from fastapi.middleware.cors import CORSMiddleware + from fastapi.responses import HTMLResponse, RedirectResponse +except ImportError as e: + logger.error(f"Runner dependencies not available: {e}") + logger.error("To use Pipecat runners, install with: pip install pipecat-ai[runner]") + raise ImportError( + "Runner dependencies required. Install with: pip install pipecat-ai[runner]" + ) from e + + +load_dotenv(override=True) +os.environ["ENV"] = "local" + + +def _get_bot_module(): + """Get the bot module from the calling script.""" + import importlib.util + + # Get the main module (the file that was executed) + main_module = sys.modules["__main__"] + + # Check if it has a bot function + if hasattr(main_module, "bot"): + return main_module + + # Try to import 'bot' module from current directory + try: + import bot # type: ignore[import-untyped] + + return bot + except ImportError: + pass + + # Look for any .py file in current directory that has a bot function + # (excluding server.py). + cwd = os.getcwd() + for filename in os.listdir(cwd): + if filename.endswith(".py") and filename != "server.py": + try: + module_name = filename[:-3] # Remove .py extension + spec = importlib.util.spec_from_file_location( + module_name, os.path.join(cwd, filename) + ) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + + if hasattr(module, "bot"): + return module + except Exception: + continue + + raise ImportError( + "Could not find 'bot' function. Make sure your bot file has a 'bot' function." + ) + + +async def _run_telephony_bot(websocket: WebSocket): + """Run a bot for telephony transports.""" + bot_module = _get_bot_module() + + # Just pass the WebSocket - let the bot handle parsing + runner_args = WebSocketRunnerArguments(websocket=websocket) + + await bot_module.bot(runner_args) + + +def _create_server_app( + transport_type: str, host: str = "localhost", proxy: str = None, esp32_mode: bool = False +): + """Create FastAPI app with transport-specific routes.""" + app = FastAPI() + + app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], + ) + + # Set up transport-specific routes + if transport_type == "webrtc": + _setup_webrtc_routes(app, esp32_mode=esp32_mode, host=host) + elif transport_type == "daily": + _setup_daily_routes(app) + elif transport_type in ["twilio", "telnyx", "plivo"]: + _setup_telephony_routes(app, transport_type, proxy) + else: + logger.warning(f"Unknown transport type: {transport_type}") + + return app + + +def _setup_webrtc_routes(app: FastAPI, esp32_mode: bool = False, host: str = "localhost"): + """Set up WebRTC-specific routes.""" + try: + from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI + + from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection + except ImportError as e: + logger.error(f"WebRTC transport dependencies not installed: {e}") + return + + # Store connections by pc_id + pcs_map: Dict[str, SmallWebRTCConnection] = {} + + # Mount the frontend + app.mount("/client", SmallWebRTCPrebuiltUI) + + @app.get("/", include_in_schema=False) + async def root_redirect(): + """Redirect root requests to client interface.""" + return RedirectResponse(url="/client/") + + @app.post("/api/offer") + async def offer(request: dict, background_tasks: BackgroundTasks): + """Handle WebRTC offer requests and manage peer connections.""" + pc_id = request.get("pc_id") + + if pc_id and pc_id in pcs_map: + pipecat_connection = pcs_map[pc_id] + logger.info(f"Reusing existing connection for pc_id: {pc_id}") + await pipecat_connection.renegotiate( + sdp=request["sdp"], + type=request["type"], + restart_pc=request.get("restart_pc", False), + ) + else: + pipecat_connection = SmallWebRTCConnection() + await pipecat_connection.initialize(sdp=request["sdp"], type=request["type"]) + + @pipecat_connection.event_handler("closed") + async def handle_disconnected(webrtc_connection: SmallWebRTCConnection): + """Handle WebRTC connection closure and cleanup.""" + logger.info(f"Discarding peer connection for pc_id: {webrtc_connection.pc_id}") + pcs_map.pop(webrtc_connection.pc_id, None) + + bot_module = _get_bot_module() + runner_args = SmallWebRTCRunnerArguments(webrtc_connection=pipecat_connection) + background_tasks.add_task(bot_module.bot, runner_args) + + answer = pipecat_connection.get_answer() + + # Apply ESP32 SDP munging if enabled + if esp32_mode and host != "localhost": + from pipecat.runner.utils import smallwebrtc_sdp_munging + + answer["sdp"] = smallwebrtc_sdp_munging(answer["sdp"], host) + + pcs_map[answer["pc_id"]] = pipecat_connection + return answer + + @asynccontextmanager + async def lifespan(app: FastAPI): + """Manage FastAPI application lifecycle and cleanup connections.""" + yield + coros = [pc.disconnect() for pc in pcs_map.values()] + await asyncio.gather(*coros) + pcs_map.clear() + + app.router.lifespan_context = lifespan + + +def _setup_daily_routes(app: FastAPI): + """Set up Daily-specific routes.""" + + @app.get("/") + async def start_agent(): + """Launch a Daily bot and redirect to room.""" + print("Starting bot with Daily transport") + + import aiohttp + + from pipecat.runner.daily import configure + + async with aiohttp.ClientSession() as session: + room_url, token = await configure(session) + + # Start the bot in the background + bot_module = _get_bot_module() + runner_args = DailyRunnerArguments(room_url=room_url, token=token, body={}) + asyncio.create_task(bot_module.bot(runner_args)) + return RedirectResponse(room_url) + + @app.post("/connect") + async def rtvi_connect(): + """Launch a Daily bot and return connection info for RTVI clients.""" + print("Starting bot with Daily transport") + + import aiohttp + + from pipecat.runner.daily import configure + + async with aiohttp.ClientSession() as session: + room_url, token = await configure(session) + + # Start the bot in the background + bot_module = _get_bot_module() + runner_args = DailyRunnerArguments(room_url=room_url, token=token, body={}) + asyncio.create_task(bot_module.bot(runner_args)) + return {"room_url": room_url, "token": token} + + +def _setup_telephony_routes(app: FastAPI, transport_type: str, proxy: str): + """Set up telephony-specific routes.""" + # XML response templates + XML_TEMPLATES = { + "twilio": f""" + + + + + +""", + "telnyx": f""" + + + + + +""", + "plivo": f""" + + wss://{proxy}/ws +""", + } + + @app.post("/") + async def start_call(): + """Handle telephony webhook and return XML response.""" + logger.debug(f"POST {transport_type.upper()} XML") + xml_content = XML_TEMPLATES.get(transport_type, "") + return HTMLResponse(content=xml_content, media_type="application/xml") + + @app.websocket("/ws") + async def websocket_endpoint(websocket: WebSocket): + """Handle WebSocket connections for telephony.""" + await websocket.accept() + logger.debug("WebSocket connection accepted") + await _run_telephony_bot(websocket) + + @app.get("/") + async def start_agent(): + """Simple status endpoint for telephony transports.""" + return {"status": f"Bot started with {transport_type}"} + + +async def _run_daily_direct(): + """Run Daily bot with direct connection (no FastAPI server).""" + try: + import aiohttp + + from pipecat.runner.daily import configure + except ImportError as e: + logger.error("Daily transport dependencies not installed.") + return + + logger.info("Running with direct Daily connection...") + + async with aiohttp.ClientSession() as session: + room_url, token = await configure(session) + + runner_args = DailyRunnerArguments(room_url=room_url, token=token, body={}) + + # Get the bot module and run it directly + bot_module = _get_bot_module() + + print(f"📞 Joining Daily room: {room_url}") + print(" (Direct connection - no web server needed)") + print() + + await bot_module.bot(runner_args) + + +def main(): + """Start the Pipecat development runner. + + Parses command-line arguments and starts a FastAPI server configured + for the specified transport type. The runner will discover and run + any bot() function found in the current directory. + + Command-line arguments: + + Args: + --host: Server host address (default: localhost) + --port: Server port (default: 7860) + -t/--transport: Transport type (daily, webrtc, twilio, telnyx, plivo) + -x/--proxy: Public proxy hostname for telephony webhooks + --esp32: Enable SDP munging for ESP32 compatibility (requires --host with IP address) + -d/--direct: Connect directly to Daily room (automatically sets transport to daily) + -v/--verbose: Increase logging verbosity + + The bot file must contain a `bot(runner_args)` function as the entry point. + """ + parser = argparse.ArgumentParser(description="Pipecat Development Runner") + parser.add_argument("--host", type=str, default="localhost", help="Host address") + parser.add_argument("--port", type=int, default=7860, help="Port number") + parser.add_argument( + "-t", + "--transport", + type=str, + choices=["daily", "webrtc", "twilio", "telnyx", "plivo"], + default="webrtc", + help="Transport type", + ) + parser.add_argument("--proxy", "-x", help="Public proxy host name") + parser.add_argument( + "--esp32", + action="store_true", + default=False, + help="Enable SDP munging for ESP32 compatibility (requires --host with IP address)", + ) + parser.add_argument( + "-d", + "--direct", + action="store_true", + default=False, + help="Connect directly to Daily room (automatically sets transport to daily)", + ) + parser.add_argument( + "--verbose", "-v", action="count", default=0, help="Increase logging verbosity" + ) + + args = parser.parse_args() + + # Auto-set transport to daily if --direct is used without explicit transport + if args.direct and args.transport == "webrtc": # webrtc is the default + args.transport = "daily" + elif args.direct and args.transport != "daily": + logger.error("--direct flag only works with Daily transport (-t daily)") + return + + # Validate ESP32 requirements + if args.esp32 and args.host == "localhost": + logger.error("For ESP32, you need to specify `--host IP` so we can do SDP munging.") + return + + # Log level + logger.remove() + logger.add(sys.stderr, level="TRACE" if args.verbose else "DEBUG") + + # Handle direct Daily connection (no FastAPI server) + if args.direct: + print() + print("🚀 Connecting directly to Daily room...") + print() + + # Run direct Daily connection + asyncio.run(_run_daily_direct()) + return + + # Print startup message for server-based transports + if args.transport == "webrtc": + print() + if args.esp32: + print( + f"🚀 WebRTC server starting at http://{args.host}:{args.port}/client (ESP32 mode)" + ) + else: + print(f"🚀 WebRTC server starting at http://{args.host}:{args.port}/client") + print(f" Open this URL in your browser to connect!") + print() + elif args.transport == "daily": + print() + print(f"🚀 Daily server starting at http://{args.host}:{args.port}") + print(f" Open this URL in your browser to start a session!") + print() + + # Create the app with transport-specific setup + app = _create_server_app(args.transport, args.host, args.proxy, args.esp32) + + # Run the server + uvicorn.run(app, host=args.host, port=args.port) + + +if __name__ == "__main__": + main() diff --git a/src/pipecat/runner/types.py b/src/pipecat/runner/types.py new file mode 100644 index 000000000..d093b09dd --- /dev/null +++ b/src/pipecat/runner/types.py @@ -0,0 +1,60 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Runner session argument types for the development runner. + +These types are used by the development runner to pass transport-specific +information to bot functions. +""" + +from dataclasses import dataclass +from typing import Any + +from fastapi import WebSocket + + +@dataclass +class RunnerArguments: + """Base class for runner session arguments.""" + + pass + + +@dataclass +class DailyRunnerArguments(RunnerArguments): + """Daily transport session arguments for the runner. + + Parameters: + room_url: Daily room URL to join + token: Authentication token for the room + body: Additional request data + """ + + room_url: str + token: str + body: Any + + +@dataclass +class WebSocketRunnerArguments(RunnerArguments): + """WebSocket transport session arguments for the runner. + + Parameters: + websocket: WebSocket connection for audio streaming + """ + + websocket: WebSocket + + +@dataclass +class SmallWebRTCRunnerArguments(RunnerArguments): + """Small WebRTC transport session arguments for the runner. + + Parameters: + webrtc_connection: Pre-configured WebRTC peer connection + """ + + webrtc_connection: Any diff --git a/src/pipecat/runner/utils.py b/src/pipecat/runner/utils.py new file mode 100644 index 000000000..e6cb0dc60 --- /dev/null +++ b/src/pipecat/runner/utils.py @@ -0,0 +1,481 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Transport utility functions and FastAPI route setup helpers. + +This module provides common functionality for setting up transport-specific +FastAPI routes and handling WebRTC/WebSocket connections. It includes SDP +manipulation utilities for WebRTC compatibility and transport detection helpers. + +Key features: + +- WebRTC route setup with connection management +- WebSocket route setup for telephony providers +- SDP munging for ESP32 and other WebRTC compatibility +- Transport client ID detection across different transport types +- Video capture utilities for Daily transports + +The utilities are designed to be transport-agnostic where possible, with +specific handlers for each transport type's unique requirements. + +Example:: + + from pipecat.runner.utils import parse_telephony_websocket + + async def telephony_websocket_handler(websocket: WebSocket): + transport_type, call_data = await parse_telephony_websocket(websocket) +""" + +import json +import os +import re +from typing import Any, Callable, Dict, Optional + +from fastapi import WebSocket +from loguru import logger + +from pipecat.runner.types import ( + DailyRunnerArguments, + SmallWebRTCRunnerArguments, + WebSocketRunnerArguments, +) +from pipecat.transports.base_transport import BaseTransport + + +def _detect_transport_type_from_message(message_data: dict) -> str: + """Attempt to auto-detect transport type from WebSocket message structure.""" + logger.trace("=== Auto-Detection Analysis ===") + + # Twilio detection + if ( + message_data.get("event") == "start" + and "start" in message_data + and "streamSid" in message_data.get("start", {}) + and "callSid" in message_data.get("start", {}) + ): + logger.trace("Auto-detected: TWILIO") + return "twilio" + + # Telnyx detection + if ( + "stream_id" in message_data + and "start" in message_data + and "call_control_id" in message_data.get("start", {}) + ): + logger.trace("Auto-detected: TELNYX") + return "telnyx" + + # Plivo detection + if ( + "start" in message_data + and "streamId" in message_data.get("start", {}) + and "callId" in message_data.get("start", {}) + ): + logger.trace("Auto-detected: PLIVO") + return "plivo" + + logger.trace("Auto-detection failed - unknown format") + return "unknown" + + +async def parse_telephony_websocket(websocket: WebSocket): + """Parse telephony WebSocket messages and return transport type and call data. + + Returns: + tuple: (transport_type: str, call_data: dict) + + call_data contains provider-specific fields: + - Twilio: {"stream_id": str, "call_id": str} + - Telnyx: {"stream_id": str, "call_control_id": str, "outbound_encoding": str} + - Plivo: {"stream_id": str, "call_id": str} + + Example usage:: + + transport_type, call_data = await parse_telephony_websocket(websocket) + if transport_type == "telnyx": + outbound_encoding = call_data["outbound_encoding"] + """ + # Read first two messages + start_data = websocket.iter_text() + + try: + # First message + first_message_raw = await start_data.__anext__() + logger.trace(f"First message: {first_message_raw}") + try: + first_message = json.loads(first_message_raw) + except json.JSONDecodeError: + first_message = {} + + # Second message + second_message_raw = await start_data.__anext__() + logger.trace(f"Second message: {second_message_raw}") + try: + second_message = json.loads(second_message_raw) + except json.JSONDecodeError: + second_message = {} + + # Try auto-detection on both messages + detected_type_first = _detect_transport_type_from_message(first_message) + detected_type_second = _detect_transport_type_from_message(second_message) + + # Use the successful detection + if detected_type_first != "unknown": + transport_type = detected_type_first + call_data_raw = first_message + logger.debug(f"Detected transport: {transport_type} (from first message)") + elif detected_type_second != "unknown": + transport_type = detected_type_second + call_data_raw = second_message + logger.debug(f"Detected transport: {transport_type} (from second message)") + else: + transport_type = "unknown" + call_data_raw = second_message + logger.warning("Could not auto-detect transport type") + + # Extract provider-specific data + if transport_type == "twilio": + start_data = call_data_raw.get("start", {}) + call_data = { + "stream_id": start_data.get("streamSid"), + "call_id": start_data.get("callSid"), + } + + elif transport_type == "telnyx": + call_data = { + "stream_id": call_data_raw.get("stream_id"), + "call_control_id": call_data_raw.get("start", {}).get("call_control_id"), + "outbound_encoding": call_data_raw.get("start", {}) + .get("media_format", {}) + .get("encoding"), + } + + elif transport_type == "plivo": + start_data = call_data_raw.get("start", {}) + call_data = { + "stream_id": start_data.get("streamId"), + "call_id": start_data.get("callId"), + } + + else: + call_data = {} + + logger.debug(f"Parsed - Type: {transport_type}, Data: {call_data}") + return transport_type, call_data + + except Exception as e: + logger.error(f"Error parsing telephony WebSocket: {e}") + raise + + +def get_transport_client_id(transport: BaseTransport, client: Any) -> str: + """Get client identifier from transport-specific client object. + + Args: + transport: The transport instance. + client: Transport-specific client object. + + Returns: + Client identifier string, empty if transport not supported. + """ + # Import conditionally to avoid dependency issues + try: + from pipecat.transports.network.small_webrtc import SmallWebRTCTransport + + if isinstance(transport, SmallWebRTCTransport): + return client.pc_id + except ImportError: + pass + + try: + from pipecat.transports.services.daily import DailyTransport + + if isinstance(transport, DailyTransport): + return client["id"] + except ImportError: + pass + + logger.warning(f"Unable to get client id from unsupported transport {type(transport)}") + return "" + + +async def maybe_capture_participant_camera( + transport: BaseTransport, client: Any, framerate: int = 0 +): + """Capture participant camera video if transport supports it. + + Args: + transport: The transport instance. + client: Transport-specific client object. + framerate: Video capture framerate. Defaults to 0 (auto). + """ + try: + from pipecat.transports.services.daily import DailyTransport + + if isinstance(transport, DailyTransport): + await transport.capture_participant_video( + client["id"], framerate=framerate, video_source="camera" + ) + except ImportError: + pass + + +async def maybe_capture_participant_screen( + transport: BaseTransport, client: Any, framerate: int = 0 +): + """Capture participant screen video if transport supports it. + + Args: + transport: The transport instance. + client: Transport-specific client object. + framerate: Video capture framerate. Defaults to 0 (auto). + """ + try: + from pipecat.transports.services.daily import DailyTransport + + if isinstance(transport, DailyTransport): + await transport.capture_participant_video( + client["id"], framerate=framerate, video_source="screenVideo" + ) + except ImportError: + pass + + +def _smallwebrtc_sdp_cleanup_ice_candidates(text: str, pattern: str) -> str: + """Clean up ICE candidates in SDP text for SmallWebRTC. + + Args: + text: SDP text to clean up. + pattern: Pattern to match for candidate filtering. + + Returns: + Cleaned SDP text with filtered ICE candidates. + """ + result = [] + lines = text.splitlines() + for line in lines: + if re.search("a=candidate", line): + if re.search(pattern, line) and not re.search("raddr", line): + result.append(line) + else: + result.append(line) + return "\r\n".join(result) + + +def _smallwebrtc_sdp_cleanup_fingerprints(text: str) -> str: + """Remove unsupported fingerprint algorithms from SDP text. + + Args: + text: SDP text to clean up. + + Returns: + SDP text with sha-384 and sha-512 fingerprints removed. + """ + result = [] + lines = text.splitlines() + for line in lines: + if not re.search("sha-384", line) and not re.search("sha-512", line): + result.append(line) + return "\r\n".join(result) + + +def smallwebrtc_sdp_munging(sdp: str, host: str) -> str: + """Apply SDP modifications for SmallWebRTC compatibility. + + Args: + sdp: Original SDP string. + host: Host address for ICE candidate filtering. + + Returns: + Modified SDP string with fingerprint and ICE candidate cleanup. + """ + sdp = _smallwebrtc_sdp_cleanup_fingerprints(sdp) + sdp = _smallwebrtc_sdp_cleanup_ice_candidates(sdp, host) + return sdp + + +def _get_transport_params(transport_key: str, transport_params: Dict[str, Callable]) -> Any: + """Get transport parameters from factory function. + + Args: + transport_key: The transport key to look up + transport_params: Dict mapping transport names to parameter factory functions + + Returns: + Transport parameters from the factory function + + Raises: + ValueError: If transport key is missing from transport_params + """ + if transport_key not in transport_params: + raise ValueError( + f"Missing transport params for '{transport_key}'. " + f"Please add '{transport_key}' key to your transport_params dict." + ) + + params = transport_params[transport_key]() + logger.debug(f"Using transport params for {transport_key}") + return params + + +async def _create_telephony_transport( + websocket: WebSocket, + params: Optional[Any] = None, + transport_type: str = None, + call_data: dict = None, +) -> BaseTransport: + """Create a telephony transport with pre-parsed WebSocket data. + + Args: + websocket: FastAPI WebSocket connection from telephony provider + params: FastAPIWebsocketParams (required) + transport_type: Pre-detected provider type ("twilio", "telnyx", "plivo") + call_data: Pre-parsed call data dict with provider-specific fields + + Returns: + Configured FastAPIWebsocketTransport ready for telephony use. + """ + from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketTransport + + if params is None: + raise ValueError( + "FastAPIWebsocketParams must be provided. " + "The serializer and add_wav_header will be set automatically." + ) + + # Always set add_wav_header to False for telephony + params.add_wav_header = False + + logger.info(f"Using pre-detected telephony provider: {transport_type}") + + if transport_type == "twilio": + from pipecat.serializers.twilio import TwilioFrameSerializer + + params.serializer = TwilioFrameSerializer( + stream_sid=call_data["stream_id"], + call_sid=call_data["call_id"], + account_sid=os.getenv("TWILIO_ACCOUNT_SID", ""), + auth_token=os.getenv("TWILIO_AUTH_TOKEN", ""), + ) + elif transport_type == "telnyx": + from pipecat.serializers.telnyx import TelnyxFrameSerializer + + params.serializer = TelnyxFrameSerializer( + stream_id=call_data["stream_id"], + call_control_id=call_data["call_control_id"], + outbound_encoding=call_data["outbound_encoding"], + inbound_encoding="PCMU", # Standard default + api_key=os.getenv("TELNYX_API_KEY", ""), + ) + elif transport_type == "plivo": + from pipecat.serializers.plivo import PlivoFrameSerializer + + params.serializer = PlivoFrameSerializer( + stream_id=call_data["stream_id"], + call_id=call_data["call_id"], + auth_id=os.getenv("PLIVO_AUTH_ID", ""), + auth_token=os.getenv("PLIVO_AUTH_TOKEN", ""), + ) + else: + raise ValueError( + f"Unsupported telephony provider: {transport_type}. " + f"Supported providers: twilio, telnyx, plivo" + ) + + return FastAPIWebsocketTransport(websocket=websocket, params=params) + + +async def create_transport( + runner_args: Any, transport_params: Dict[str, Callable] +) -> BaseTransport: + """Create a transport from runner arguments using factory functions. + + This function uses the clean transport_params factory pattern where users + define a dictionary mapping transport names to parameter factory functions. + + Args: + runner_args: Arguments from the runner. + transport_params: Dict mapping transport names to parameter factory functions. + Keys should be: "daily", "webrtc", "twilio", "telnyx", "plivo" + Values should be functions that return transport parameters when called. + + Returns: + Configured transport instance. + + Raises: + ValueError: If transport key is missing from transport_params or runner_args type is unsupported. + ImportError: If required dependencies are not installed. + + Example:: + + transport_params = { + "daily": lambda: DailyParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), + "webrtc": lambda: TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + ), + "twilio": lambda: FastAPIWebsocketParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + # add_wav_header and serializer will be set automatically + ), + "telnyx": lambda: FastAPIWebsocketParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + # add_wav_header and serializer will be set automatically + ), + "plivo": lambda: FastAPIWebsocketParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + # add_wav_header and serializer will be set automatically + ), + } + + transport = await create_transport(runner_args, transport_params) + """ + # Create transport based on runner args type + if isinstance(runner_args, DailyRunnerArguments): + params = _get_transport_params("daily", transport_params) + + from pipecat.transports.services.daily import DailyTransport + + return DailyTransport( + runner_args.room_url, + runner_args.token, + "Pipecat Bot", + params=params, + ) + + elif isinstance(runner_args, SmallWebRTCRunnerArguments): + params = _get_transport_params("webrtc", transport_params) + + from pipecat.transports.network.small_webrtc import SmallWebRTCTransport + + return SmallWebRTCTransport( + params=params, + webrtc_connection=runner_args.webrtc_connection, + ) + + elif isinstance(runner_args, WebSocketRunnerArguments): + # Parse once to determine the provider and get data + transport_type, call_data = await parse_telephony_websocket(runner_args.websocket) + params = _get_transport_params(transport_type, transport_params) + + # Create telephony transport with pre-parsed data + return await _create_telephony_transport( + runner_args.websocket, params, transport_type, call_data + ) + + else: + raise ValueError(f"Unsupported runner arguments type: {type(runner_args)}")