Compare commits

...

4 Commits

Author SHA1 Message Date
Mark Backman
d3780f19f7 Add Vonage support to telephony runner utilities 2025-12-18 23:10:09 -05:00
Mark Backman
6c75d6f24a Add VonageFrameSerializer for Vonage Voice API 2025-12-18 23:10:09 -05:00
Mark Backman
024625fc15 Add MIXED mode support to FastAPIWebsocketTransport 2025-12-18 22:44:05 -05:00
Mark Backman
186f76db46 Add MIXED type to FrameSerializerType enum 2025-12-18 22:32:30 -05:00
7 changed files with 308 additions and 16 deletions

1
changelog/3265.added.md Normal file
View File

@@ -0,0 +1 @@
- Added `VonageFrameSerializer` in support of Vonage's Voice API over websockets.

View File

@@ -0,0 +1 @@
- Updated the development runner utility to support parsing `VonageFrameSerializer` websocket messages and to handle and pass the `to` and `from` numbers via `RunnerArguments`.

View File

@@ -0,0 +1 @@
- Updated `FastAPIWebsocketTransport` to support `MIXED` mode, handling binary and text messages over the same websocket connection.

View File

@@ -88,6 +88,11 @@ def _detect_transport_type_from_message(message_data: dict) -> str:
logger.trace("Auto-detected: EXOTEL")
return "exotel"
# Vonage detection
if message_data.get("event") == "websocket:connected" and "content-type" in message_data:
logger.trace("Auto-detected: VONAGE")
return "vonage"
logger.trace("Auto-detection failed - unknown format")
return "unknown"
@@ -135,6 +140,16 @@ async def parse_telephony_websocket(websocket: WebSocket):
"to": str,
}
- Vonage::
{
"content_type": str, # e.g., "audio/l16;rate=16000"
"from": str,
"to": str,
"call_uuid": str,
"conversation_uuid": str,
}
Example usage::
transport_type, call_data = await parse_telephony_websocket(websocket)
@@ -153,17 +168,23 @@ async def parse_telephony_websocket(websocket: WebSocket):
except json.JSONDecodeError:
first_message = {}
# Second message
second_message_raw = await start_data.__anext__()
logger.trace(f"Second message: {second_message_raw}")
try:
second_message = json.loads(second_message_raw)
except json.JSONDecodeError:
second_message = {}
# Try auto-detection on both messages
# Try auto-detection on first message
detected_type_first = _detect_transport_type_from_message(first_message)
detected_type_second = _detect_transport_type_from_message(second_message)
# Vonage only sends one text message at start, then binary audio
# For other providers, read second message
if detected_type_first != "vonage":
second_message_raw = await start_data.__anext__()
logger.debug(f"Second message: {second_message_raw}")
try:
second_message = json.loads(second_message_raw)
except json.JSONDecodeError:
second_message = {}
detected_type_second = _detect_transport_type_from_message(second_message)
else:
second_message = {}
detected_type_second = "unknown"
# Use the successful detection
if detected_type_first != "unknown":
@@ -219,6 +240,21 @@ async def parse_telephony_websocket(websocket: WebSocket):
"custom_parameters": start_data.get("custom_parameters", ""),
}
elif transport_type == "vonage":
# Vonage sends websocket:connected event with content-type
content_type = call_data_raw.get("content-type", "audio/l16;rate=16000")
call_data = {
"content_type": content_type,
}
# Extract phone numbers and call info from websocket state (set by server.py)
if hasattr(websocket, "state") and hasattr(websocket.state, "vonage_call_data"):
vonage_data = websocket.state.vonage_call_data
call_data["from"] = vonage_data.get("from", "")
call_data["to"] = vonage_data.get("to", "")
call_data["call_uuid"] = vonage_data.get("call_uuid", "")
call_data["conversation_uuid"] = vonage_data.get("conversation_uuid", "")
else:
call_data = {}
@@ -465,10 +501,24 @@ async def _create_telephony_transport(
stream_sid=call_data["stream_id"],
call_sid=call_data["call_id"],
)
elif transport_type == "vonage":
from pipecat.serializers.vonage import VonageFrameSerializer
content_type = call_data.get("content_type", "audio/l16;rate=16000")
sample_rate = 16000 # Default
if "rate=" in content_type:
try:
sample_rate = int(content_type.split("rate=")[1].split(";")[0])
except (ValueError, IndexError):
logger.warning(f"Could not parse sample rate from {content_type}, using 16000")
params.serializer = VonageFrameSerializer(
params=VonageFrameSerializer.InputParams(vonage_sample_rate=sample_rate)
)
else:
raise ValueError(
f"Unsupported telephony provider: {transport_type}. "
f"Supported providers: twilio, telnyx, plivo, exotel"
f"Supported providers: twilio, telnyx, plivo, exotel, vonage"
)
return FastAPIWebsocketTransport(websocket=websocket, params=params)
@@ -485,7 +535,7 @@ async def create_transport(
Args:
runner_args: Arguments from the runner.
transport_params: Dict mapping transport names to parameter factory functions.
Keys should be: "daily", "webrtc", "twilio", "telnyx", "plivo", "exotel"
Keys should be: "daily", "webrtc", "twilio", "telnyx", "plivo", "exotel", "vonage"
Values should be functions that return transport parameters when called.
Returns:
@@ -532,6 +582,12 @@ async def create_transport(
vad_analyzer=SileroVADAnalyzer(),
# add_wav_header and serializer will be set automatically
),
"vonage": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
# add_wav_header and serializer will be set automatically
),
}
transport = await create_transport(runner_args, transport_params)

View File

@@ -18,10 +18,12 @@ class FrameSerializerType(Enum):
Parameters:
BINARY: Binary serialization format for compact representation.
TEXT: Text-based serialization format for human-readable output.
MIXED: Mixed format supporting both binary and text messages (e.g., Vonage).
"""
BINARY = "binary"
TEXT = "text"
MIXED = "mixed"
class FrameSerializer(ABC):

View File

@@ -0,0 +1,194 @@
#
# Copyright (c) 2024-2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Vonage Voice API WebSocket serializer for Pipecat."""
import json
from typing import Optional
from loguru import logger
from pydantic import BaseModel
from pipecat.audio.dtmf.types import KeypadEntry
from pipecat.audio.utils import create_stream_resampler
from pipecat.frames.frames import (
AudioRawFrame,
Frame,
InputAudioRawFrame,
InputDTMFFrame,
InterruptionFrame,
OutputTransportMessageFrame,
OutputTransportMessageUrgentFrame,
StartFrame,
)
from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializerType
class VonageFrameSerializer(FrameSerializer):
    """Serializer for Vonage Voice API WebSocket protocol.

    This serializer handles converting between Pipecat frames and Vonage's WebSocket
    voice streaming protocol. It supports audio conversion (16-bit linear PCM),
    control commands like clear and notify, and DTMF input handling.

    The Vonage Voice API uses:

    - Binary messages for audio data (16-bit linear PCM)
    - Text messages (JSON) for control commands, events, and DTMF

    Note: Ref docs for WebSocket API:
        https://developer.vonage.com/en/voice/voice-api/guides/websockets
    """

    class InputParams(BaseModel):
        """Configuration parameters for VonageFrameSerializer.

        Parameters:
            vonage_sample_rate: Sample rate used by Vonage, defaults to 16000 Hz.
                Common values: 8000, 16000, 24000 Hz.
            sample_rate: Optional override for pipeline input sample rate.
        """

        vonage_sample_rate: int = 16000
        sample_rate: Optional[int] = None

    def __init__(self, params: Optional[InputParams] = None):
        """Initialize the VonageFrameSerializer.

        Args:
            params: Configuration parameters. When omitted, a default
                ``InputParams`` instance is used.
        """
        self._params = params or VonageFrameSerializer.InputParams()

        self._vonage_sample_rate = self._params.vonage_sample_rate
        # Pipeline input rate; stays 0 until setup() assigns the real value.
        self._sample_rate = 0

        # Separate resamplers for each direction of the audio stream.
        self._input_resampler = create_stream_resampler()
        self._output_resampler = create_stream_resampler()

    @property
    def type(self) -> FrameSerializerType:
        """Gets the serializer type.

        Returns:
            The serializer type (MIXED for Vonage - supports both binary and text).
        """
        return FrameSerializerType.MIXED

    async def setup(self, frame: StartFrame):
        """Sets up the serializer with pipeline configuration.

        Args:
            frame: The StartFrame containing pipeline configuration.
        """
        # An explicit sample_rate in the params wins over the pipeline's rate.
        self._sample_rate = self._params.sample_rate or frame.audio_in_sample_rate

    async def serialize(self, frame: Frame) -> str | bytes | None:
        """Serializes a Pipecat frame to Vonage WebSocket format.

        Handles conversion of various frame types to Vonage WebSocket messages.

        Args:
            frame: The Pipecat frame to serialize.

        Returns:
            Serialized data as string (JSON commands) or bytes (audio), or None
            if the frame isn't handled.
        """
        if isinstance(frame, InterruptionFrame):
            # Clear the audio buffer to stop playback immediately
            answer = {"action": "clear"}
            return json.dumps(answer)
        elif isinstance(frame, AudioRawFrame):
            data = frame.audio

            # Output: Convert PCM at frame's rate to Vonage's sample rate (16-bit linear PCM)
            serialized_data = await self._output_resampler.resample(
                data, frame.sample_rate, self._vonage_sample_rate
            )
            if serialized_data is None or len(serialized_data) == 0:
                # Ignoring in case we don't have audio
                return None

            # Vonage expects raw binary PCM data (not base64 encoded)
            return serialized_data
        elif isinstance(frame, (OutputTransportMessageFrame, OutputTransportMessageUrgentFrame)):
            # Allow sending custom JSON commands (e.g., notify)
            return json.dumps(frame.message)

        return None

    async def deserialize(self, data: str | bytes) -> Frame | None:
        """Deserializes Vonage WebSocket data to Pipecat frames.

        Handles conversion of Vonage events to appropriate Pipecat frames.

        - Binary messages contain audio data (16-bit linear PCM)
        - Text messages contain JSON events (websocket:connected,
          websocket:cleared, dtmf, etc.)

        Args:
            data: The raw WebSocket data from Vonage.

        Returns:
            A Pipecat frame corresponding to the Vonage event, or None if unhandled.
        """
        if isinstance(data, bytes):
            # Binary message = audio data (16-bit linear PCM)
            return await self._deserialize_audio(data)

        # Text message = JSON event
        return self._deserialize_event(data)

    async def _deserialize_audio(self, payload: bytes) -> Frame | None:
        """Resample a binary audio payload and wrap it in an InputAudioRawFrame.

        Args:
            payload: Audio bytes received from Vonage.

        Returns:
            An InputAudioRawFrame at the pipeline input rate, or None when the
            resampler produced no audio.
        """
        # Input: Convert Vonage's PCM audio to pipeline sample rate
        deserialized_data = await self._input_resampler.resample(
            payload,
            self._vonage_sample_rate,
            self._sample_rate,
        )
        if deserialized_data is None or len(deserialized_data) == 0:
            # Ignoring in case we don't have audio
            return None

        return InputAudioRawFrame(
            audio=deserialized_data,
            num_channels=1,  # Vonage uses mono audio
            sample_rate=self._sample_rate,  # Use the configured pipeline input rate
        )

    def _deserialize_event(self, data: str) -> Frame | None:
        """Parse a JSON text message and map it to a frame where applicable.

        Only DTMF events produce a frame; every other event is logged and
        dropped. Malformed input is logged and dropped rather than raised.

        Args:
            data: The text message received from Vonage.

        Returns:
            An InputDTMFFrame for a valid DTMF event, otherwise None.
        """
        # Keep the try body to the single call that can raise JSONDecodeError.
        try:
            message = json.loads(data)
        except json.JSONDecodeError:
            logger.warning(f"Failed to parse JSON message from Vonage: {data}")
            return None

        # Valid JSON is not necessarily an object (e.g. "[]" or "1"); calling
        # .get() on those would raise AttributeError out of deserialize().
        if not isinstance(message, dict):
            logger.warning(f"Ignoring non-object JSON message from Vonage: {data}")
            return None

        event = message.get("event")

        if event == "websocket:connected":
            logger.debug(f"Vonage WebSocket connected: content-type={message.get('content-type')}")
            return None
        if event == "websocket:cleared":
            logger.debug("Vonage audio buffer cleared")
            return None
        if event == "websocket:notify":
            logger.debug(f"Vonage notify event: {message.get('payload')}")
            return None
        if event == "websocket:dtmf":
            return self._dtmf_frame_from_message(message)

        logger.debug(f"Vonage event: {event}")
        return None

    @staticmethod
    def _dtmf_frame_from_message(message: dict) -> Frame | None:
        """Build an InputDTMFFrame from a ``websocket:dtmf`` event.

        The digit is looked up at the top level first, then under a nested
        ``dtmf`` object.

        Args:
            message: The decoded DTMF event.

        Returns:
            An InputDTMFFrame, or None when the digit is missing or not a
            valid keypad entry.
        """
        digit = message.get("digit")
        if not digit:
            nested = message.get("dtmf")
            # Only descend when the nested value really is an object.
            if isinstance(nested, dict):
                digit = nested.get("digit")

        if not digit:
            logger.warning(f"DTMF event received but no digit found: {message}")
            return None

        logger.debug(f"Received DTMF digit: {digit}")
        try:
            return InputDTMFFrame(KeypadEntry(digit))
        except ValueError:
            logger.warning(f"Invalid DTMF digit received: {digit}")
            return None

View File

@@ -84,17 +84,25 @@ class FastAPIWebsocketClient:
with support for both binary and text message types.
"""
def __init__(self, websocket: WebSocket, is_binary: bool, callbacks: FastAPIWebsocketCallbacks):
def __init__(
self,
websocket: WebSocket,
is_binary: bool,
callbacks: FastAPIWebsocketCallbacks,
mixed_mode: bool = False,
):
"""Initialize the WebSocket client.
Args:
websocket: The FastAPI WebSocket connection.
is_binary: Whether to use binary message format.
callbacks: Event callback functions.
mixed_mode: Whether to support both binary and text messages (e.g., for Vonage).
"""
self._websocket = websocket
self._closing = False
self._is_binary = is_binary
self._mixed_mode = mixed_mode
self._callbacks = callbacks
self._leave_counter = 0
@@ -112,8 +120,28 @@ class FastAPIWebsocketClient:
Returns:
An async iterator yielding bytes or strings based on message type.
"""
if self._mixed_mode:
# For mixed mode (e.g., Vonage), we need to handle both binary and text messages
# so use a custom receive loop that yields whichever message type arrives
return self._receive_mixed()
return self._websocket.iter_bytes() if self._is_binary else self._websocket.iter_text()
async def _receive_mixed(self):
    """Receive both binary and text messages for mixed mode protocols.

    Yields:
        The ``bytes`` payload of a binary message or the ``str`` payload of a
        text message, in arrival order. Iteration ends on the first message
        that carries neither payload (e.g. a disconnect) or when receiving
        raises.
    """
    while True:
        try:
            # Receive the raw message so the payload type can be inspected.
            message = await self._websocket.receive()
        except Exception:
            # Best-effort: a failed receive ends iteration instead of raising.
            break

        # The ASGI "websocket.receive" event may carry BOTH keys with the
        # unused one set to None, so test for a payload rather than for key
        # presence; otherwise text frames would be yielded as None.
        if message.get("bytes") is not None:
            yield message["bytes"]
        elif message.get("text") is not None:
            yield message["text"]
        else:
            # Connection closed or other event
            break
async def send(self, data: str | bytes):
"""Send data through the WebSocket connection.
@@ -122,7 +150,13 @@ class FastAPIWebsocketClient:
"""
try:
if self._can_send():
if self._is_binary:
# In mixed mode, determine type from the data itself
if self._mixed_mode:
if isinstance(data, bytes):
await self._websocket.send_bytes(data)
else:
await self._websocket.send_text(data)
elif self._is_binary:
await self._websocket.send_bytes(data)
else:
await self._websocket.send_text(data)
@@ -511,9 +545,12 @@ class FastAPIWebsocketTransport(BaseTransport):
)
is_binary = False
mixed_mode = False
if self._params.serializer:
is_binary = self._params.serializer.type == FrameSerializerType.BINARY
self._client = FastAPIWebsocketClient(websocket, is_binary, self._callbacks)
serializer_type = self._params.serializer.type
is_binary = serializer_type == FrameSerializerType.BINARY
mixed_mode = serializer_type == FrameSerializerType.MIXED
self._client = FastAPIWebsocketClient(websocket, is_binary, self._callbacks, mixed_mode)
self._input = FastAPIWebsocketInputTransport(
self, self._client, self._params, name=self._input_name