New runner module (#2269)

* Adds pipecat.runner.run - FastAPI-based development server with automatic bot discovery

* Adds new RunnerArguments types for different transports

* Adds new runner utils for creating transports and parsing data

* Adds new Daily and LiveKit utils for setup
This commit is contained in:
Mark Backman
2025-07-30 19:02:28 -07:00
committed by GitHub
parent 9013b2929a
commit aa85fffa57
21 changed files with 1960 additions and 0 deletions

View File

@@ -15,6 +15,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added the ability to switch voices to `RimeTTSService`.
- Added unified development runner for building voice AI bots across multiple transports
- `pipecat.runner.run` FastAPI-based development server with automatic bot discovery
- `pipecat.runner.types` Runner session argument types (`DailyRunnerArguments`, `SmallWebRTCRunnerArguments`, `WebSocketRunnerArguments`)
- `pipecat.runner.utils.create_transport()` Factory function for creating transports from session arguments
- `pipecat.runner.daily` and `pipecat.runner.livekit` Configuration utilities for Daily and LiveKit setups
- Support for all transport types: Daily, WebRTC, Twilio, Telnyx, Plivo
- Automatic telephony provider detection and serializer configuration
- ESP32 WebRTC compatibility with SDP munging
- Environment detection (`ENV=local`) for conditional features
- Added Async.ai TTS integration (https://async.ai/)
- `AsyncAITTSService` WebSocket-based streaming TTS with interruption support

View File

@@ -1,5 +1,12 @@
#!/bin/bash
# Check if sphinx-build is installed
if ! command -v sphinx-build &> /dev/null; then
echo "Error: sphinx-build is not installed or not in PATH" >&2
echo "Please install Sphinx using: pip install -r requirements.txt" >&2
exit 1
fi
# Clean previous build
rm -rf _build

View File

@@ -202,6 +202,7 @@ def import_core_modules():
"pipecat.clocks",
"pipecat.metrics",
"pipecat.observers",
"pipecat.runner",
"pipecat.serializers",
"pipecat.sync",
"pipecat.transcriptions",

View File

@@ -26,6 +26,7 @@ Quick Links
Observers <api/pipecat.observers>
Pipeline <api/pipecat.pipeline>
Processors <api/pipecat.processors>
Runner <api/pipecat.runner>
Serializers <api/pipecat.serializers>
Services <api/pipecat.services>
Sync <api/pipecat.sync>

View File

@@ -44,6 +44,7 @@ pipecat-ai[openai]
pipecat-ai[qwen]
pipecat-ai[remote-smart-turn]
# pipecat-ai[riva] # Mocked
pipecat-ai[runner]
pipecat-ai[sambanova]
pipecat-ai[silero]
pipecat-ai[simli]

View File

@@ -0,0 +1,218 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Pipecat Cloud-compatible bot example.
Transports are:
- Daily
- SmallWebRTC
- Twilio
- Telnyx
- Plivo
"""
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.runner.types import (
DailyRunnerArguments,
RunnerArguments,
SmallWebRTCRunnerArguments,
WebSocketRunnerArguments,
)
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport
load_dotenv(override=True)
async def run_bot(transport: BaseTransport):
"""Main bot logic that works with any transport."""
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a friendly AI assistant. Respond naturally and keep your answers conversational.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
pipeline = Pipeline(
[
transport.input(),
rtvi,
stt,
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
observers=[RTVIObserver(rtvi)],
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info("Client connected")
messages.append({"role": "system", "content": "Say hello and briefly introduce yourself."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info("Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = None
if isinstance(runner_args, DailyRunnerArguments):
from pipecat.transports.services.daily import DailyParams, DailyTransport
if os.environ.get("ENV") != "local":
from pipecat.audio.filters.krisp_filter import KrispFilter
krisp_filter = KrispFilter()
else:
krisp_filter = None
transport = DailyTransport(
runner_args.room_url,
runner_args.token,
"Pipecat Bot",
params=DailyParams(
audio_in_enabled=True,
audio_in_filter=krisp_filter,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
elif isinstance(runner_args, SmallWebRTCRunnerArguments):
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
transport = SmallWebRTCTransport(
params=TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
webrtc_connection=runner_args.webrtc_connection,
)
elif isinstance(runner_args, WebSocketRunnerArguments):
# Use the utility to parse WebSocket data
from pipecat.runner.utils import parse_telephony_websocket
transport_type, call_data = await parse_telephony_websocket(runner_args.websocket)
logger.info(f"Auto-detected transport: {transport_type}")
# Create transport based on detected type
if transport_type == "twilio":
from pipecat.serializers.twilio import TwilioFrameSerializer
serializer = TwilioFrameSerializer(
stream_sid=call_data["stream_id"],
call_sid=call_data["call_id"],
account_sid=os.getenv("TWILIO_ACCOUNT_SID", ""),
auth_token=os.getenv("TWILIO_AUTH_TOKEN", ""),
)
elif transport_type == "telnyx":
from pipecat.serializers.telnyx import TelnyxFrameSerializer
serializer = TelnyxFrameSerializer(
stream_id=call_data["stream_id"],
call_control_id=call_data["call_control_id"],
outbound_encoding=call_data["outbound_encoding"],
inbound_encoding="PCMU", # Set manually
api_key=os.getenv("TELNYX_API_KEY", ""),
)
elif transport_type == "plivo":
from pipecat.serializers.plivo import PlivoFrameSerializer
serializer = PlivoFrameSerializer(
stream_id=call_data["stream_id"],
call_id=call_data["call_id"],
auth_id=os.getenv("PLIVO_AUTH_ID", ""),
auth_token=os.getenv("PLIVO_AUTH_TOKEN", ""),
)
else:
# Generic fallback
serializer = None
# Create the transport
from pipecat.transports.network.fastapi_websocket import (
FastAPIWebsocketParams,
FastAPIWebsocketTransport,
)
transport = FastAPIWebsocketTransport(
websocket=runner_args.websocket,
params=FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
add_wav_header=False,
vad_analyzer=SileroVADAnalyzer(),
serializer=serializer,
),
)
else:
logger.error(f"Unsupported runner arguments type: {type(runner_args)}")
return
if transport is None:
logger.error("Failed to create transport")
return
await run_bot(transport)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,144 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Pipecat Cloud-compatible bot example.
Transports are:
- Daily
- SmallWebRTC
- Twilio
- Telnyx
- Plivo
"""
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams
from pipecat.transports.services.daily import DailyParams
load_dotenv(override=True)
# Define transport configurations using factory functions
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
# add_wav_header and serializer will be set automatically
),
"telnyx": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
# add_wav_header and serializer will be set automatically
),
"plivo": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
# add_wav_header and serializer will be set automatically
),
}
async def run_bot(transport: BaseTransport):
"""Main bot logic that works with any transport."""
logger.info("Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a friendly AI assistant. Respond naturally and keep your answers conversational.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
pipeline = Pipeline(
[
transport.input(),
rtvi,
stt,
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
observers=[RTVIObserver(rtvi)],
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info("Client connected")
messages.append({"role": "system", "content": "Say hello and briefly introduce yourself."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info("Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,157 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Pipecat Cloud-compatible bot example.
Transports are Daily or SmallWebRTC.
Run it with:
- WebRTC transport::
python 02-two-transport-bot.py
- Daily transport::
python 02-two-transport-bot.py --transport daily
"""
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.runner.types import DailyRunnerArguments, RunnerArguments, SmallWebRTCRunnerArguments
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport
load_dotenv(override=True)
async def run_bot(transport: BaseTransport):
"""Main bot logic that works with any transport."""
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a friendly AI assistant. Respond naturally and keep your answers conversational.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
pipeline = Pipeline(
[
transport.input(),
rtvi,
stt,
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
observers=[RTVIObserver(rtvi)],
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info("Client connected")
messages.append({"role": "system", "content": "Say hello and briefly introduce yourself."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info("Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = None
if isinstance(runner_args, DailyRunnerArguments):
from pipecat.transports.services.daily import DailyParams, DailyTransport
if os.environ.get("ENV") != "local":
from pipecat.audio.filters.krisp_filter import KrispFilter
krisp_filter = KrispFilter()
else:
krisp_filter = None
transport = DailyTransport(
runner_args.room_url,
runner_args.token,
"Pipecat Bot",
params=DailyParams(
audio_in_enabled=True,
audio_in_filter=krisp_filter,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
elif isinstance(runner_args, SmallWebRTCRunnerArguments):
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
transport = SmallWebRTCTransport(
params=TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
webrtc_connection=runner_args.webrtc_connection,
)
else:
logger.error(f"Unsupported runner arguments type: {type(runner_args)}")
return
if transport is None:
logger.error("Failed to create transport")
return
await run_bot(transport)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,117 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Pipecat development runner example.
This example has a single transport—SmallWebRTCTransport.
Run it with::
python 03-single-transport-bot.py
"""
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
load_dotenv(override=True)
async def run_bot(transport: BaseTransport):
"""Main bot logic that works with any transport."""
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a friendly AI assistant. Respond naturally and keep your answers conversational.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
pipeline = Pipeline(
[
transport.input(),
rtvi,
stt,
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
observers=[RTVIObserver(rtvi)],
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info("Client connected")
messages.append({"role": "system", "content": "Say hello and briefly introduce yourself."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info("Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = SmallWebRTCTransport(
params=TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
webrtc_connection=runner_args.webrtc_connection,
)
await run_bot(transport)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,7 @@
FROM dailyco/pipecat-base:latest
COPY ./requirements.txt requirements.txt
RUN pip install --no-cache-dir --upgrade -r requirements.txt
COPY ./02-two-transport-bot.py bot.py

View File

@@ -0,0 +1,19 @@
#!/bin/bash
set -e
VERSION="0.1"
DOCKER_USERNAME="your_docker_username"
AGENT_NAME="cloud-simple-bot"
# Build the Docker image with the correct context
echo "Building Docker image..."
docker build --platform=linux/arm64 -t "$DOCKER_USERNAME/$AGENT_NAME:$VERSION" -t "$DOCKER_USERNAME/$AGENT_NAME:latest" .
# Push the Docker images
echo "Pushing Docker image $DOCKER_USERNAME/$AGENT_NAME:$VERSION..."
docker push "$DOCKER_USERNAME/$AGENT_NAME:$VERSION"
echo "Pushing Docker image $DOCKER_USERNAME/$AGENT_NAME:latest..."
docker push "$DOCKER_USERNAME/$AGENT_NAME:latest"
echo "Successfully built and pushed $DOCKER_USERNAME/$AGENT_NAME:$VERSION and $DOCKER_USERNAME/$AGENT_NAME:latest"

View File

@@ -0,0 +1,3 @@
DEEPGRAM_API_KEY=your_deepgram_api_key
OPENAI_API_KEY=your_openai_api_key
CARTESIA_API_KEY=your_cartesia_api_key

View File

@@ -0,0 +1,8 @@
agent_name = "cloud-simple-bot"
image = "your_dockerhub_username/cloud-simple-bot:0.1"
image_credentials = "dockerhub-access"
secret_set = "cloud-simple-bot-secrets"
enable_krisp = true
[scaling]
min_agents = 0

View File

@@ -0,0 +1 @@
pipecat-ai[openai,daily,deepgram,cartesia,silero,webrtc,websocket,runner]

View File

@@ -86,6 +86,7 @@ playht = [ "pyht>=0.1.6", "websockets>=13.1,<15.0" ]
qwen = []
rime = [ "websockets>=13.1,<15.0" ]
riva = [ "nvidia-riva-client~=2.21.1" ]
runner = [ "python-dotenv>=1.0.0,<2.0.0", "uvicorn>=0.32.0,<1.0.0", "fastapi>=0.115.6,<0.117.0", "pipecat-ai-small-webrtc-prebuilt>=1.0.0"]
sambanova = []
sentry = [ "sentry-sdk~=2.23.1" ]
local-smart-turn = [ "coremltools>=8.0", "transformers", "torch~=2.5.0", "torchaudio~=2.5.0" ]

View File

@@ -0,0 +1 @@
"""Pipecat runner package for local and cloud bot execution."""

112
src/pipecat/runner/daily.py Normal file
View File

@@ -0,0 +1,112 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Daily room and token configuration utilities.
This module provides helper functions for creating and configuring Daily rooms
and authentication tokens. It handles both command-line argument parsing and
environment variable configuration.
The module supports creating temporary rooms for development or using existing
rooms specified via arguments or environment variables.
Required environment variables:
- DAILY_API_KEY - Daily API key for room/token creation
- DAILY_SAMPLE_ROOM_URL (optional) - Existing room URL to use
- DAILY_SAMPLE_ROOM_TOKEN (optional) - Existing token to use
Example::
import aiohttp
from pipecat.runner.daily import configure
async with aiohttp.ClientSession() as session:
room_url, token = await configure(session)
# Use room_url and token with DailyTransport
"""
import argparse
import os
from typing import Optional
import aiohttp
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper
async def configure(aiohttp_session: aiohttp.ClientSession):
"""Configure Daily room URL and token from arguments or environment.
Args:
aiohttp_session: HTTP session for making API requests.
Returns:
Tuple containing the room URL and authentication token.
Raises:
Exception: If room URL or API key are not provided.
"""
(url, token, _) = await configure_with_args(aiohttp_session)
return (url, token)
async def configure_with_args(
aiohttp_session: aiohttp.ClientSession, parser: Optional[argparse.ArgumentParser] = None
):
"""Configure Daily room with command-line argument parsing.
Args:
aiohttp_session: HTTP session for making API requests.
parser: Optional argument parser. If None, creates a default one.
Returns:
Tuple containing room URL, authentication token, and parsed arguments.
Raises:
Exception: If room URL or API key are not provided via arguments or environment.
"""
if not parser:
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument(
"-u", "--url", type=str, required=False, help="URL of the Daily room to join"
)
parser.add_argument(
"-k",
"--apikey",
type=str,
required=False,
help="Daily API Key (needed to create an owner token for the room)",
)
args, unknown = parser.parse_known_args()
url = args.url or os.getenv("DAILY_SAMPLE_ROOM_URL")
key = args.apikey or os.getenv("DAILY_API_KEY")
if not url:
raise Exception(
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL."
)
if not key:
raise Exception(
"No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
)
daily_rest_helper = DailyRESTHelper(
daily_api_key=key,
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
# Create a meeting token for the given room with an expiration 2 hours in
# the future.
expiry_time: float = 2 * 60 * 60
token = await daily_rest_helper.get_token(url, expiry_time)
return (url, token, args)

View File

@@ -0,0 +1,148 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""LiveKit room and token configuration utilities.
This module provides helper functions for creating and configuring LiveKit
rooms and authentication tokens. It handles JWT token generation with
appropriate grants for both regular participants and AI agents.
The module supports creating tokens for development and testing, with
automatic agent detection for proper room permissions.
Required environment variables:
- LIVEKIT_API_KEY - LiveKit API key
- LIVEKIT_API_SECRET - LiveKit API secret
- LIVEKIT_URL - LiveKit server URL
- LIVEKIT_ROOM_NAME - Room name to join
Example::
from pipecat.runner.livekit import configure
url, token, room_name = await configure()
# Use with LiveKitTransport
"""
import argparse
import os
from typing import Optional
from livekit import api
from loguru import logger
def generate_token(room_name: str, participant_name: str, api_key: str, api_secret: str) -> str:
"""Generate a LiveKit access token for a participant.
Args:
room_name: Name of the LiveKit room.
participant_name: Name of the participant.
api_key: LiveKit API key.
api_secret: LiveKit API secret.
Returns:
JWT token string for room access.
"""
token = api.AccessToken(api_key, api_secret)
token.with_identity(participant_name).with_name(participant_name).with_grants(
api.VideoGrants(
room_join=True,
room=room_name,
)
)
return token.to_jwt()
def generate_token_with_agent(
room_name: str, participant_name: str, api_key: str, api_secret: str
) -> str:
"""Generate a LiveKit access token for an agent participant.
Args:
room_name: Name of the LiveKit room.
participant_name: Name of the participant.
api_key: LiveKit API key.
api_secret: LiveKit API secret.
Returns:
JWT token string for agent room access.
"""
token = api.AccessToken(api_key, api_secret)
token.with_identity(participant_name).with_name(participant_name).with_grants(
api.VideoGrants(
room_join=True,
room=room_name,
agent=True, # This makes LiveKit client know agent has joined
)
)
return token.to_jwt()
async def configure():
"""Configure LiveKit room URL and token from arguments or environment.
Returns:
Tuple containing the server URL, authentication token, and room name.
Raises:
Exception: If required LiveKit configuration is not provided.
"""
(url, token, room_name, _) = await configure_with_args()
return (url, token, room_name)
async def configure_with_args(parser: Optional[argparse.ArgumentParser] = None):
"""Configure LiveKit room with command-line argument parsing.
Args:
parser: Optional argument parser. If None, creates a default one.
Returns:
Tuple containing server URL, authentication token, room name, and parsed arguments.
Raises:
Exception: If required LiveKit configuration is not provided via arguments or environment.
"""
if not parser:
parser = argparse.ArgumentParser(description="LiveKit AI SDK Bot Sample")
parser.add_argument(
"-r", "--room", type=str, required=False, help="Name of the LiveKit room to join"
)
parser.add_argument("-u", "--url", type=str, required=False, help="URL of the LiveKit server")
args, unknown = parser.parse_known_args()
room_name = args.room or os.getenv("LIVEKIT_ROOM_NAME")
url = args.url or os.getenv("LIVEKIT_URL")
api_key = os.getenv("LIVEKIT_API_KEY")
api_secret = os.getenv("LIVEKIT_API_SECRET")
if not room_name:
raise Exception(
"No LiveKit room specified. Use the -r/--room option from the command line, or set LIVEKIT_ROOM_NAME in your environment."
)
if not url:
raise Exception(
"No LiveKit server URL specified. Use the -u/--url option from the command line, or set LIVEKIT_URL in your environment."
)
if not api_key or not api_secret:
raise Exception(
"LIVEKIT_API_KEY and LIVEKIT_API_SECRET must be set in environment variables."
)
token = generate_token_with_agent(room_name, "Pipecat Agent", api_key, api_secret)
# Generate user token for testing/debugging
user_token = generate_token(room_name, "User", api_key, api_secret)
logger.info(f"User token: {user_token}")
return (url, token, room_name, args)

462
src/pipecat/runner/run.py Normal file
View File

@@ -0,0 +1,462 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Pipecat development runner.
This development runner executes Pipecat bots and provides the supporting
infrastructure they need - creating Daily rooms and tokens, managing WebRTC
connections, and setting up telephony webhook/WebSocket infrastructure. It
supports multiple transport types with a unified interface.
Install with::
pip install pipecat-ai[runner]
All bots must implement a `bot(runner_args)` async function as the entry point.
The server automatically discovers and executes this function when connections
are established.
Single transport example::
async def bot(runner_args: RunnerArguments):
transport = DailyTransport(
runner_args.room_url,
runner_args.token,
"Bot",
DailyParams(...)
)
# Your bot logic here
await run_pipeline(transport)
if __name__ == "__main__":
from pipecat.runner.run import main
main()
Multiple transport example::
async def bot(runner_args: RunnerArguments):
# Type-safe transport detection
if isinstance(runner_args, DailyRunnerArguments):
transport = setup_daily_transport(runner_args) # Your application code
elif isinstance(runner_args, SmallWebRTCRunnerArguments):
transport = setup_webrtc_transport(runner_args) # Your application code
elif isinstance(runner_args, WebSocketRunnerArguments):
transport = setup_telephony_transport(runner_args) # Your application code
# Your bot implementation
await run_pipeline(transport)
Supported transports:
- Daily - Creates rooms and tokens, runs bot as participant
- WebRTC - Provides local WebRTC interface with prebuilt UI
- Telephony - Handles webhook and WebSocket connections for Twilio, Telnyx, Plivo
To run locally:
- WebRTC: `python bot.py -t webrtc`
- ESP32: `python bot.py -t webrtc --esp32 --host 192.168.1.100`
- Daily (server): `python bot.py -t daily`
- Daily (direct, testing only): `python bot.py -d`
- Telephony: `python bot.py -t twilio -x your_username.ngrok.io`
"""
import argparse
import asyncio
import os
import sys
from contextlib import asynccontextmanager
from typing import Dict
from loguru import logger
from pipecat.runner.types import (
DailyRunnerArguments,
SmallWebRTCRunnerArguments,
WebSocketRunnerArguments,
)
try:
import uvicorn
from dotenv import load_dotenv
from fastapi import BackgroundTasks, FastAPI, WebSocket
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, RedirectResponse
except ImportError as e:
logger.error(f"Runner dependencies not available: {e}")
logger.error("To use Pipecat runners, install with: pip install pipecat-ai[runner]")
raise ImportError(
"Runner dependencies required. Install with: pip install pipecat-ai[runner]"
) from e
load_dotenv(override=True)
os.environ["ENV"] = "local"
def _get_bot_module():
"""Get the bot module from the calling script."""
import importlib.util
# Get the main module (the file that was executed)
main_module = sys.modules["__main__"]
# Check if it has a bot function
if hasattr(main_module, "bot"):
return main_module
# Try to import 'bot' module from current directory
try:
import bot # type: ignore[import-untyped]
return bot
except ImportError:
pass
# Look for any .py file in current directory that has a bot function
# (excluding server.py).
cwd = os.getcwd()
for filename in os.listdir(cwd):
if filename.endswith(".py") and filename != "server.py":
try:
module_name = filename[:-3] # Remove .py extension
spec = importlib.util.spec_from_file_location(
module_name, os.path.join(cwd, filename)
)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
if hasattr(module, "bot"):
return module
except Exception:
continue
raise ImportError(
"Could not find 'bot' function. Make sure your bot file has a 'bot' function."
)
async def _run_telephony_bot(websocket: WebSocket):
"""Run a bot for telephony transports."""
bot_module = _get_bot_module()
# Just pass the WebSocket - let the bot handle parsing
runner_args = WebSocketRunnerArguments(websocket=websocket)
await bot_module.bot(runner_args)
def _create_server_app(
transport_type: str, host: str = "localhost", proxy: str = None, esp32_mode: bool = False
):
"""Create FastAPI app with transport-specific routes."""
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Set up transport-specific routes
if transport_type == "webrtc":
_setup_webrtc_routes(app, esp32_mode=esp32_mode, host=host)
elif transport_type == "daily":
_setup_daily_routes(app)
elif transport_type in ["twilio", "telnyx", "plivo"]:
_setup_telephony_routes(app, transport_type, proxy)
else:
logger.warning(f"Unknown transport type: {transport_type}")
return app
def _setup_webrtc_routes(app: FastAPI, esp32_mode: bool = False, host: str = "localhost"):
"""Set up WebRTC-specific routes."""
try:
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
except ImportError as e:
logger.error(f"WebRTC transport dependencies not installed: {e}")
return
# Store connections by pc_id
pcs_map: Dict[str, SmallWebRTCConnection] = {}
# Mount the frontend
app.mount("/client", SmallWebRTCPrebuiltUI)
@app.get("/", include_in_schema=False)
async def root_redirect():
"""Redirect root requests to client interface."""
return RedirectResponse(url="/client/")
@app.post("/api/offer")
async def offer(request: dict, background_tasks: BackgroundTasks):
"""Handle WebRTC offer requests and manage peer connections."""
pc_id = request.get("pc_id")
if pc_id and pc_id in pcs_map:
pipecat_connection = pcs_map[pc_id]
logger.info(f"Reusing existing connection for pc_id: {pc_id}")
await pipecat_connection.renegotiate(
sdp=request["sdp"],
type=request["type"],
restart_pc=request.get("restart_pc", False),
)
else:
pipecat_connection = SmallWebRTCConnection()
await pipecat_connection.initialize(sdp=request["sdp"], type=request["type"])
@pipecat_connection.event_handler("closed")
async def handle_disconnected(webrtc_connection: SmallWebRTCConnection):
"""Handle WebRTC connection closure and cleanup."""
logger.info(f"Discarding peer connection for pc_id: {webrtc_connection.pc_id}")
pcs_map.pop(webrtc_connection.pc_id, None)
bot_module = _get_bot_module()
runner_args = SmallWebRTCRunnerArguments(webrtc_connection=pipecat_connection)
background_tasks.add_task(bot_module.bot, runner_args)
answer = pipecat_connection.get_answer()
# Apply ESP32 SDP munging if enabled
if esp32_mode and host != "localhost":
from pipecat.runner.utils import smallwebrtc_sdp_munging
answer["sdp"] = smallwebrtc_sdp_munging(answer["sdp"], host)
pcs_map[answer["pc_id"]] = pipecat_connection
return answer
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Manage FastAPI application lifecycle and cleanup connections."""
yield
coros = [pc.disconnect() for pc in pcs_map.values()]
await asyncio.gather(*coros)
pcs_map.clear()
app.router.lifespan_context = lifespan
def _setup_daily_routes(app: FastAPI):
"""Set up Daily-specific routes."""
@app.get("/")
async def start_agent():
"""Launch a Daily bot and redirect to room."""
print("Starting bot with Daily transport")
import aiohttp
from pipecat.runner.daily import configure
async with aiohttp.ClientSession() as session:
room_url, token = await configure(session)
# Start the bot in the background
bot_module = _get_bot_module()
runner_args = DailyRunnerArguments(room_url=room_url, token=token, body={})
asyncio.create_task(bot_module.bot(runner_args))
return RedirectResponse(room_url)
@app.post("/connect")
async def rtvi_connect():
"""Launch a Daily bot and return connection info for RTVI clients."""
print("Starting bot with Daily transport")
import aiohttp
from pipecat.runner.daily import configure
async with aiohttp.ClientSession() as session:
room_url, token = await configure(session)
# Start the bot in the background
bot_module = _get_bot_module()
runner_args = DailyRunnerArguments(room_url=room_url, token=token, body={})
asyncio.create_task(bot_module.bot(runner_args))
return {"room_url": room_url, "token": token}
def _setup_telephony_routes(app: FastAPI, transport_type: str, proxy: str):
"""Set up telephony-specific routes."""
# XML response templates
XML_TEMPLATES = {
"twilio": f"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Connect>
<Stream url="wss://{proxy}/ws"></Stream>
</Connect>
<Pause length="40"/>
</Response>""",
"telnyx": f"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Connect>
<Stream url="wss://{proxy}/ws" bidirectionalMode="rtp"></Stream>
</Connect>
<Pause length="40"/>
</Response>""",
"plivo": f"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Stream bidirectional="true" keepCallAlive="true" contentType="audio/x-mulaw;rate=8000">wss://{proxy}/ws</Stream>
</Response>""",
}
@app.post("/")
async def start_call():
"""Handle telephony webhook and return XML response."""
logger.debug(f"POST {transport_type.upper()} XML")
xml_content = XML_TEMPLATES.get(transport_type, "<Response></Response>")
return HTMLResponse(content=xml_content, media_type="application/xml")
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
"""Handle WebSocket connections for telephony."""
await websocket.accept()
logger.debug("WebSocket connection accepted")
await _run_telephony_bot(websocket)
@app.get("/")
async def start_agent():
"""Simple status endpoint for telephony transports."""
return {"status": f"Bot started with {transport_type}"}
async def _run_daily_direct():
"""Run Daily bot with direct connection (no FastAPI server)."""
try:
import aiohttp
from pipecat.runner.daily import configure
except ImportError as e:
logger.error("Daily transport dependencies not installed.")
return
logger.info("Running with direct Daily connection...")
async with aiohttp.ClientSession() as session:
room_url, token = await configure(session)
runner_args = DailyRunnerArguments(room_url=room_url, token=token, body={})
# Get the bot module and run it directly
bot_module = _get_bot_module()
print(f"📞 Joining Daily room: {room_url}")
print(" (Direct connection - no web server needed)")
print()
await bot_module.bot(runner_args)
def main():
"""Start the Pipecat development runner.
Parses command-line arguments and starts a FastAPI server configured
for the specified transport type. The runner will discover and run
any bot() function found in the current directory.
Command-line arguments:
Args:
--host: Server host address (default: localhost)
--port: Server port (default: 7860)
-t/--transport: Transport type (daily, webrtc, twilio, telnyx, plivo)
-x/--proxy: Public proxy hostname for telephony webhooks
--esp32: Enable SDP munging for ESP32 compatibility (requires --host with IP address)
-d/--direct: Connect directly to Daily room (automatically sets transport to daily)
-v/--verbose: Increase logging verbosity
The bot file must contain a `bot(runner_args)` function as the entry point.
"""
parser = argparse.ArgumentParser(description="Pipecat Development Runner")
parser.add_argument("--host", type=str, default="localhost", help="Host address")
parser.add_argument("--port", type=int, default=7860, help="Port number")
parser.add_argument(
"-t",
"--transport",
type=str,
choices=["daily", "webrtc", "twilio", "telnyx", "plivo"],
default="webrtc",
help="Transport type",
)
parser.add_argument("--proxy", "-x", help="Public proxy host name")
parser.add_argument(
"--esp32",
action="store_true",
default=False,
help="Enable SDP munging for ESP32 compatibility (requires --host with IP address)",
)
parser.add_argument(
"-d",
"--direct",
action="store_true",
default=False,
help="Connect directly to Daily room (automatically sets transport to daily)",
)
parser.add_argument(
"--verbose", "-v", action="count", default=0, help="Increase logging verbosity"
)
args = parser.parse_args()
# Auto-set transport to daily if --direct is used without explicit transport
if args.direct and args.transport == "webrtc": # webrtc is the default
args.transport = "daily"
elif args.direct and args.transport != "daily":
logger.error("--direct flag only works with Daily transport (-t daily)")
return
# Validate ESP32 requirements
if args.esp32 and args.host == "localhost":
logger.error("For ESP32, you need to specify `--host IP` so we can do SDP munging.")
return
# Log level
logger.remove()
logger.add(sys.stderr, level="TRACE" if args.verbose else "DEBUG")
# Handle direct Daily connection (no FastAPI server)
if args.direct:
print()
print("🚀 Connecting directly to Daily room...")
print()
# Run direct Daily connection
asyncio.run(_run_daily_direct())
return
# Print startup message for server-based transports
if args.transport == "webrtc":
print()
if args.esp32:
print(
f"🚀 WebRTC server starting at http://{args.host}:{args.port}/client (ESP32 mode)"
)
else:
print(f"🚀 WebRTC server starting at http://{args.host}:{args.port}/client")
print(f" Open this URL in your browser to connect!")
print()
elif args.transport == "daily":
print()
print(f"🚀 Daily server starting at http://{args.host}:{args.port}")
print(f" Open this URL in your browser to start a session!")
print()
# Create the app with transport-specific setup
app = _create_server_app(args.transport, args.host, args.proxy, args.esp32)
# Run the server
uvicorn.run(app, host=args.host, port=args.port)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,60 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Runner session argument types for the development runner.
These types are used by the development runner to pass transport-specific
information to bot functions.
"""
from dataclasses import dataclass
from typing import Any
from fastapi import WebSocket
@dataclass
class RunnerArguments:
"""Base class for runner session arguments."""
pass
@dataclass
class DailyRunnerArguments(RunnerArguments):
"""Daily transport session arguments for the runner.
Parameters:
room_url: Daily room URL to join
token: Authentication token for the room
body: Additional request data
"""
room_url: str
token: str
body: Any
@dataclass
class WebSocketRunnerArguments(RunnerArguments):
"""WebSocket transport session arguments for the runner.
Parameters:
websocket: WebSocket connection for audio streaming
"""
websocket: WebSocket
@dataclass
class SmallWebRTCRunnerArguments(RunnerArguments):
"""Small WebRTC transport session arguments for the runner.
Parameters:
webrtc_connection: Pre-configured WebRTC peer connection
"""
webrtc_connection: Any

481
src/pipecat/runner/utils.py Normal file
View File

@@ -0,0 +1,481 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Transport utility functions and FastAPI route setup helpers.
This module provides common functionality for setting up transport-specific
FastAPI routes and handling WebRTC/WebSocket connections. It includes SDP
manipulation utilities for WebRTC compatibility and transport detection helpers.
Key features:
- WebRTC route setup with connection management
- WebSocket route setup for telephony providers
- SDP munging for ESP32 and other WebRTC compatibility
- Transport client ID detection across different transport types
- Video capture utilities for Daily transports
The utilities are designed to be transport-agnostic where possible, with
specific handlers for each transport type's unique requirements.
Example::
from pipecat.runner.utils import parse_telephony_websocket
async def telephony_websocket_handler(websocket: WebSocket):
transport_type, call_data = await parse_telephony_websocket(websocket)
"""
import json
import os
import re
from typing import Any, Callable, Dict, Optional
from fastapi import WebSocket
from loguru import logger
from pipecat.runner.types import (
DailyRunnerArguments,
SmallWebRTCRunnerArguments,
WebSocketRunnerArguments,
)
from pipecat.transports.base_transport import BaseTransport
def _detect_transport_type_from_message(message_data: dict) -> str:
"""Attempt to auto-detect transport type from WebSocket message structure."""
logger.trace("=== Auto-Detection Analysis ===")
# Twilio detection
if (
message_data.get("event") == "start"
and "start" in message_data
and "streamSid" in message_data.get("start", {})
and "callSid" in message_data.get("start", {})
):
logger.trace("Auto-detected: TWILIO")
return "twilio"
# Telnyx detection
if (
"stream_id" in message_data
and "start" in message_data
and "call_control_id" in message_data.get("start", {})
):
logger.trace("Auto-detected: TELNYX")
return "telnyx"
# Plivo detection
if (
"start" in message_data
and "streamId" in message_data.get("start", {})
and "callId" in message_data.get("start", {})
):
logger.trace("Auto-detected: PLIVO")
return "plivo"
logger.trace("Auto-detection failed - unknown format")
return "unknown"
async def parse_telephony_websocket(websocket: WebSocket):
"""Parse telephony WebSocket messages and return transport type and call data.
Returns:
tuple: (transport_type: str, call_data: dict)
call_data contains provider-specific fields:
- Twilio: {"stream_id": str, "call_id": str}
- Telnyx: {"stream_id": str, "call_control_id": str, "outbound_encoding": str}
- Plivo: {"stream_id": str, "call_id": str}
Example usage::
transport_type, call_data = await parse_telephony_websocket(websocket)
if transport_type == "telnyx":
outbound_encoding = call_data["outbound_encoding"]
"""
# Read first two messages
start_data = websocket.iter_text()
try:
# First message
first_message_raw = await start_data.__anext__()
logger.trace(f"First message: {first_message_raw}")
try:
first_message = json.loads(first_message_raw)
except json.JSONDecodeError:
first_message = {}
# Second message
second_message_raw = await start_data.__anext__()
logger.trace(f"Second message: {second_message_raw}")
try:
second_message = json.loads(second_message_raw)
except json.JSONDecodeError:
second_message = {}
# Try auto-detection on both messages
detected_type_first = _detect_transport_type_from_message(first_message)
detected_type_second = _detect_transport_type_from_message(second_message)
# Use the successful detection
if detected_type_first != "unknown":
transport_type = detected_type_first
call_data_raw = first_message
logger.debug(f"Detected transport: {transport_type} (from first message)")
elif detected_type_second != "unknown":
transport_type = detected_type_second
call_data_raw = second_message
logger.debug(f"Detected transport: {transport_type} (from second message)")
else:
transport_type = "unknown"
call_data_raw = second_message
logger.warning("Could not auto-detect transport type")
# Extract provider-specific data
if transport_type == "twilio":
start_data = call_data_raw.get("start", {})
call_data = {
"stream_id": start_data.get("streamSid"),
"call_id": start_data.get("callSid"),
}
elif transport_type == "telnyx":
call_data = {
"stream_id": call_data_raw.get("stream_id"),
"call_control_id": call_data_raw.get("start", {}).get("call_control_id"),
"outbound_encoding": call_data_raw.get("start", {})
.get("media_format", {})
.get("encoding"),
}
elif transport_type == "plivo":
start_data = call_data_raw.get("start", {})
call_data = {
"stream_id": start_data.get("streamId"),
"call_id": start_data.get("callId"),
}
else:
call_data = {}
logger.debug(f"Parsed - Type: {transport_type}, Data: {call_data}")
return transport_type, call_data
except Exception as e:
logger.error(f"Error parsing telephony WebSocket: {e}")
raise
def get_transport_client_id(transport: BaseTransport, client: Any) -> str:
"""Get client identifier from transport-specific client object.
Args:
transport: The transport instance.
client: Transport-specific client object.
Returns:
Client identifier string, empty if transport not supported.
"""
# Import conditionally to avoid dependency issues
try:
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
if isinstance(transport, SmallWebRTCTransport):
return client.pc_id
except ImportError:
pass
try:
from pipecat.transports.services.daily import DailyTransport
if isinstance(transport, DailyTransport):
return client["id"]
except ImportError:
pass
logger.warning(f"Unable to get client id from unsupported transport {type(transport)}")
return ""
async def maybe_capture_participant_camera(
transport: BaseTransport, client: Any, framerate: int = 0
):
"""Capture participant camera video if transport supports it.
Args:
transport: The transport instance.
client: Transport-specific client object.
framerate: Video capture framerate. Defaults to 0 (auto).
"""
try:
from pipecat.transports.services.daily import DailyTransport
if isinstance(transport, DailyTransport):
await transport.capture_participant_video(
client["id"], framerate=framerate, video_source="camera"
)
except ImportError:
pass
async def maybe_capture_participant_screen(
transport: BaseTransport, client: Any, framerate: int = 0
):
"""Capture participant screen video if transport supports it.
Args:
transport: The transport instance.
client: Transport-specific client object.
framerate: Video capture framerate. Defaults to 0 (auto).
"""
try:
from pipecat.transports.services.daily import DailyTransport
if isinstance(transport, DailyTransport):
await transport.capture_participant_video(
client["id"], framerate=framerate, video_source="screenVideo"
)
except ImportError:
pass
def _smallwebrtc_sdp_cleanup_ice_candidates(text: str, pattern: str) -> str:
"""Clean up ICE candidates in SDP text for SmallWebRTC.
Args:
text: SDP text to clean up.
pattern: Pattern to match for candidate filtering.
Returns:
Cleaned SDP text with filtered ICE candidates.
"""
result = []
lines = text.splitlines()
for line in lines:
if re.search("a=candidate", line):
if re.search(pattern, line) and not re.search("raddr", line):
result.append(line)
else:
result.append(line)
return "\r\n".join(result)
def _smallwebrtc_sdp_cleanup_fingerprints(text: str) -> str:
"""Remove unsupported fingerprint algorithms from SDP text.
Args:
text: SDP text to clean up.
Returns:
SDP text with sha-384 and sha-512 fingerprints removed.
"""
result = []
lines = text.splitlines()
for line in lines:
if not re.search("sha-384", line) and not re.search("sha-512", line):
result.append(line)
return "\r\n".join(result)
def smallwebrtc_sdp_munging(sdp: str, host: str) -> str:
"""Apply SDP modifications for SmallWebRTC compatibility.
Args:
sdp: Original SDP string.
host: Host address for ICE candidate filtering.
Returns:
Modified SDP string with fingerprint and ICE candidate cleanup.
"""
sdp = _smallwebrtc_sdp_cleanup_fingerprints(sdp)
sdp = _smallwebrtc_sdp_cleanup_ice_candidates(sdp, host)
return sdp
def _get_transport_params(transport_key: str, transport_params: Dict[str, Callable]) -> Any:
"""Get transport parameters from factory function.
Args:
transport_key: The transport key to look up
transport_params: Dict mapping transport names to parameter factory functions
Returns:
Transport parameters from the factory function
Raises:
ValueError: If transport key is missing from transport_params
"""
if transport_key not in transport_params:
raise ValueError(
f"Missing transport params for '{transport_key}'. "
f"Please add '{transport_key}' key to your transport_params dict."
)
params = transport_params[transport_key]()
logger.debug(f"Using transport params for {transport_key}")
return params
async def _create_telephony_transport(
websocket: WebSocket,
params: Optional[Any] = None,
transport_type: str = None,
call_data: dict = None,
) -> BaseTransport:
"""Create a telephony transport with pre-parsed WebSocket data.
Args:
websocket: FastAPI WebSocket connection from telephony provider
params: FastAPIWebsocketParams (required)
transport_type: Pre-detected provider type ("twilio", "telnyx", "plivo")
call_data: Pre-parsed call data dict with provider-specific fields
Returns:
Configured FastAPIWebsocketTransport ready for telephony use.
"""
from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketTransport
if params is None:
raise ValueError(
"FastAPIWebsocketParams must be provided. "
"The serializer and add_wav_header will be set automatically."
)
# Always set add_wav_header to False for telephony
params.add_wav_header = False
logger.info(f"Using pre-detected telephony provider: {transport_type}")
if transport_type == "twilio":
from pipecat.serializers.twilio import TwilioFrameSerializer
params.serializer = TwilioFrameSerializer(
stream_sid=call_data["stream_id"],
call_sid=call_data["call_id"],
account_sid=os.getenv("TWILIO_ACCOUNT_SID", ""),
auth_token=os.getenv("TWILIO_AUTH_TOKEN", ""),
)
elif transport_type == "telnyx":
from pipecat.serializers.telnyx import TelnyxFrameSerializer
params.serializer = TelnyxFrameSerializer(
stream_id=call_data["stream_id"],
call_control_id=call_data["call_control_id"],
outbound_encoding=call_data["outbound_encoding"],
inbound_encoding="PCMU", # Standard default
api_key=os.getenv("TELNYX_API_KEY", ""),
)
elif transport_type == "plivo":
from pipecat.serializers.plivo import PlivoFrameSerializer
params.serializer = PlivoFrameSerializer(
stream_id=call_data["stream_id"],
call_id=call_data["call_id"],
auth_id=os.getenv("PLIVO_AUTH_ID", ""),
auth_token=os.getenv("PLIVO_AUTH_TOKEN", ""),
)
else:
raise ValueError(
f"Unsupported telephony provider: {transport_type}. "
f"Supported providers: twilio, telnyx, plivo"
)
return FastAPIWebsocketTransport(websocket=websocket, params=params)
async def create_transport(
runner_args: Any, transport_params: Dict[str, Callable]
) -> BaseTransport:
"""Create a transport from runner arguments using factory functions.
This function uses the clean transport_params factory pattern where users
define a dictionary mapping transport names to parameter factory functions.
Args:
runner_args: Arguments from the runner.
transport_params: Dict mapping transport names to parameter factory functions.
Keys should be: "daily", "webrtc", "twilio", "telnyx", "plivo"
Values should be functions that return transport parameters when called.
Returns:
Configured transport instance.
Raises:
ValueError: If transport key is missing from transport_params or runner_args type is unsupported.
ImportError: If required dependencies are not installed.
Example::
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
# add_wav_header and serializer will be set automatically
),
"telnyx": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
# add_wav_header and serializer will be set automatically
),
"plivo": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
# add_wav_header and serializer will be set automatically
),
}
transport = await create_transport(runner_args, transport_params)
"""
# Create transport based on runner args type
if isinstance(runner_args, DailyRunnerArguments):
params = _get_transport_params("daily", transport_params)
from pipecat.transports.services.daily import DailyTransport
return DailyTransport(
runner_args.room_url,
runner_args.token,
"Pipecat Bot",
params=params,
)
elif isinstance(runner_args, SmallWebRTCRunnerArguments):
params = _get_transport_params("webrtc", transport_params)
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
return SmallWebRTCTransport(
params=params,
webrtc_connection=runner_args.webrtc_connection,
)
elif isinstance(runner_args, WebSocketRunnerArguments):
# Parse once to determine the provider and get data
transport_type, call_data = await parse_telephony_websocket(runner_args.websocket)
params = _get_transport_params(transport_type, transport_params)
# Create telephony transport with pre-parsed data
return await _create_telephony_transport(
runner_args.websocket, params, transport_type, call_data
)
else:
raise ValueError(f"Unsupported runner arguments type: {type(runner_args)}")