Compare commits

...

28 Commits

Author SHA1 Message Date
Mark Backman
6fca53c31d Add changelog for #4525 2026-05-19 18:32:02 -04:00
Mark Backman
e1f3b4fdbe raise ImportError instead of Exception for missing optional deps
Across 84 files, the optional-dependency guard at module load did
`raise Exception(f"Missing module: {e}")`, which is too generic and
drops the original `ModuleNotFoundError` traceback. Switch to
`raise ImportError(...) from e` so callers can `except ImportError:`
cleanly and the original cause is preserved.

Two files (audio/turn/krisp_viva_turn.py, turns/user_start/krisp_viva_ip_user_turn_start_strategy.py)
already used the correct pattern and were left untouched.
2026-05-19 18:31:25 -04:00
Mark Backman
c09f6d5adb Merge pull request #4052 from Vonage/vonage_video_connector_transport
Vonage WebRTC Transport Integration
2026-05-19 10:56:20 -04:00
asilvestre
e2d249e5d9 adding uv.lock 2026-05-19 16:33:38 +02:00
asilvestre
956b39b0dc remove extraenous await in cleanup 2026-05-19 16:33:04 +02:00
asilvestre
bc769eaa82 Changing the example to use OpenAI 2026-05-18 14:40:56 +02:00
asilvestre
ee5aa4dc71 SubscribeSettings to be pydantic and comment fixes 2026-05-18 14:40:56 +02:00
asilvestre
dd38fbc735 add documentation entry 2026-05-18 14:40:56 +02:00
asilvestre
a1c40df471 add documentation entry 2026-05-18 14:40:56 +02:00
asilvestre
c4ff9300c9 fix linting and typechecking 2026-05-18 14:40:56 +02:00
asilvestre
cab4585cbb added changelog 2026-05-18 14:40:56 +02:00
Antoni Silvestre
18368d047e Linting and changes to adapt to v1.0 2026-05-18 14:40:56 +02:00
asilvestre
e3abb4b6d7 apply suggestions in PR 2026-05-18 14:40:56 +02:00
Antoni Silvestre
0fd971d59d Update src/pipecat/runner/types.py
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-05-18 14:40:56 +02:00
asilvestre
c61672194d Vonage Video Connector Transport 2026-05-18 14:40:49 +02:00
Filipi da Silva Fuchter
c51a817efa Merge pull request #4442 from pipecat-ai/filipi/runner_all_transports
Unified start route to make all transports available
2026-05-18 09:27:44 -03:00
Bismeet singh
d85eda6da8 Merge pull request #4507 from BismeetSingh/fix/elevenlabs-stt-service-crash-language
Fix/elevenlabs stt service crash language
2026-05-17 10:17:07 -04:00
filipi87
b493ed8d3a Removing the websocket transport from elevenlabs example. 2026-05-15 10:11:38 -03:00
filipi87
c3338667b1 Mounting the prebuilt frontend UI and root redirect for all transports. 2026-05-15 10:06:47 -03:00
filipi87
c8efe319b3 Adding the changelog for the changes. 2026-05-14 11:10:33 -03:00
filipi87
d6655e7a5e Fixing ruff format. 2026-05-12 10:40:09 -03:00
filipi87
33b73df6ec Changing the websocket route to return the same data as PCC. 2026-05-12 10:38:15 -03:00
filipi87
c9f0172e9f Example supporting plain websocket. 2026-05-08 09:46:18 -03:00
filipi87
2638885c62 Adding support for the plain websocket transport. 2026-05-08 09:37:07 -03:00
filipi87
cb426cbb14 Fixing format. 2026-05-07 16:04:43 -03:00
filipi87
d39beff817 Fixing format. 2026-05-07 16:01:54 -03:00
filipi87
1eade184f1 Creating a status endpoint to return the available transports. 2026-05-07 15:53:15 -03:00
filipi87
3fa193b983 Unified start route to make all transports available. 2026-05-07 15:34:32 -03:00
103 changed files with 5752 additions and 355 deletions

View File

@@ -95,7 +95,7 @@ Catch new features, interviews, and how-tos on our [Pipecat TV](https://www.yout
| LLMs | [Anthropic](https://docs.pipecat.ai/api-reference/server/services/llm/anthropic), [AWS](https://docs.pipecat.ai/api-reference/server/services/llm/aws), [Azure](https://docs.pipecat.ai/api-reference/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/api-reference/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/api-reference/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/api-reference/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/api-reference/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/api-reference/server/services/llm/grok), [Groq](https://docs.pipecat.ai/api-reference/server/services/llm/groq), [Mistral](https://docs.pipecat.ai/api-reference/server/services/llm/mistral), [Nebius](https://docs.pipecat.ai/api-reference/server/services/llm/nebius), [Novita](https://docs.pipecat.ai/api-reference/server/services/llm/novita), [NVIDIA NIM](https://docs.pipecat.ai/api-reference/server/services/llm/nvidia), [Ollama](https://docs.pipecat.ai/api-reference/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/api-reference/server/services/llm/openai), [OpenAI Responses](https://docs.pipecat.ai/api-reference/server/services/llm/openai-responses), [OpenRouter](https://docs.pipecat.ai/api-reference/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/api-reference/server/services/llm/perplexity), [Qwen](https://docs.pipecat.ai/api-reference/server/services/llm/qwen), [SambaNova](https://docs.pipecat.ai/api-reference/server/services/llm/sambanova), [Sarvam](https://docs.pipecat.ai/api-reference/server/services/llm/sarvam), [Together AI](https://docs.pipecat.ai/api-reference/server/services/llm/together) |
| Text-to-Speech | [Async](https://docs.pipecat.ai/api-reference/server/services/tts/asyncai), [AWS](https://docs.pipecat.ai/api-reference/server/services/tts/aws), [Azure](https://docs.pipecat.ai/api-reference/server/services/tts/azure), [Camb AI](https://docs.pipecat.ai/api-reference/server/services/tts/camb), [Cartesia](https://docs.pipecat.ai/api-reference/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/api-reference/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/api-reference/server/services/tts/elevenlabs), [Fish](https://docs.pipecat.ai/api-reference/server/services/tts/fish), [Google](https://docs.pipecat.ai/api-reference/server/services/tts/google), [Gradium](https://docs.pipecat.ai/api-reference/server/services/tts/gradium), [Groq](https://docs.pipecat.ai/api-reference/server/services/tts/groq), [Hume](https://docs.pipecat.ai/api-reference/server/services/tts/hume), [Inworld](https://docs.pipecat.ai/api-reference/server/services/tts/inworld), [Kokoro](https://docs.pipecat.ai/api-reference/server/services/tts/kokoro), [LMNT](https://docs.pipecat.ai/api-reference/server/services/tts/lmnt), [MiniMax](https://docs.pipecat.ai/api-reference/server/services/tts/minimax), [Mistral](https://docs.pipecat.ai/api-reference/server/services/tts/mistral), [Neuphonic](https://docs.pipecat.ai/api-reference/server/services/tts/neuphonic), [NVIDIA](https://docs.pipecat.ai/api-reference/server/services/tts/nvidia), [OpenAI](https://docs.pipecat.ai/api-reference/server/services/tts/openai), [Piper](https://docs.pipecat.ai/api-reference/server/services/tts/piper), [Resemble](https://docs.pipecat.ai/api-reference/server/services/tts/resemble), [Rime](https://docs.pipecat.ai/api-reference/server/services/tts/rime), [Sarvam](https://docs.pipecat.ai/api-reference/server/services/tts/sarvam), [Smallest](https://docs.pipecat.ai/api-reference/server/services/tts/smallest), [Soniox](https://docs.pipecat.ai/api-reference/server/services/tts/soniox), [Speechmatics](https://docs.pipecat.ai/api-reference/server/services/tts/speechmatics), [xAI](https://docs.pipecat.ai/api-reference/server/services/tts/xai), [XTTS](https://docs.pipecat.ai/api-reference/server/services/tts/xtts) |
| Speech-to-Speech | [AWS Nova Sonic](https://docs.pipecat.ai/api-reference/server/services/s2s/aws), [Gemini Multimodal Live](https://docs.pipecat.ai/api-reference/server/services/s2s/gemini), [Grok Voice Agent](https://docs.pipecat.ai/api-reference/server/services/s2s/grok), [OpenAI Realtime](https://docs.pipecat.ai/api-reference/server/services/s2s/openai), [Ultravox](https://docs.pipecat.ai/api-reference/server/services/s2s/ultravox), |
| Transport | [Daily (WebRTC)](https://docs.pipecat.ai/api-reference/server/services/transport/daily), [FastAPI Websocket](https://docs.pipecat.ai/api-reference/server/services/transport/fastapi-websocket), [LiveKit (WebRTC)](https://docs.pipecat.ai/api-reference/server/services/transport/livekit), [SmallWebRTCTransport](https://docs.pipecat.ai/api-reference/server/services/transport/small-webrtc), [WebSocket Server](https://docs.pipecat.ai/api-reference/server/services/transport/websocket-server), [WhatsApp](https://docs.pipecat.ai/api-reference/server/services/transport/whatsapp), Local |
| Transport | [Daily (WebRTC)](https://docs.pipecat.ai/api-reference/server/services/transport/daily), [FastAPI Websocket](https://docs.pipecat.ai/api-reference/server/services/transport/fastapi-websocket), [LiveKit (WebRTC)](https://docs.pipecat.ai/api-reference/server/services/transport/livekit), [SmallWebRTCTransport](https://docs.pipecat.ai/api-reference/server/services/transport/small-webrtc), [Vonage (WebRTC)](https://docs.pipecat.ai/api-reference/server/services/transport/vonage), [WebSocket Server](https://docs.pipecat.ai/api-reference/server/services/transport/websocket-server), [WhatsApp](https://docs.pipecat.ai/api-reference/server/services/transport/whatsapp), Local |
| Serializers | [Exotel](https://docs.pipecat.ai/api-reference/server/services/serializers/exotel), [Genesys](https://docs.pipecat.ai/api-reference/server/services/serializers/genesys), [Plivo](https://docs.pipecat.ai/api-reference/server/services/serializers/plivo), [Twilio](https://docs.pipecat.ai/api-reference/server/services/serializers/twilio), [Telnyx](https://docs.pipecat.ai/api-reference/server/services/serializers/telnyx), [Vonage](https://docs.pipecat.ai/api-reference/server/services/serializers/vonage) |
| Video | [HeyGen](https://docs.pipecat.ai/api-reference/server/services/video/heygen), [LemonSlice](https://docs.pipecat.ai/api-reference/server/services/transport/lemonslice), [Tavus](https://docs.pipecat.ai/api-reference/server/services/video/tavus), [Simli](https://docs.pipecat.ai/api-reference/server/services/video/simli) |
| Memory | [mem0](https://docs.pipecat.ai/api-reference/server/services/memory/mem0) |

1
changelog/4052.added.md Normal file
View File

@@ -0,0 +1 @@
- Added `VonageVideoConnectorTransport`, a new transport integration for real-time Vonage WebRTC sessions using the Vonage Video Connector library.

View File

@@ -0,0 +1 @@
- Added `GET /status` endpoint to the development runner that reports which transports the running instance accepts (all by default, or the single transport passed via `-t`).

1
changelog/4442.added.md Normal file
View File

@@ -0,0 +1 @@
- Added plain WebSocket transport support to the development runner. Bots can now accept connections from non-telephony WebSocket clients (e.g., browser apps using protobuf framing) via the `/ws-client` endpoint alongside other transports.

View File

@@ -0,0 +1 @@
- ⚠️ The development runner now supports all transports (WebRTC, Daily, telephony, plain WebSocket) simultaneously from a single server. The `/start` endpoint accepts a `"transport"` field to select the transport per-request; omitting `-t` at startup enables all transports instead of defaulting to WebRTC. The Daily browser-redirect route moved from `GET /` to `GET /daily`.

1
changelog/4507.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed `ElevenLabsSTTService` crashing when `language` was passed as `None`. When `language` is not set, the service now lets ElevenLabs auto-detect the audio language.

View File

@@ -0,0 +1 @@
- Services and transports with missing optional dependencies now raise `ImportError` instead of a bare `Exception` when their module is imported without the required extra installed. The original `ModuleNotFoundError` is preserved as `__cause__`, so code that wraps these imports can now use `except ImportError:` cleanly instead of `except Exception:`.

View File

@@ -211,6 +211,11 @@ TWILIO_AUTH_TOKEN=...
# Ultravox Realtime
ULTRAVOX_API_KEY=...
# Vonage
VONAGE_APPLICATION_ID=...
VONAGE_SESSION_ID=...
VONAGE_TOKEN=...
# WhatsApp
WHATSAPP_TOKEN=...
WHATSAPP_WEBHOOK_VERIFICATION_TOKEN=...

View File

@@ -0,0 +1,134 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Example of using OpenAI Realtime voice LLM service with Vonage Video Connector transport."""
import asyncio
import os
import sys
from collections.abc import Callable
from typing import Any
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.observers.loggers.transcription_log_observer import TranscriptionLogObserver
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.vonage import configure
from pipecat.services.openai.realtime.events import (
AudioConfiguration,
AudioInput,
InputAudioNoiseReduction,
InputAudioTranscription,
SemanticTurnDetection,
SessionProperties,
)
from pipecat.services.openai.realtime.llm import OpenAIRealtimeLLMService
from pipecat.transports.vonage.video_connector import (
VonageVideoConnectorTransport,
VonageVideoConnectorTransportParams,
)
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main() -> None:
"""Main entry point for the OpenAI Realtime vonage video connector example."""
(application_id, session_id, token) = await configure()
transport = VonageVideoConnectorTransport(
application_id,
session_id,
token,
VonageVideoConnectorTransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
publisher_name="Bot",
),
)
llm = OpenAIRealtimeLLMService(
api_key=os.environ["OPENAI_API_KEY"],
settings=OpenAIRealtimeLLMService.Settings(
system_instruction="""You are a helpful and friendly AI.
Act like a human, but remember that you aren't a human and that you can't do human
things in the real world. Your voice and personality should be warm and engaging, with a lively and
playful tone.
If interacting in a non-English language, start by using the standard accent or dialect familiar to
the user. Talk quickly.
You are participating in a voice conversation. Keep your responses concise, short, and to the point
unless specifically asked to elaborate on a topic.
Remember, your responses should be short. Just one or two sentences, usually. Respond in English.""",
session_properties=SessionProperties(
audio=AudioConfiguration(
input=AudioInput(
transcription=InputAudioTranscription(),
turn_detection=SemanticTurnDetection(),
noise_reduction=InputAudioNoiseReduction(type="near_field"),
)
),
),
),
)
context = LLMContext(
[{"role": "developer", "content": "Say hello!"}],
)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(),
user_aggregator,
llm,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
observers=[TranscriptionLogObserver()],
)
event_handler: Callable[[str], Callable[[Any], Any]] = transport.event_handler
@event_handler("on_client_connected")
async def on_client_connected(transport: VonageVideoConnectorTransport, client: object) -> None:
logger.info("Client connected")
await task.queue_frames([LLMRunFrame()])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -103,7 +103,7 @@ piper = [ "piper-tts>=1.3.0,<2", "requests>=2.32.5,<3" ]
qwen = []
resembleai = [ "pipecat-ai[websockets-base]" ]
rime = [ "pipecat-ai[websockets-base]" ]
runner = [ "python-dotenv>=1.0.0,<2.0.0", "uvicorn>=0.32.0,<1.0.0", "fastapi>=0.115.6,<1", "pipecat-ai-small-webrtc-prebuilt>=2.5.0"]
runner = [ "python-dotenv>=1.0.0,<2.0.0", "uvicorn>=0.32.0,<1.0.0", "fastapi>=0.115.6,<1", "pipecat-ai-prebuilt>=1.0.0"]
sagemaker = ["aws_sdk_sagemaker_runtime_http2; python_version>='3.12'"]
sambanova = []
sarvam = [ "sarvamai==0.1.28", "pipecat-ai[websockets-base]" ]
@@ -119,6 +119,7 @@ tavus = [ "pipecat-ai[daily]" ]
together = []
tracing = [ "opentelemetry-sdk>=1.33.0,<2", "opentelemetry-api>=1.33.0,<2", "opentelemetry-instrumentation>=0.54b0,<1" ]
ultravox = [ "pipecat-ai[websockets-base]" ]
vonage-video-connector = [ "vonage-video-connector~=0.2.3b0; python_full_version>='3.13' and python_full_version<'3.14' and platform_system=='Linux'" ]
webrtc = [ "aiortc>=1.14.0,<2", "opencv-python>=4.11.0.86,<5" ]
websocket = [ "pipecat-ai[websockets-base]", "fastapi>=0.115.6,<1" ]
websockets-base = [ "websockets>=13.1,<16.0" ]

View File

@@ -28,7 +28,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Google AI, you need to `pip install pipecat-ai[google]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
class GeminiLLMInvocationParams(TypedDict):

View File

@@ -23,7 +23,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use the Koala filter, you need to `pip install pipecat-ai[koala]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
class KoalaFilter(BaseAudioFilter):

View File

@@ -27,7 +27,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use KrispVivaFilter, you need to install krisp_audio.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
class KrispVivaFilter(BaseAudioFilter):

View File

@@ -28,7 +28,7 @@ except ModuleNotFoundError as e:
logger.error(
"In order to use the soundfile mixer, you need to `pip install pipecat-ai[soundfile]`."
)
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
class SoundfileMixer(BaseAudioMixer):

View File

@@ -27,7 +27,7 @@ except ModuleNotFoundError as e:
logger.error(
"In order to use the LocalSmartTurnAnalyzer, you need to `pip install pipecat-ai[local-smart-turn]`."
)
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
class LocalCoreMLSmartTurnAnalyzer(BaseSmartTurn):

View File

@@ -33,7 +33,7 @@ except ModuleNotFoundError as e:
logger.error(
"In order to use LocalSmartTurnAnalyzerV2, you need to `pip install pipecat-ai[local-smart-turn]`."
)
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
class LocalSmartTurnAnalyzerV2(BaseSmartTurn):

View File

@@ -28,7 +28,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use KrispVivaVADAnalyzer, you need to install krisp_audio.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
class KrispVivaVadAnalyzer(VADAnalyzer):

View File

@@ -27,7 +27,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Silero VAD, you need to `pip install pipecat-ai`.")
raise Exception(f"Missing module(s): {e}")
raise ImportError(f"Missing module(s): {e}") from e
class SileroOnnxModel:

View File

@@ -22,7 +22,7 @@ try:
from langchain_core.runnables import Runnable
except ModuleNotFoundError as e:
logger.error("In order to use Langchain, you need to `pip install pipecat-ai[langchain]`. ")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
class LangchainProcessor(FrameProcessor):

View File

@@ -21,7 +21,7 @@ try:
from strands.multiagent.graph import Graph
except ModuleNotFoundError as e:
logger.error("In order to use Strands Agents, you need to `pip install strands-agents`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
class StrandsAgentsProcessor(FrameProcessor):

View File

@@ -33,7 +33,7 @@ except ModuleNotFoundError as e:
logger.error(
"In order to use GStreamer, you need to `pip install pipecat-ai[gstreamer]`. Also, you need to install GStreamer in your system."
)
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
class GStreamerPipelineSource(FrameProcessor):

View File

@@ -17,7 +17,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Sentry, you need to `pip install pipecat-ai[sentry]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics

View File

@@ -19,6 +19,10 @@ All bots must implement a `bot(runner_args)` async function as the entry point.
The server automatically discovers and executes this function when connections
are established.
By default the runner starts a single FastAPI server that supports WebRTC, Daily,
and telephony transports simultaneously. Clients declare which transport they want
via the ``transport`` field in the ``/start`` request body (default: ``"webrtc"``).
Single transport example::
async def bot(runner_args: RunnerArguments):
@@ -55,14 +59,33 @@ Supported transports:
- WebRTC - Provides local WebRTC interface with prebuilt UI
- Telephony - Handles webhook and WebSocket connections for Twilio, Telnyx, Plivo, Exotel
The ``/start`` endpoint accepts::
{
"transport": "webrtc", // "webrtc" | "daily" | "twilio" | "telnyx" |
// "plivo" | "exotel" — default: "webrtc"
// WebRTC-specific
"enableDefaultIceServers": false,
"body": {...},
// Daily-specific
"createDailyRoom": true,
"dailyRoomProperties": {...},
"dailyMeetingTokenProperties": {...},
"body": {...}
}
To run locally:
- WebRTC: `python bot.py -t webrtc`
- ESP32: `python bot.py -t webrtc --esp32 --host 192.168.1.100`
- Daily (server): `python bot.py -t daily`
- Daily (direct, testing only): `python bot.py -d`
- Telephony: `python bot.py -t twilio -x your_username.ngrok.io`
- Exotel: `python bot.py -t exotel` (no proxy needed, but ngrok connection to HTTP 7860 is required)
- All transports (default): ``python bot.py``
- WebRTC only: ``python bot.py -t webrtc``
- ESP32: ``python bot.py -t webrtc --esp32 --host 192.168.1.100``
- Daily only: ``python bot.py -t daily``
- Daily (direct, testing only): ``python bot.py -d``
- Telephony: ``python bot.py -t twilio -x your_username.ngrok.io``
- Exotel: ``python bot.py -t exotel`` (no proxy needed, but ngrok connection to HTTP 7860 is required)
- WhatsApp: ``python bot.py --whatsapp``
"""
import argparse
@@ -85,8 +108,10 @@ from pipecat.runner.types import (
DailyRunnerArguments,
RunnerArguments,
SmallWebRTCRunnerArguments,
VonageRunnerArguments,
WebSocketRunnerArguments,
)
from pipecat.runner.vonage import configure as configure_vonage
try:
import uvicorn
@@ -186,8 +211,33 @@ async def _run_telephony_bot(websocket: WebSocket, args: argparse.Namespace):
await bot_module.bot(runner_args)
async def _run_websocket_bot(websocket: WebSocket, args: argparse.Namespace):
"""Run a bot for plain WebSocket transport."""
bot_module = _get_bot_module()
runner_args = WebSocketRunnerArguments(
websocket=websocket,
transport_type="websocket",
session_id=str(uuid.uuid4()),
)
runner_args.cli_args = args
await bot_module.bot(runner_args)
def _setup_websocket_routes(app: FastAPI, args: argparse.Namespace):
"""Set up the plain WebSocket route at ``/ws-client``."""
@app.websocket("/ws-client")
async def websocket_client_endpoint(websocket: WebSocket):
"""Handle plain WebSocket connections (non-telephony)."""
await websocket.accept()
logger.debug("Plain WebSocket connection accepted")
await _run_websocket_bot(websocket, args)
def _configure_server_app(args: argparse.Namespace):
"""Configure the module-level FastAPI app with transport-specific routes."""
"""Configure the module-level FastAPI app with routes for all transports."""
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
@@ -196,17 +246,198 @@ def _configure_server_app(args: argparse.Namespace):
allow_headers=["*"],
)
# Set up transport-specific routes
if args.transport == "webrtc":
_setup_webrtc_routes(app, args)
if args.whatsapp:
_setup_whatsapp_routes(app, args)
elif args.transport == "daily":
_setup_daily_routes(app, args)
elif args.transport in TELEPHONY_TRANSPORTS:
_setup_telephony_routes(app, args)
else:
logger.warning(f"Unknown transport type: {args.transport}")
# Shared session store: session_id -> body data. Used by the WebRTC /start
# flow and the /sessions/{session_id}/... proxy routes.
active_sessions: dict[str, dict[str, Any]] = {}
_setup_frontend_routes(app)
_setup_webrtc_routes(app, args, active_sessions)
_setup_daily_routes(app, args)
_setup_telephony_routes(app, args)
_setup_websocket_routes(app, args)
_setup_unified_start_route(app, args, active_sessions)
if args.whatsapp:
_setup_whatsapp_routes(app, args)
def _setup_unified_start_route(
app: FastAPI, args: argparse.Namespace, active_sessions: dict[str, dict[str, Any]]
):
"""Register the unified POST /start and GET /status endpoints.
Handles WebRTC, Daily, and telephony transport start flows. Clients specify
which transport they want via the ``transport`` field in the request body.
When ``-t`` was passed on the command line, requests for any other transport
are rejected with HTTP 400.
"""
ALL_TRANSPORTS = ["webrtc", "daily", *TELEPHONY_TRANSPORTS, "websocket"]
@app.get("/status")
async def status():
"""Return the transports supported by this runner instance."""
transports = [args.transport] if args.transport is not None else ALL_TRANSPORTS
return {"status": "ready", "transports": transports}
class IceServer(TypedDict, total=False):
urls: str | list[str]
class IceConfig(TypedDict):
iceServers: list[IceServer]
class StartBotResult(TypedDict, total=False):
sessionId: str
iceConfig: IceConfig | None
dailyRoom: str | None
dailyToken: str | None
wsUrl: str | None
token: str | None
@app.post("/start")
async def start_agent(request: Request):
"""Start a bot session.
Accepts::
{
"transport": "webrtc", // "webrtc" | "daily" | "twilio" | "telnyx" |
// "plivo" | "exotel" — default: "webrtc"
// WebRTC-specific
"enableDefaultIceServers": false,
"body": {...},
// Daily-specific
"createDailyRoom": true,
"dailyRoomProperties": {...},
"dailyMeetingTokenProperties": {...},
"body": {...}
}
"""
try:
request_data = await request.json()
logger.debug(f"Received request: {request_data}")
except Exception as e:
logger.error(f"Failed to parse request body: {e}")
request_data = {}
# Determine transport: explicit field → legacy Daily hint → CLI default → webrtc
transport = request_data.get("transport")
if transport is None and request_data.get("createDailyRoom", False):
transport = "daily"
if transport is None:
transport = args.transport or "webrtc"
# Enforce restriction when -t was explicitly set on the command line
if args.transport is not None and transport != args.transport:
raise HTTPException(
status_code=400,
detail=(
f"Transport '{transport}' is not allowed. "
f"Server is configured for '{args.transport}' only (-t {args.transport})."
),
)
if transport == "webrtc":
# WebRTC: register the session; the bot starts when the WebRTC offer arrives.
session_id = str(uuid.uuid4())
active_sessions[session_id] = request_data.get("body", {})
result = StartBotResult(
sessionId=session_id,
)
if request_data.get("enableDefaultIceServers"):
result["iceConfig"] = IceConfig(
iceServers=[IceServer(urls=["stun:stun.l.google.com:19302"])]
)
return result
elif transport == "daily":
create_daily_room = request_data.get("createDailyRoom", False)
body = request_data.get("body", {})
daily_room_properties_dict = request_data.get("dailyRoomProperties", None)
daily_token_properties_dict = request_data.get("dailyMeetingTokenProperties", None)
bot_module = _get_bot_module()
existing_room_url = os.getenv("DAILY_ROOM_URL")
session_id = str(uuid.uuid4())
result: StartBotResult | None = None
if create_daily_room or existing_room_url:
from pipecat.runner.daily import configure
from pipecat.transports.daily.utils import (
DailyMeetingTokenProperties,
DailyRoomProperties,
)
async with aiohttp.ClientSession() as session:
room_properties = None
if daily_room_properties_dict:
daily_room_properties_dict.setdefault(
"exp", time.time() + PIPECAT_ROOM_EXP_HOURS * 3600
)
daily_room_properties_dict.setdefault("eject_at_room_exp", True)
try:
room_properties = DailyRoomProperties(**daily_room_properties_dict)
logger.debug(f"Using custom room properties: {room_properties}")
except Exception as e:
logger.error(f"Failed to parse dailyRoomProperties: {e}")
token_properties = None
if daily_token_properties_dict:
try:
token_properties = DailyMeetingTokenProperties(
**daily_token_properties_dict
)
logger.debug(f"Using custom token properties: {token_properties}")
except Exception as e:
logger.error(f"Failed to parse dailyMeetingTokenProperties: {e}")
room_url, token = await configure(
session,
room_exp_duration=PIPECAT_ROOM_EXP_HOURS,
room_properties=room_properties,
token_properties=token_properties,
)
runner_args = DailyRunnerArguments(
room_url=room_url, token=token, body=body, session_id=session_id
)
result = StartBotResult(
dailyRoom=room_url,
dailyToken=token,
sessionId=session_id,
)
else:
runner_args = RunnerArguments(body=body, session_id=session_id)
runner_args.cli_args = args
asyncio.create_task(bot_module.bot(runner_args))
return result
elif transport in TELEPHONY_TRANSPORTS:
# Telephony: the bot starts when the provider connects to /ws.
# Return the WebSocket URL so the caller knows where to point their provider.
scheme = "wss" if args.host != "localhost" else "ws"
return StartBotResult(
wsUrl=f"{scheme}://{args.host}:{args.port}/ws",
)
elif transport == "websocket":
# Plain WebSocket: the bot starts when the client connects to /ws-client.
scheme = "wss" if args.host != "localhost" else "ws"
session_id = str(uuid.uuid4())
return StartBotResult(
wsUrl=f"{scheme}://{args.host}:{args.port}/ws-client",
sessionId=session_id,
token="mock_token",
)
else:
raise HTTPException(
status_code=400,
detail=f"Unknown transport '{transport}'.",
)
def _resolve_download_path(folder: str, filename: str) -> Path:
@@ -220,11 +451,27 @@ def _resolve_download_path(folder: str, filename: str) -> Path:
return file_path
def _setup_webrtc_routes(app: FastAPI, args: argparse.Namespace):
def _setup_frontend_routes(app: FastAPI):
"""Mount the prebuilt frontend UI and root redirect for all transports."""
try:
from pipecat_ai_prebuilt.frontend import PipecatPrebuiltUI
except ImportError as e:
logger.error(f"Prebuilt frontend not available: {e}")
return
app.mount("/client", PipecatPrebuiltUI)
@app.get("/", include_in_schema=False)
async def root_redirect():
"""Redirect root requests to client interface."""
return RedirectResponse(url="/client/")
def _setup_webrtc_routes(
app: FastAPI, args: argparse.Namespace, active_sessions: dict[str, dict[str, Any]]
):
"""Set up WebRTC-specific routes."""
try:
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection
from pipecat.transports.smallwebrtc.request_handler import (
IceCandidate,
@@ -236,27 +483,6 @@ def _setup_webrtc_routes(app: FastAPI, args: argparse.Namespace):
logger.error(f"WebRTC transport dependencies not installed: {e}")
return
class IceServer(TypedDict, total=False):
urls: str | list[str]
class IceConfig(TypedDict):
iceServers: list[IceServer]
class StartBotResult(TypedDict, total=False):
sessionId: str
iceConfig: IceConfig | None
# In-memory store of active sessions: session_id -> session info
active_sessions: dict[str, dict[str, Any]] = {}
# Mount the frontend
app.mount("/client", SmallWebRTCPrebuiltUI)
@app.get("/", include_in_schema=False)
async def root_redirect():
"""Redirect root requests to client interface."""
return RedirectResponse(url="/client/")
@app.get("/files/{filename:path}")
async def download_file(filename: str):
"""Handle file downloads."""
@@ -315,29 +541,6 @@ def _setup_webrtc_routes(app: FastAPI, args: argparse.Namespace):
await small_webrtc_handler.handle_patch_request(request)
return {"status": "success"}
@app.post("/start")
async def rtvi_start(request: Request):
"""Mimic Pipecat Cloud's /start endpoint."""
# Parse the request body
try:
request_data = await request.json()
logger.debug(f"Received request: {request_data}")
except Exception as e:
logger.error(f"Failed to parse request body: {e}")
request_data = {}
# Store session info immediately in memory, replicate the behavior expected on Pipecat Cloud
session_id = str(uuid.uuid4())
active_sessions[session_id] = request_data.get("body", {})
result: StartBotResult = {"sessionId": session_id}
if request_data.get("enableDefaultIceServers"):
result["iceConfig"] = IceConfig(
iceServers=[IceServer(urls=["stun:stun.l.google.com:19302"])]
)
return result
@app.api_route(
"/sessions/{session_id}/{path:path}",
methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
@@ -563,12 +766,10 @@ def _setup_whatsapp_routes(app: FastAPI, args: argparse.Namespace):
def _setup_daily_routes(app: FastAPI, args: argparse.Namespace):
"""Set up Daily-specific routes."""
@app.get("/")
@app.get("/daily")
async def create_room_and_start_agent():
"""Launch a Daily bot and redirect to room."""
print("Starting bot with Daily transport and redirecting to Daily room")
import aiohttp
logger.debug("Starting bot with Daily transport and redirecting to Daily room")
from pipecat.runner.daily import configure
@@ -584,105 +785,6 @@ def _setup_daily_routes(app: FastAPI, args: argparse.Namespace):
asyncio.create_task(bot_module.bot(runner_args))
return RedirectResponse(room_url)
@app.post("/start")
async def start_agent(request: Request):
"""Handler for /start endpoints.
Expects POST body like::
{
"createDailyRoom": true,
"dailyRoomProperties": { "start_video_off": true },
"dailyMeetingTokenProperties": { "is_owner": true, "user_name": "Bot" },
"body": { "custom_data": "value" }
}
"""
print("Starting bot with Daily transport")
# Parse the request body
try:
request_data = await request.json()
logger.debug(f"Received request: {request_data}")
except Exception as e:
logger.error(f"Failed to parse request body: {e}")
request_data = {}
create_daily_room = request_data.get("createDailyRoom", False)
body = request_data.get("body", {})
daily_room_properties_dict = request_data.get("dailyRoomProperties", None)
daily_token_properties_dict = request_data.get("dailyMeetingTokenProperties", None)
bot_module = _get_bot_module()
existing_room_url = os.getenv("DAILY_ROOM_URL")
session_id = str(uuid.uuid4())
result = None
# Configure room if:
# 1. Explicitly requested via createDailyRoom in payload
# 2. Using pre-configured room from DAILY_ROOM_URL env var
if create_daily_room or existing_room_url:
import aiohttp
from pipecat.runner.daily import configure
from pipecat.transports.daily.utils import (
DailyMeetingTokenProperties,
DailyRoomProperties,
)
async with aiohttp.ClientSession() as session:
# Parse dailyRoomProperties if provided
room_properties = None
if daily_room_properties_dict:
# Apply Pipecat Cloud's session policy if caller didn't override.
daily_room_properties_dict.setdefault(
"exp", time.time() + PIPECAT_ROOM_EXP_HOURS * 3600
)
daily_room_properties_dict.setdefault("eject_at_room_exp", True)
try:
room_properties = DailyRoomProperties(**daily_room_properties_dict)
logger.debug(f"Using custom room properties: {room_properties}")
except Exception as e:
logger.error(f"Failed to parse dailyRoomProperties: {e}")
# Continue without custom properties
# Parse dailyMeetingTokenProperties if provided
token_properties = None
if daily_token_properties_dict:
try:
token_properties = DailyMeetingTokenProperties(
**daily_token_properties_dict
)
logger.debug(f"Using custom token properties: {token_properties}")
except Exception as e:
logger.error(f"Failed to parse dailyMeetingTokenProperties: {e}")
# Continue without custom properties
room_url, token = await configure(
session,
room_exp_duration=PIPECAT_ROOM_EXP_HOURS,
room_properties=room_properties,
token_properties=token_properties,
)
runner_args = DailyRunnerArguments(
room_url=room_url, token=token, body=body, session_id=session_id
)
result = {
"dailyRoom": room_url,
"dailyToken": token,
"sessionId": session_id,
}
else:
runner_args = RunnerArguments(body=body, session_id=session_id)
# Update CLI args.
runner_args.cli_args = args
# Start the bot in the background
asyncio.create_task(bot_module.bot(runner_args))
return result
if args.dialin:
@app.post("/daily-dialin-webhook")
@@ -731,8 +833,6 @@ def _setup_daily_routes(app: FastAPI, args: argparse.Namespace):
detail="Missing required fields: From, To, callId, callDomain",
)
import aiohttp
from pipecat.runner.daily import configure
from pipecat.runner.types import DailyDialinRequest, DialinSettings
@@ -801,44 +901,51 @@ def _setup_daily_routes(app: FastAPI, args: argparse.Namespace):
def _setup_telephony_routes(app: FastAPI, args: argparse.Namespace):
"""Set up telephony-specific routes."""
# XML response templates (Exotel doesn't use XML webhooks)
XML_TEMPLATES = {
"twilio": f"""<?xml version="1.0" encoding="UTF-8"?>
"""Set up telephony-specific routes.
The WebSocket endpoint (``/ws``) is always registered so providers can
connect directly. The XML webhook (``POST /``) is only registered when a
specific telephony transport is chosen via ``-t`` because the XML template
is provider-specific and requires a proxy hostname (``--proxy``).
"""
if args.transport in TELEPHONY_TRANSPORTS:
# XML response templates (Exotel doesn't use XML webhooks)
XML_TEMPLATES = {
"twilio": f"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Connect>
<Stream url="wss://{args.proxy}/ws"></Stream>
</Connect>
<Pause length="40"/>
</Response>""",
"telnyx": f"""<?xml version="1.0" encoding="UTF-8"?>
"telnyx": f"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Connect>
<Stream url="wss://{args.proxy}/ws" bidirectionalMode="rtp"></Stream>
</Connect>
<Pause length="40"/>
</Response>""",
"plivo": f"""<?xml version="1.0" encoding="UTF-8"?>
"plivo": f"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Stream bidirectional="true" keepCallAlive="true" contentType="audio/x-mulaw;rate=8000">wss://{args.proxy}/ws</Stream>
</Response>""",
}
}
@app.post("/")
async def start_call():
"""Handle telephony webhook and return XML response."""
if args.transport == "exotel":
# Exotel doesn't use POST webhooks - redirect to proper documentation
logger.debug("POST Exotel endpoint - not used")
return {
"error": "Exotel doesn't use POST webhooks",
"websocket_url": f"wss://{args.proxy}/ws",
"note": "Configure the WebSocket URL above in your Exotel App Bazaar Voicebot Applet",
}
else:
logger.debug(f"POST {args.transport.upper()} XML")
xml_content = XML_TEMPLATES.get(args.transport, "<Response></Response>")
return HTMLResponse(content=xml_content, media_type="application/xml")
@app.post("/")
async def start_call():
"""Handle telephony webhook and return XML response."""
if args.transport == "exotel":
# Exotel doesn't use POST webhooks - redirect to proper documentation
logger.debug("POST Exotel endpoint - not used")
return {
"error": "Exotel doesn't use POST webhooks",
"websocket_url": f"wss://{args.proxy}/ws",
"note": "Configure the WebSocket URL above in your Exotel App Bazaar Voicebot Applet",
}
else:
logger.debug(f"POST {args.transport.upper()} XML")
xml_content = XML_TEMPLATES.get(args.transport, "<Response></Response>")
return HTMLResponse(content=xml_content, media_type="application/xml")
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
@@ -847,11 +954,6 @@ def _setup_telephony_routes(app: FastAPI, args: argparse.Namespace):
logger.debug("WebSocket connection accepted")
await _run_telephony_bot(websocket, args)
@app.get("/")
async def start_agent():
"""Simple status endpoint for telephony transports."""
return {"status": f"Bot started with {args.transport}"}
async def _run_daily_direct(args: argparse.Namespace):
"""Run Daily bot with direct connection (no FastAPI server)."""
@@ -883,6 +985,25 @@ async def _run_daily_direct(args: argparse.Namespace):
await bot_module.bot(runner_args)
async def _run_vonage():
"""Run Vonage bot (no FastAPI server)."""
logger.info("Running Vonage transport...")
application_id, session_id, token = await configure_vonage()
runner_args = VonageRunnerArguments(
application_id=application_id, vonage_session_id=session_id, token=token
)
runner_args.handle_sigint = True
# Get the bot module and run it directly
bot_module = _get_bot_module()
print(f"Joining Vonage session: {runner_args.vonage_session_id}")
print()
await bot_module.bot(runner_args)
def _validate_and_clean_proxy(proxy: str) -> str:
"""Validate and clean proxy hostname, removing protocol if present."""
if not proxy:
@@ -922,22 +1043,27 @@ def runner_port() -> int:
def main(parser: argparse.ArgumentParser | None = None):
"""Start the Pipecat development runner.
Parses command-line arguments and starts a FastAPI server configured
for the specified transport type.
Parses command-line arguments and starts a FastAPI server that supports
WebRTC, Daily, and telephony transports simultaneously. Clients declare
which transport to use via the ``transport`` field in the ``/start`` body.
When ``-t`` is provided, the server restricts ``/start`` to that transport
only and displays transport-specific startup information.
The runner discovers and runs any ``bot(runner_args)`` function found in the
calling module.
Command-line arguments:
- --host: Server host address (default: localhost) 879
- --host: Server host address (default: localhost)
- --port: Server port (default: 7860)
- -t/--transport: Transport type (daily, webrtc, twilio, telnyx, plivo, exotel)
- -t/--transport: Restrict to a single transport and set as default for /start
(daily, webrtc, twilio, telnyx, plivo, exotel). Omit to support all transports.
- -x/--proxy: Public proxy hostname for telephony webhooks
- -d/--direct: Connect directly to Daily room (automatically sets transport to daily)
- -f/--folder: Path to downloads folder
- --dialin: Enable Daily PSTN dial-in webhook handling (requires Daily transport)
- --dialin: Enable Daily PSTN dial-in webhook handling
- --esp32: Enable SDP munging for ESP32 compatibility (requires --host with IP address)
- --whatsapp: Ensure requried WhatsApp environment variables are present
- --whatsapp: Ensure required WhatsApp environment variables are present
- -v/--verbose: Increase logging verbosity
Args:
@@ -957,9 +1083,12 @@ def main(parser: argparse.ArgumentParser | None = None):
"-t",
"--transport",
type=str,
choices=["daily", "webrtc", *TELEPHONY_TRANSPORTS],
default="webrtc",
help="Transport type",
choices=["daily", "vonage", "webrtc", *TELEPHONY_TRANSPORTS],
default=None,
help=(
"Restrict the server to a single transport and set it as the default for /start. "
"Omit to support all transports simultaneously (default behaviour)."
),
)
parser.add_argument("-x", "--proxy", help="Public proxy host name")
parser.add_argument(
@@ -977,7 +1106,7 @@ def main(parser: argparse.ArgumentParser | None = None):
"--dialin",
action="store_true",
default=False,
help="Enable Daily PSTN dial-in webhook handling (requires Daily transport)",
help="Enable Daily PSTN dial-in webhook handling",
)
parser.add_argument(
"--esp32",
@@ -989,7 +1118,7 @@ def main(parser: argparse.ArgumentParser | None = None):
"--whatsapp",
action="store_true",
default=False,
help="Ensure requried WhatsApp environment variables are present",
help="Ensure required WhatsApp environment variables are present",
)
args = parser.parse_args()
@@ -998,12 +1127,13 @@ def main(parser: argparse.ArgumentParser | None = None):
if args.proxy:
args.proxy = _validate_and_clean_proxy(args.proxy)
# Auto-set transport to daily if --direct is used without explicit transport
if args.direct and args.transport == "webrtc": # webrtc is the default
args.transport = "daily"
elif args.direct and args.transport != "daily":
logger.error("--direct flag only works with Daily transport (-t daily)")
return
# --direct implies Daily transport
if args.direct:
if args.transport is None or args.transport == "daily":
args.transport = "daily"
else:
logger.error("--direct flag only works with Daily transport (-t daily)")
return
# Validate ESP32 requirements
if args.esp32 and args.host == "localhost":
@@ -1011,7 +1141,7 @@ def main(parser: argparse.ArgumentParser | None = None):
return
# Validate dial-in requirements
if args.dialin and args.transport != "daily":
if args.dialin and args.transport is not None and args.transport != "daily":
logger.error("--dialin flag only works with Daily transport (-t daily)")
return
@@ -1029,28 +1159,44 @@ def main(parser: argparse.ArgumentParser | None = None):
asyncio.run(_run_daily_direct(args))
return
# Print startup message for server-based transports
if args.transport == "webrtc":
print()
# Print startup message
print()
if args.transport is None:
print("🚀 Bot ready!")
print(f" → WebRTC: http://{args.host}:{args.port}/client")
print(f" → Daily: http://{args.host}:{args.port}/daily")
print(f" → Telephony: ws://{args.host}:{args.port}/ws")
elif args.transport == "webrtc":
if args.esp32:
print(f"🚀 Bot ready! (ESP32 mode)")
print("🚀 Bot ready! (ESP32 mode)")
elif args.whatsapp:
print(f"🚀 Bot ready! (WhatsApp)")
print("🚀 Bot ready! (WhatsApp)")
else:
print(f"🚀 Bot ready!")
print("🚀 Bot ready! (WebRTC)")
print(f" → Open http://{args.host}:{args.port}/client in your browser")
print()
elif args.transport == "daily":
print()
print(f"🚀 Bot ready!")
print("🚀 Bot ready! (Daily)")
if args.dialin:
print(
f" → Daily dial-in webhook: http://{args.host}:{args.port}/daily-dialin-webhook"
)
print(f" → Configure this URL in your Daily phone number settings")
else:
print(f" → Open http://{args.host}:{args.port} in your browser to start a session")
print(
f" → Open http://{args.host}:{args.port}/daily in your browser to start a session"
)
elif args.transport in TELEPHONY_TRANSPORTS:
print(f"🚀 Bot ready! ({args.transport.capitalize()})")
if args.proxy:
print(f" → XML webhook: http://{args.host}:{args.port}/")
print(f" → WebSocket: ws://{args.host}:{args.port}/ws")
elif args.transport == "vonage":
print()
print(f"🚀 Bot ready!")
asyncio.run(_run_vonage())
print()
return
print()
RUNNER_DOWNLOADS_FOLDER = args.folder
RUNNER_HOST = args.host

View File

@@ -99,16 +99,35 @@ class DailyRunnerArguments(RunnerArguments):
token: str | None = None
@dataclass
class VonageRunnerArguments(RunnerArguments):
"""Vonage transport session arguments for the runner.
Parameters:
application_id: Vonage application ID
vonage_session_id: Vonage session ID
token: Vonage Session Token
"""
application_id: str
vonage_session_id: str
token: str
@dataclass
class WebSocketRunnerArguments(RunnerArguments):
"""WebSocket transport session arguments for the runner.
Parameters:
websocket: WebSocket connection for audio streaming
transport_type: Transport type identifier. Set to ``"websocket"`` for plain
WebSocket connections; ``None`` triggers auto-detection from the first
telephony provider message.
body: Additional request data
"""
websocket: WebSocket
transport_type: str | None = None
@dataclass

View File

@@ -33,7 +33,7 @@ import json
import os
import re
from collections.abc import Callable
from typing import Any
from typing import Any, cast
from fastapi import WebSocket
from loguru import logger
@@ -42,9 +42,10 @@ from pipecat.runner.types import (
DailyRunnerArguments,
LiveKitRunnerArguments,
SmallWebRTCRunnerArguments,
VonageRunnerArguments,
WebSocketRunnerArguments,
)
from pipecat.transports.base_transport import BaseTransport
from pipecat.transports.base_transport import BaseTransport, TransportParams
def _detect_transport_type_from_message(message_data: dict) -> str:
@@ -271,6 +272,14 @@ def get_transport_client_id(transport: BaseTransport, client: Any) -> str:
except ImportError:
pass
try:
from pipecat.transports.vonage.video_connector import VonageVideoConnectorTransport
if isinstance(transport, VonageVideoConnectorTransport):
return client["streamId"]
except ImportError:
pass
logger.warning(f"Unable to get client id from unsupported transport {type(transport)}")
return ""
@@ -303,6 +312,24 @@ async def maybe_capture_participant_camera(
except ImportError:
pass
try:
from pipecat.transports.vonage.video_connector import (
SubscribeSettings,
VonageVideoConnectorTransport,
)
if isinstance(transport, VonageVideoConnectorTransport):
await transport.subscribe_to_stream(
client["streamId"],
SubscribeSettings(
subscribe_to_audio=True,
subscribe_to_video=True,
preferred_framerate=framerate if framerate != 0 else None,
),
)
except ImportError:
pass
async def maybe_capture_participant_screen(
transport: BaseTransport, client: Any, framerate: int = 0
@@ -534,6 +561,10 @@ async def create_transport(
audio_out_enabled=True,
# add_wav_header and serializer will be set automatically
),
"vonage": lambda: VonageVideoConnectorTransportParams(
audio_in_enabled=True,
audio_out_enabled=True
),
}
transport = await create_transport(runner_args, transport_params)
@@ -562,6 +593,12 @@ async def create_transport(
)
elif isinstance(runner_args, WebSocketRunnerArguments):
if runner_args.transport_type == "websocket":
params = _get_transport_params("websocket", transport_params)
from pipecat.transports.websocket.fastapi import FastAPIWebsocketTransport
return FastAPIWebsocketTransport(websocket=runner_args.websocket, params=params)
# Parse once to determine the provider and get data
transport_type, call_data = await parse_telephony_websocket(runner_args.websocket)
params = _get_transport_params(transport_type, transport_params)
@@ -581,6 +618,31 @@ async def create_transport(
runner_args.room_name,
params=params,
)
elif isinstance(runner_args, VonageRunnerArguments):
from pipecat.transports.vonage.video_connector import (
VonageVideoConnectorTransport,
VonageVideoConnectorTransportParams,
)
try:
params = cast(
VonageVideoConnectorTransportParams,
_get_transport_params("vonage", transport_params),
)
except ValueError:
webrtc_params: TransportParams = cast(
TransportParams, _get_transport_params("webrtc", transport_params)
)
params = VonageVideoConnectorTransportParams(
**webrtc_params.model_dump(),
video_in_auto_subscribe=True,
)
return VonageVideoConnectorTransport(
runner_args.application_id,
runner_args.vonage_session_id,
runner_args.token,
params=params,
)
else:
raise ValueError(f"Unsupported runner arguments type: {type(runner_args)}")

View File

@@ -0,0 +1,52 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Vonage session configuration utilities.
This module extracts the necessary parameters to connect to a Vonage Video session.
Required environment variables:
- VONAGE_APPLICATION_ID - Vonage application ID
- VONAGE_SESSION_ID - Vonage session ID
- VONAGE_TOKEN - Vonage token
Example:
from pipecat.runner.vonage import configure
application_id, session_id, token = await configure()
"""
import os
async def configure() -> tuple[str, str, str]:
"""Configure Vonage application ID, session ID and token from environment.
Returns:
Tuple containing the server application_id, session_id and token.
Raises:
Exception: If required Vonage configuration is not provided.
"""
application_id = os.getenv("VONAGE_APPLICATION_ID")
session_id = os.getenv("VONAGE_SESSION_ID")
token = os.getenv("VONAGE_TOKEN")
if not application_id:
raise Exception(
"No Vonage application ID specified. Use set VONAGE_APPLICATION_ID in your environment."
)
if not session_id:
raise Exception(
"No Vonage Session ID specified. Use set VONAGE_SESSION_ID in your environment."
)
if not token:
raise Exception("No Vonage token specified. Use set VONAGE_TOKEN in your environment.")
return (application_id, session_id, token)

View File

@@ -48,7 +48,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Anthropic, you need to `pip install pipecat-ai[anthropic]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
class AnthropicThinkingConfig(BaseModel):

View File

@@ -55,7 +55,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error('In order to use AssemblyAI, you need to `pip install "pipecat-ai[assemblyai]"`.')
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
def map_language_from_assemblyai(language_code: str) -> Language:

View File

@@ -39,7 +39,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Async, you need to `pip install pipecat-ai[asyncai]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
def language_to_async_language(language: Language) -> str:

View File

@@ -49,7 +49,7 @@ except ModuleNotFoundError as e:
logger.error(
"In order to use AWS services, you need to `pip install pipecat-ai[aws]`. Also, remember to set `AWS_SECRET_ACCESS_KEY`, `AWS_ACCESS_KEY_ID`, and `AWS_REGION` environment variable."
)
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
@dataclass

View File

@@ -81,7 +81,7 @@ except ModuleNotFoundError as e:
logger.error(
"In order to use AWS services, you need to `pip install pipecat-ai[aws-nova-sonic]`."
)
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
class AWSNovaSonicUnhandledFunctionException(Exception):

View File

@@ -32,7 +32,7 @@ except ModuleNotFoundError as e:
logger.error(
"In order to use SageMaker BiDi client, you need to `pip install pipecat-ai[sagemaker]`."
)
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
class SageMakerBidiClient:

View File

@@ -48,7 +48,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use AWS services, you need to `pip install pipecat-ai[aws]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
@dataclass

View File

@@ -34,7 +34,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use AWS services, you need to `pip install pipecat-ai[aws]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
def language_to_aws_language(language: Language) -> str:

View File

@@ -17,7 +17,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Azure Realtime, you need to `pip install pipecat-ai[openai]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
@dataclass

View File

@@ -49,7 +49,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Azure, you need to `pip install pipecat-ai[azure]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
@dataclass

View File

@@ -42,7 +42,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Azure, you need to `pip install pipecat-ai[azure]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
def sample_rate_to_output_format(sample_rate: int) -> SpeechSynthesisOutputFormat:

View File

@@ -42,7 +42,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Cartesia, you need to `pip install pipecat-ai[cartesia]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
@dataclass

View File

@@ -39,7 +39,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Cartesia, you need to `pip install pipecat-ai[cartesia]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
class GenerationConfig(BaseModel):

View File

@@ -31,7 +31,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Deepgram Flux, you need to `pip install pipecat-ai[deepgram]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
# Re-export for backward compatibility
__all__ = [

View File

@@ -48,7 +48,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Deepgram, you need to `pip install pipecat-ai[deepgram]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
class LiveOptions:

View File

@@ -39,7 +39,7 @@ except ModuleNotFoundError as e:
logger.error(
"In order to use DeepgramWebsocketTTSService, you need to `pip install pipecat-ai[deepgram]`."
)
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
@dataclass

View File

@@ -52,7 +52,7 @@ except ModuleNotFoundError as e:
logger.error(
"In order to use ElevenLabs Realtime STT, you need to `pip install pipecat-ai[elevenlabs]`."
)
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
def language_to_elevenlabs_language(language: Language) -> str:
@@ -358,7 +358,8 @@ class ElevenLabsSTTService(SegmentedSTTService):
# Add required model_id and language_code
data.add_field("model_id", self._settings.model)
data.add_field("language_code", self._settings.language)
if self._settings.language:
data.add_field("language_code", self._settings.language)
if self._settings.tag_audio_events is not None:
data.add_field("tag_audio_events", str(self._settings.tag_audio_events).lower())
keyterms = self._settings.keyterms

View File

@@ -56,7 +56,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use ElevenLabs, you need to `pip install pipecat-ai[elevenlabs]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
# Models that support language codes
# The following models are excluded as they don't support language codes:

View File

@@ -38,7 +38,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Fish Audio, you need to `pip install pipecat-ai[fish]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
# FishAudio supports various output formats
FishAudioOutputFormat = Literal["opus", "mp3", "pcm", "wav"]

View File

@@ -53,7 +53,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Gladia, you need to `pip install pipecat-ai[gladia]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
def language_to_gladia_language(language: Language) -> str:

View File

@@ -105,7 +105,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Google AI, you need to `pip install pipecat-ai[google]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
# Connection management constants

View File

@@ -36,7 +36,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Google Vertex AI, you need to `pip install pipecat-ai[google]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
@dataclass

View File

@@ -35,7 +35,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Google AI, you need to `pip install pipecat-ai[google]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
@dataclass

View File

@@ -65,7 +65,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Google AI, you need to `pip install pipecat-ai[google]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
class GoogleThinkingConfig(BaseModel):

View File

@@ -57,7 +57,7 @@ except ModuleNotFoundError as e:
logger.error(
"In order to use Google AI, you need to `pip install pipecat-ai[google]`. Also, set `GOOGLE_APPLICATION_CREDENTIALS` environment variable."
)
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
def language_to_google_stt_language(language: Language) -> str:

View File

@@ -57,7 +57,7 @@ except ModuleNotFoundError as e:
logger.error(
"In order to use Google AI, you need to `pip install pipecat-ai[google]`. Also, set `GOOGLE_APPLICATION_CREDENTIALS` environment variable."
)
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
def language_to_google_tts_language(language: Language) -> str:

View File

@@ -35,7 +35,7 @@ except ModuleNotFoundError as e:
logger.error(
"In order to use Google AI, you need to `pip install pipecat-ai[google]`. Also, set `GOOGLE_APPLICATION_CREDENTIALS` environment variable."
)
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
@dataclass

View File

@@ -44,7 +44,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error('In order to use Gradium, you need to `pip install "pipecat-ai[gradium]"`.')
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
# Seconds to wait after a "flushed" message for trailing text tokens to arrive
# before finalizing the transcription.

View File

@@ -33,7 +33,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Gradium, you need to `pip install pipecat-ai[gradium]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
SAMPLE_RATE = 48000

View File

@@ -30,7 +30,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Groq, you need to `pip install pipecat-ai[groq]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
# Hint set for `output_format`. The values mirror the Literal that
# `groq.resources.audio.speech.AsyncSpeech.create` accepts on its

View File

@@ -46,7 +46,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use HeyGen, you need to `pip install pipecat-ai[heygen]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
HEY_GEN_SAMPLE_RATE = 24000

View File

@@ -38,7 +38,7 @@ try:
except ModuleNotFoundError as e: # pragma: no cover - import-time guidance
logger.error(f"Exception: {e}")
logger.error("In order to use Hume, you need to `pip install pipecat-ai[hume]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
HUME_SAMPLE_RATE = 48_000 # Hume TTS streams at 48 kHz

View File

@@ -68,7 +68,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Inworld Realtime, you need to `pip install pipecat-ai[inworld]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
@dataclass

View File

@@ -43,7 +43,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Inworld WebSocket TTS, you need to `pip install websockets`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
from pipecat.frames.frames import (
AggregationType,

View File

@@ -32,7 +32,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Kokoro, you need to `pip install pipecat-ai[kokoro]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
KOKORO_CACHE_DIR = Path(os.path.expanduser("~/.cache/kokoro-onnx"))
KOKORO_MODEL_URL = "https://github.com/thewh1teagle/kokoro-onnx/releases/download/model-files-v1.0/kokoro-v1.0.onnx"

View File

@@ -34,7 +34,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use LMNT, you need to `pip install pipecat-ai[lmnt]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
def language_to_lmnt_language(language: Language) -> str:

View File

@@ -29,7 +29,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use an MCP client, you need to `pip install pipecat-ai[mcp]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
ServerParameters: TypeAlias = StdioServerParameters | SseServerParameters | StreamableHttpParameters

View File

@@ -28,7 +28,7 @@ except ModuleNotFoundError as e:
logger.error(
"In order to use Mem0, you need to `pip install mem0ai`. Also, set the environment variable MEM0_API_KEY."
)
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
class Mem0MemoryService(FrameProcessor):

View File

@@ -48,7 +48,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Mistral STT, you need to `pip install pipecat-ai[mistral]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
@dataclass

View File

@@ -31,7 +31,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Mistral TTS, you need to `pip install pipecat-ai[mistral]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
@dataclass

View File

@@ -34,7 +34,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Moondream, you need to `pip install pipecat-ai[moondream]`.")
raise Exception(f"Missing module(s): {e}")
raise ImportError(f"Missing module(s): {e}") from e
def detect_device():

View File

@@ -41,7 +41,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Neuphonic, you need to `pip install pipecat-ai[neuphonic]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
def language_to_neuphonic_lang_code(language: Language) -> str:

View File

@@ -46,7 +46,7 @@ except ModuleNotFoundError as e:
logger.error(
"In order to use NVIDIA Nemotron Speech STT, you need to `pip install pipecat-ai[nvidia]`."
)
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
def language_to_nvidia_nemotron_speech_language(language: Language) -> str:

View File

@@ -55,7 +55,7 @@ except ModuleNotFoundError as e:
logger.error(
"In order to use NVIDIA Nemotron Speech TTS, you need to `pip install pipecat-ai[nvidia]`."
)
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
@dataclass

View File

@@ -71,7 +71,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use OpenAI, you need to `pip install pipecat-ai[openai]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
@dataclass

View File

@@ -59,7 +59,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use OpenAI, you need to `pip install pipecat-ai[openai]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
# ---------------------------------------------------------------------------

View File

@@ -30,7 +30,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Piper, you need to `pip install pipecat-ai[piper]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
@dataclass

View File

@@ -33,7 +33,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Resemble AI, you need to `pip install pipecat-ai[resembleai]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
@dataclass

View File

@@ -47,7 +47,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Rime, you need to `pip install pipecat-ai[rime]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
def language_to_rime_language(language: Language) -> str:

View File

@@ -53,7 +53,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Sarvam, you need to `pip install pipecat-ai[sarvam]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
def language_to_sarvam_language(language: Language) -> str:

View File

@@ -70,7 +70,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Sarvam, you need to `pip install pipecat-ai[sarvam]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
class SarvamTTSModel(StrEnum):

View File

@@ -35,7 +35,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Simli, you need to `pip install pipecat-ai[simli]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
@dataclass

View File

@@ -48,7 +48,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Smallest, you need to `pip install pipecat-ai[smallest]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
def language_to_smallest_stt_language(language: Language) -> str:

View File

@@ -41,7 +41,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Smallest, you need to `pip install pipecat-ai[smallest]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
class SmallestTTSModel(StrEnum):

View File

@@ -39,7 +39,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Soniox, you need to `pip install pipecat-ai[soniox]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
KEEPALIVE_MESSAGE = '{"type": "keepalive"}'

View File

@@ -44,7 +44,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Soniox, you need to `pip install pipecat-ai[soniox]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
# Soniox idle timeout is 20-30s; keepalive cadence must stay well inside it.

View File

@@ -61,7 +61,7 @@ except ModuleNotFoundError as e:
logger.error(
"In order to use Speechmatics, you need to `pip install pipecat-ai[speechmatics]`."
)
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
load_dotenv()

View File

@@ -32,7 +32,7 @@ except ModuleNotFoundError as e:
logger.error(
"In order to use Speechmatics, you need to `pip install pipecat-ai[speechmatics]`."
)
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
@dataclass

View File

@@ -57,7 +57,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Ultravox, you need to `pip install pipecat-ai[ultravox]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
# Result shipped as the client_tool_result when we see an async-tool

View File

@@ -33,14 +33,14 @@ if TYPE_CHECKING:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Whisper, you need to `pip install pipecat-ai[whisper]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
try:
import mlx_whisper # noqa: F401
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Whisper, you need to `pip install pipecat-ai[mlx-whisper]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
class Model(Enum):

View File

@@ -67,7 +67,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Grok Realtime, you need to `pip install pipecat-ai[grok]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
@dataclass

View File

@@ -41,7 +41,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error('In order to use xAI STT, you need to `pip install "pipecat-ai[xai]"`.')
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
def language_to_xai_stt_language(language: Language) -> str:

View File

@@ -46,7 +46,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use XAITTSService, you need to `pip install pipecat-ai[xai]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
def language_to_xai_language(language: Language) -> str:

View File

@@ -75,7 +75,7 @@ except ModuleNotFoundError as e:
logger.error(
"In order to use the Daily transport, you need to `pip install pipecat-ai[daily]`."
)
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
VAD_RESET_PERIOD_MS = 2000

View File

@@ -52,7 +52,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use LiveKit, you need to `pip install pipecat-ai[livekit]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
# DTMF mapping according to RFC 4733
DTMF_CODE_MAP = {

View File

@@ -28,7 +28,7 @@ except ModuleNotFoundError as e:
logger.error(
"In order to use local audio, you need to `pip install pipecat-ai[local]`. On MacOS, you also need to `brew install portaudio`."
)
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
class LocalAudioTransportParams(TransportParams):

View File

@@ -34,14 +34,14 @@ except ModuleNotFoundError as e:
logger.error(
"In order to use local audio, you need to `pip install pipecat-ai[local]`. On MacOS, you also need to `brew install portaudio`."
)
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
try:
import tkinter as tk
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("tkinter missing. Try `apt install python3-tk` or `brew install python-tk@3.10`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
class TkTransportParams(TransportParams):

View File

@@ -36,7 +36,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use the SmallWebRTC, you need to `pip install pipecat-ai[webrtc]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
# Clamp aiortc's SCTP DATA-chunk payload size so the on-wire UDP packet fits
# inside the smallest-MTU path we're likely to see (IPv6 minimum 1280,

View File

@@ -52,7 +52,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use the SmallWebRTC, you need to `pip install pipecat-ai[webrtc]`.")
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
CAM_VIDEO_SOURCE = "camera"
SCREEN_VIDEO_SOURCE = "screenVideo"

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,150 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Vonage Video Connector utils."""
from dataclasses import dataclass, replace
from enum import StrEnum
import numpy as np
import numpy.typing as npt
from pipecat.audio.resamplers.base_audio_resampler import BaseAudioResampler
@dataclass
class AudioProps:
"""Audio properties for normalization.
Parameters:
sample_rate: The sample rate of the audio.
is_stereo: Whether the audio is stereo (True) or mono (False).
"""
sample_rate: int
is_stereo: bool
class ImageFormat(StrEnum):
"""Enum for image formats."""
PLANAR_YUV420 = "PLANAR_YUV420"
PACKED_YUV444 = "PACKED_YUV444"
RGB = "RGB"
RGBA = "RGBA"
BGR = "BGR"
BGRA = "BGRA"
def check_audio_data(
buffer: bytes | memoryview, number_of_frames: int, number_of_channels: int
) -> None:
"""Check the audio sample width based on buffer size, number of frames and channels."""
if number_of_channels not in (1, 2):
raise ValueError(f"We only accept mono or stereo audio, got {number_of_channels}")
if isinstance(buffer, memoryview):
bytes_per_sample = buffer.itemsize
else:
bytes_per_sample = len(buffer) // (number_of_frames * number_of_channels)
if bytes_per_sample != 2:
raise ValueError(f"We only accept 16 bit PCM audio, got {bytes_per_sample * 8} bit")
def process_audio_channels(
audio: npt.NDArray[np.int16], current: AudioProps, target: AudioProps
) -> npt.NDArray[np.int16]:
"""Normalize audio channels to the target properties."""
if current.is_stereo != target.is_stereo:
if target.is_stereo:
audio = np.repeat(audio, 2)
else:
audio = audio.reshape(-1, 2).mean(axis=1).astype(np.int16)
return audio
async def process_audio(
resampler: BaseAudioResampler,
audio: npt.NDArray[np.int16],
current: AudioProps,
target: AudioProps,
) -> npt.NDArray[np.int16]:
"""Normalize audio to the target properties."""
res_audio = audio
if current.sample_rate != target.sample_rate:
# first normalize channels to mono if needed, then resample, then normalize channels to target
res_audio = process_audio_channels(res_audio, current, replace(current, is_stereo=False))
current = replace(current, is_stereo=False)
res_audio_bytes: bytes = await resampler.resample(
res_audio.tobytes(), current.sample_rate, target.sample_rate
)
res_audio = np.frombuffer(res_audio_bytes, dtype=np.int16)
res_audio = process_audio_channels(res_audio, current, target)
return res_audio
def image_colorspace_conversion(
image: bytes, size: tuple[int, int], from_format: ImageFormat, to_format: ImageFormat
) -> bytes | None:
"""Convert image colorspace from one format to another."""
match (from_format, to_format):
case (fmt1, fmt2) if fmt1 == fmt2:
return image
case (ImageFormat.RGB, ImageFormat.BGR) | (ImageFormat.BGR, ImageFormat.RGB):
np_input = np.frombuffer(image, dtype=np.uint8)
np_output = np_input.reshape(size[1], size[0], 3)[:, :, ::-1]
return np_output.tobytes()
case (ImageFormat.RGBA, ImageFormat.BGRA) | (ImageFormat.BGRA, ImageFormat.RGBA):
np_input = np.frombuffer(image, dtype=np.uint8)
np_output = np_input.reshape(size[1], size[0], 4)[:, :, [2, 1, 0, 3]]
return np_output.tobytes()
case (ImageFormat.PLANAR_YUV420, ImageFormat.PACKED_YUV444):
# YUV420 (I420) has Y plane of size width*height, U and V planes of size (width/2)*(height/2)
# Packed YUV444 interleaves Y, U, V values for each pixel (YUVYUVYUV...)
width, height = size
y_plane_size = width * height
uv_plane_size_420 = (width // 2) * (height // 2)
np_input = np.frombuffer(image, dtype=np.uint8)
y_plane = np_input[:y_plane_size].reshape(height, width)
u_plane_420 = np_input[y_plane_size : y_plane_size + uv_plane_size_420].reshape(
height // 2, width // 2
)
v_plane_420 = np_input[
y_plane_size + uv_plane_size_420 : y_plane_size + 2 * uv_plane_size_420
].reshape(height // 2, width // 2)
# Upsample U and V planes by repeating each pixel in 2x2 blocks
u_plane_444 = np.repeat(np.repeat(u_plane_420, 2, axis=0), 2, axis=1)
v_plane_444 = np.repeat(np.repeat(v_plane_420, 2, axis=0), 2, axis=1)
# Interleave Y, U, V values for packed format (YUVYUVYUV...)
np_output = np.stack([y_plane, u_plane_444, v_plane_444], axis=-1)
return np_output.tobytes()
case (ImageFormat.PACKED_YUV444, ImageFormat.PLANAR_YUV420):
# Packed YUV444 has Y, U, V interleaved (YUVYUVYUV...)
# YUV420 (I420) has Y plane of size width*height, U and V planes of size (width/2)*(height/2)
width, height = size
np_input = np.frombuffer(image, dtype=np.uint8).reshape(height, width, 3)
y_plane = np_input[:, :, 0].reshape(height, width)
u_plane_444 = np_input[:, :, 1]
v_plane_444 = np_input[:, :, 2]
# Downsample U and V planes by taking every other pixel (2x2 -> 1 averaging)
u_plane_420 = u_plane_444[::2, ::2].reshape(height // 2, width // 2)
v_plane_420 = v_plane_444[::2, ::2].reshape(height // 2, width // 2)
# Concatenate Y, U, V planes
np_output = np.concatenate(
[y_plane.flatten(), u_plane_420.flatten(), v_plane_420.flatten()]
)
return np_output.tobytes()
case _:
return None

View File

@@ -0,0 +1,483 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Vonage Video Connector transport."""
from typing import Optional
from loguru import logger
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
InputAudioRawFrame,
InterruptionFrame,
OutputAudioRawFrame,
OutputImageRawFrame,
StartFrame,
UserImageRawFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup
from pipecat.transports.base_input import BaseInputTransport
from pipecat.transports.base_output import BaseOutputTransport
from pipecat.transports.base_transport import BaseTransport
from pipecat.transports.vonage.client import (
Session, # type: ignore[attr-defined]
Stream, # type: ignore[attr-defined]
Subscriber, # type: ignore[attr-defined]
VonageClient,
VonageClientListener,
)
# the following "as" imports help to re-export these types and avoid type checking warnings
# when importing these types from the main transport module
from pipecat.transports.vonage.client import (
SubscribeSettings as SubscribeSettings,
)
from pipecat.transports.vonage.client import (
VonageException as VonageException,
)
from pipecat.transports.vonage.client import (
VonageVideoConnectorTransportParams as VonageVideoConnectorTransportParams,
)
class VonageVideoConnectorInputTransport(BaseInputTransport):
"""Input transport for Vonage, handling audio input from the Vonage session.
Receives audio from a Vonage Video session and pushes it as input frames.
"""
_params: VonageVideoConnectorTransportParams
def __init__(self, client: VonageClient, params: VonageVideoConnectorTransportParams):
"""Initialize the Vonage input transport.
Args:
client: The VonageClient instance to use.
params: Transport parameters for input configuration.
"""
super().__init__(params)
self._initialized: bool = False
self._client: VonageClient = client
self._listener_id: int = -1
self._connected: bool = False
async def start(self, frame: StartFrame) -> None:
"""Start the Vonage input transport.
Args:
frame: The StartFrame to initiate the transport.
"""
await super().start(frame)
if self._initialized:
return
self._initialized = True
if self._params.audio_in_enabled or self._params.video_in_enabled:
self._listener_id = self._client.add_listener(
VonageClientListener(
on_audio_in=self._audio_in_cb,
on_video_in=self._video_in_cb,
on_error=self._on_error_cb,
)
)
try:
await self._client.connect(frame)
self._connected = True
except Exception as exc:
logger.error(f"Error connecting to Vonage session: {exc}")
await self.push_error("Vonage video connector connection error", fatal=True)
return
await self.set_transport_ready(frame)
async def setup(self, setup: FrameProcessorSetup) -> None:
"""Set up the processor with required components.
Args:
setup: Configuration object containing setup parameters.
"""
await super().setup(setup)
await self._client.setup(setup)
async def cleanup(self) -> None:
"""Cleanup input transport."""
await super().cleanup() # type: ignore
await self._client.cleanup()
async def _audio_in_cb(self, _session: Session, audio: InputAudioRawFrame) -> None:
if self._connected and self._params.audio_in_enabled:
await self.push_audio_frame(audio)
async def _video_in_cb(self, _subscriber: Subscriber, video: UserImageRawFrame) -> None:
if self._connected and self._params.video_in_enabled:
await self.push_video_frame(video)
async def _on_error_cb(self, session: Session, description: str, code: int) -> None:
logger.error(
f"Vonage input transport error session={session.id} code={code} description={description}"
)
if self._connected:
await self.push_error("Vonage video connector error", fatal=True)
async def stop(self, frame: EndFrame) -> None:
"""Stop the Vonage input transport.
Args:
frame: The EndFrame to stop the transport.
"""
await super().stop(frame)
await self._stop_client()
async def cancel(self, frame: CancelFrame) -> None:
"""Cancel the Vonage input transport.
Args:
frame: The CancelFrame to cancel the transport.
"""
await super().cancel(frame)
await self._stop_client()
async def _stop_client(self) -> None:
if self._connected:
self._client.remove_listener(self._listener_id)
self._connected = False
try:
await self._client.disconnect()
except Exception:
pass
async def subscribe_to_stream(self, stream_id: str, params: SubscribeSettings) -> None:
"""Subscribe to a participant's stream.
Args:
stream_id: The ID of the participant to subscribe to.
params: Subscription parameters for the subscription.
"""
await self._client.subscribe_to_stream(stream_id, params)
class VonageVideoConnectorOutputTransport(BaseOutputTransport):
"""Output transport for Vonage, handling audio output to the Vonage session.
Sends audio frames to a Vonage Video session as output.
"""
_params: VonageVideoConnectorTransportParams
def __init__(self, client: VonageClient, params: VonageVideoConnectorTransportParams):
"""Initialize the Vonage output transport.
Args:
client: The VonageClient instance to use.
params: Transport parameters for output configuration.
"""
super().__init__(params)
self._initialized: bool = False
self._client = client
self._connected: bool = False
self._listener_id: int = -1
async def start(self, frame: StartFrame) -> None:
"""Start the Vonage output transport.
Args:
frame: The StartFrame to initiate the transport.
"""
await super().start(frame)
if self._initialized:
return
self._initialized = True
if self._params.audio_out_enabled or self._params.video_out_enabled:
self._listener_id = self._client.add_listener(
VonageClientListener(on_error=self._on_error_cb)
)
try:
await self._client.connect(frame)
self._connected = True
except Exception as exc:
logger.error(f"Error connecting to Vonage session: {exc}")
await self.push_error("Vonage video connector connection error", fatal=True)
return
await self.set_transport_ready(frame)
async def setup(self, setup: FrameProcessorSetup) -> None:
"""Set up the processor with required components.
Args:
setup: Configuration object containing setup parameters.
"""
await super().setup(setup)
await self._client.setup(setup)
async def cleanup(self) -> None:
"""Cleanup output transport."""
await super().cleanup() # type: ignore
await self._client.cleanup()
async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
"""Process a frame for the Vonage output transport.
Args:
frame: The frame to process.
direction: The direction of frame flow in the pipeline.
"""
await super().process_frame(frame, direction)
# if we get an interruption frame, we need to ensure the buffers inside Vonage Video Connector are cleared
if (
self._connected
and isinstance(frame, InterruptionFrame)
and self._params.clear_buffers_on_interruption
):
logger.info("Clearing Vonage media buffers due to interruption frame")
self._client.clear_media_buffers()
async def write_audio_frame(self, frame: OutputAudioRawFrame) -> bool:
"""Write an audio frame to the Vonage session.
Args:
frame: The OutputAudioRawFrame to send.
"""
result = False
if self._connected and self._params.audio_out_enabled:
result = await self._client.write_audio(frame)
return result
async def write_video_frame(self, frame: OutputImageRawFrame) -> bool:
"""Write a video frame to the transport.
Args:
frame: The output video frame to write.
"""
result = False
if self._connected and self._params.video_out_enabled:
result = await self._client.write_video(frame)
return result
async def stop(self, frame: EndFrame) -> None:
"""Stop the Vonage output transport.
Args:
frame: The EndFrame to stop the transport.
"""
await super().stop(frame)
await self._stop_client()
async def cancel(self, frame: CancelFrame) -> None:
"""Cancel the Vonage output transport.
Args:
frame: The CancelFrame to cancel the transport.
"""
await super().cancel(frame)
await self._stop_client()
async def _stop_client(self) -> None:
if self._connected:
self._client.remove_listener(self._listener_id)
self._connected = False
try:
await self._client.disconnect()
except Exception:
pass
async def _on_error_cb(self, session: Session, description: str, code: int) -> None:
logger.error(
f"Vonage output transport error session={session.id} code={code} description={description}"
)
if self._connected:
await self.push_error("Vonage video connector error", fatal=True)
class VonageVideoConnectorTransport(BaseTransport):
"""Vonage Video Connector transport implementation for Pipecat.
Provides input and output audio transport for Vonage Video sessions, supporting event handling
for session and participant lifecycle.
Supported features:
- Audio input and output transport for Vonage Video sessions
- Event handler registration for session and participant events
- Publisher and subscriber management
- Configurable audio and migration parameters
"""
_params: VonageVideoConnectorTransportParams
def __init__(
self,
application_id: str,
session_id: str,
token: str,
params: VonageVideoConnectorTransportParams,
):
"""Initialize the Vonage Video Connector transport.
Args:
application_id: The Vonage Video application ID.
session_id: The session ID to connect to.
token: The authentication token for the session.
params: Transport parameters for input/output configuration.
"""
super().__init__()
self._params = params
self._client = VonageClient(application_id, session_id, token, params)
# Register supported handlers.
self._register_event_handler("on_joined")
self._register_event_handler("on_left")
self._register_event_handler("on_error")
self._register_event_handler("on_client_connected", sync=True)
self._register_event_handler("on_client_disconnected")
self._register_event_handler("on_first_participant_joined", sync=True)
self._register_event_handler("on_participant_joined", sync=True)
self._register_event_handler("on_participant_left")
self._client.add_listener(
VonageClientListener(
on_connected=self._on_connected,
on_disconnected=self._on_disconnected,
on_error=self._on_error,
on_stream_received=self._on_stream_received,
on_stream_dropped=self._on_stream_dropped,
on_subscriber_connected=self._on_subscriber_connected,
on_subscriber_disconnected=self._on_subscriber_disconnected,
)
)
self._input: VonageVideoConnectorInputTransport | None = None
self._output: VonageVideoConnectorOutputTransport | None = None
self._one_stream_received: bool = False
def input(self) -> FrameProcessor:
"""Get the input transport for Vonage.
Returns:
The VonageVideoConnectorInputTransport instance.
"""
if not self._input:
self._input = VonageVideoConnectorInputTransport(self._client, self._params)
return self._input
def output(self) -> FrameProcessor:
"""Get the output transport for Vonage.
Returns:
The VonageVideoConnectorOutputTransport instance.
"""
if not self._output:
self._output = VonageVideoConnectorOutputTransport(self._client, self._params)
return self._output
async def subscribe_to_stream(self, stream_id: str, params: SubscribeSettings) -> None:
"""Subscribe to a participant's stream.
Args:
stream_id: The ID of the participant to subscribe to.
params: Subscription parameters for the subscription.
"""
if self._input:
await self._input.subscribe_to_stream(stream_id, params)
async def _on_connected(self, session: Session) -> None:
"""Handle session connected event.
Args:
session: The connected Session object.
"""
await self._call_event_handler("on_joined", {"sessionId": session.id})
async def _on_disconnected(self, session: Session) -> None:
"""Handle session disconnected event.
Args:
session: The disconnected Session object.
"""
await self._call_event_handler("on_left", {"sessionId": session.id})
async def _on_error(self, _session: Session, description: str, _code: int) -> None:
"""Handle session error event.
Args:
_session: The Session object.
description: Error description.
_code: Error code.
"""
await self._call_event_handler("on_error", description)
async def _on_stream_received(self, session: Session, stream: Stream) -> None:
"""Handle stream received event.
Args:
session: The Session object.
stream: The received Stream object.
"""
client = {
"sessionId": session.id,
"streamId": stream.id,
"connectionData": stream.connection.data,
}
if not self._one_stream_received:
self._one_stream_received = True
await self._call_event_handler("on_first_participant_joined", client)
await self._call_event_handler("on_participant_joined", client)
async def _on_stream_dropped(self, session: Session, stream: Stream) -> None:
"""Handle stream dropped event.
Args:
session: The Session object.
stream: The dropped Stream object.
"""
client = {
"sessionId": session.id,
"streamId": stream.id,
"connectionData": stream.connection.data,
}
await self._call_event_handler("on_participant_left", client)
async def _on_subscriber_connected(self, subscriber: Subscriber) -> None:
"""Handle subscriber connected event.
Args:
subscriber: The connected Subscriber object.
"""
await self._call_event_handler(
"on_client_connected",
{
"subscriberId": subscriber.stream.id,
"streamId": subscriber.stream.id,
"connectionData": subscriber.stream.connection.data,
},
)
async def _on_subscriber_disconnected(self, subscriber: Subscriber) -> None:
"""Handle subscriber disconnected event.
Args:
subscriber: The disconnected Subscriber object.
"""
await self._call_event_handler(
"on_client_disconnected",
{
"subscriberId": subscriber.stream.id,
"streamId": subscriber.stream.id,
"connectionData": subscriber.stream.connection.data,
},
)

View File

@@ -48,7 +48,7 @@ except ModuleNotFoundError as e:
logger.error(
"In order to use FastAPI websockets, you need to `pip install pipecat-ai[websocket]`."
)
raise Exception(f"Missing module: {e}")
raise ImportError(f"Missing module: {e}") from e
class FastAPIWebsocketParams(TransportParams):

Some files were not shown because too many files have changed in this diff Show More