Compare commits
4 Commits
kompfner-p
...
mb/vonage-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d3780f19f7 | ||
|
|
6c75d6f24a | ||
|
|
024625fc15 | ||
|
|
186f76db46 |
1
changelog/3265.added.md
Normal file
1
changelog/3265.added.md
Normal file
@@ -0,0 +1 @@
|
||||
- Added `VonageFrameSerializer` in support of Vonage's Voice API over websockets.
|
||||
1
changelog/3265.changed.2.md
Normal file
1
changelog/3265.changed.2.md
Normal file
@@ -0,0 +1 @@
|
||||
- Updated the development runner utility to support parsing `VonageFrameSerializer` websocket messages and handling/passing the to and from numbers via the `RunnerArguments`.
|
||||
1
changelog/3265.changed.md
Normal file
1
changelog/3265.changed.md
Normal file
@@ -0,0 +1 @@
|
||||
- Updated FastAPIWebsocketTransport to support `MIXED` mode, handling binary and text messages over the same websocket connection.
|
||||
@@ -88,6 +88,11 @@ def _detect_transport_type_from_message(message_data: dict) -> str:
|
||||
logger.trace("Auto-detected: EXOTEL")
|
||||
return "exotel"
|
||||
|
||||
# Vonage detection
|
||||
if message_data.get("event") == "websocket:connected" and "content-type" in message_data:
|
||||
logger.trace("Auto-detected: VONAGE")
|
||||
return "vonage"
|
||||
|
||||
logger.trace("Auto-detection failed - unknown format")
|
||||
return "unknown"
|
||||
|
||||
@@ -135,6 +140,16 @@ async def parse_telephony_websocket(websocket: WebSocket):
|
||||
"to": str,
|
||||
}
|
||||
|
||||
- Vonage::
|
||||
|
||||
{
|
||||
"content_type": str, # e.g., "audio/l16;rate=16000"
|
||||
"from": str,
|
||||
"to": str,
|
||||
"call_uuid": str,
|
||||
"conversation_uuid": str,
|
||||
}
|
||||
|
||||
Example usage::
|
||||
|
||||
transport_type, call_data = await parse_telephony_websocket(websocket)
|
||||
@@ -153,17 +168,23 @@ async def parse_telephony_websocket(websocket: WebSocket):
|
||||
except json.JSONDecodeError:
|
||||
first_message = {}
|
||||
|
||||
# Second message
|
||||
second_message_raw = await start_data.__anext__()
|
||||
logger.trace(f"Second message: {second_message_raw}")
|
||||
try:
|
||||
second_message = json.loads(second_message_raw)
|
||||
except json.JSONDecodeError:
|
||||
second_message = {}
|
||||
|
||||
# Try auto-detection on both messages
|
||||
# Try auto-detection on first message
|
||||
detected_type_first = _detect_transport_type_from_message(first_message)
|
||||
detected_type_second = _detect_transport_type_from_message(second_message)
|
||||
|
||||
# Vonage only sends one text message at start, then binary audio
|
||||
# For other providers, read second message
|
||||
if detected_type_first != "vonage":
|
||||
second_message_raw = await start_data.__anext__()
|
||||
logger.debug(f"Second message: {second_message_raw}")
|
||||
try:
|
||||
second_message = json.loads(second_message_raw)
|
||||
except json.JSONDecodeError:
|
||||
second_message = {}
|
||||
|
||||
detected_type_second = _detect_transport_type_from_message(second_message)
|
||||
else:
|
||||
second_message = {}
|
||||
detected_type_second = "unknown"
|
||||
|
||||
# Use the successful detection
|
||||
if detected_type_first != "unknown":
|
||||
@@ -219,6 +240,21 @@ async def parse_telephony_websocket(websocket: WebSocket):
|
||||
"custom_parameters": start_data.get("custom_parameters", ""),
|
||||
}
|
||||
|
||||
elif transport_type == "vonage":
|
||||
# Vonage sends websocket:connected event with content-type
|
||||
content_type = call_data_raw.get("content-type", "audio/l16;rate=16000")
|
||||
call_data = {
|
||||
"content_type": content_type,
|
||||
}
|
||||
|
||||
# Extract phone numbers and call info from websocket state (set by server.py)
|
||||
if hasattr(websocket, "state") and hasattr(websocket.state, "vonage_call_data"):
|
||||
vonage_data = websocket.state.vonage_call_data
|
||||
call_data["from"] = vonage_data.get("from", "")
|
||||
call_data["to"] = vonage_data.get("to", "")
|
||||
call_data["call_uuid"] = vonage_data.get("call_uuid", "")
|
||||
call_data["conversation_uuid"] = vonage_data.get("conversation_uuid", "")
|
||||
|
||||
else:
|
||||
call_data = {}
|
||||
|
||||
@@ -465,10 +501,24 @@ async def _create_telephony_transport(
|
||||
stream_sid=call_data["stream_id"],
|
||||
call_sid=call_data["call_id"],
|
||||
)
|
||||
elif transport_type == "vonage":
|
||||
from pipecat.serializers.vonage import VonageFrameSerializer
|
||||
|
||||
content_type = call_data.get("content_type", "audio/l16;rate=16000")
|
||||
sample_rate = 16000 # Default
|
||||
if "rate=" in content_type:
|
||||
try:
|
||||
sample_rate = int(content_type.split("rate=")[1].split(";")[0])
|
||||
except (ValueError, IndexError):
|
||||
logger.warning(f"Could not parse sample rate from {content_type}, using 16000")
|
||||
|
||||
params.serializer = VonageFrameSerializer(
|
||||
params=VonageFrameSerializer.InputParams(vonage_sample_rate=sample_rate)
|
||||
)
|
||||
else:
|
||||
raise ValueError(
|
||||
f"Unsupported telephony provider: {transport_type}. "
|
||||
f"Supported providers: twilio, telnyx, plivo, exotel"
|
||||
f"Supported providers: twilio, telnyx, plivo, exotel, vonage"
|
||||
)
|
||||
|
||||
return FastAPIWebsocketTransport(websocket=websocket, params=params)
|
||||
@@ -485,7 +535,7 @@ async def create_transport(
|
||||
Args:
|
||||
runner_args: Arguments from the runner.
|
||||
transport_params: Dict mapping transport names to parameter factory functions.
|
||||
Keys should be: "daily", "webrtc", "twilio", "telnyx", "plivo", "exotel"
|
||||
Keys should be: "daily", "webrtc", "twilio", "telnyx", "plivo", "exotel", "vonage"
|
||||
Values should be functions that return transport parameters when called.
|
||||
|
||||
Returns:
|
||||
@@ -532,6 +582,12 @@ async def create_transport(
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
# add_wav_header and serializer will be set automatically
|
||||
),
|
||||
"vonage": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
# add_wav_header and serializer will be set automatically
|
||||
),
|
||||
}
|
||||
|
||||
transport = await create_transport(runner_args, transport_params)
|
||||
|
||||
@@ -18,10 +18,12 @@ class FrameSerializerType(Enum):
|
||||
Parameters:
|
||||
BINARY: Binary serialization format for compact representation.
|
||||
TEXT: Text-based serialization format for human-readable output.
|
||||
MIXED: Mixed format supporting both binary and text messages (e.g., Vonage).
|
||||
"""
|
||||
|
||||
BINARY = "binary"
|
||||
TEXT = "text"
|
||||
MIXED = "mixed"
|
||||
|
||||
|
||||
class FrameSerializer(ABC):
|
||||
|
||||
194
src/pipecat/serializers/vonage.py
Normal file
194
src/pipecat/serializers/vonage.py
Normal file
@@ -0,0 +1,194 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Vonage Voice API WebSocket serializer for Pipecat."""
|
||||
|
||||
import json
|
||||
from typing import Optional
|
||||
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel
|
||||
|
||||
from pipecat.audio.dtmf.types import KeypadEntry
|
||||
from pipecat.audio.utils import create_stream_resampler
|
||||
from pipecat.frames.frames import (
|
||||
AudioRawFrame,
|
||||
Frame,
|
||||
InputAudioRawFrame,
|
||||
InputDTMFFrame,
|
||||
InterruptionFrame,
|
||||
OutputTransportMessageFrame,
|
||||
OutputTransportMessageUrgentFrame,
|
||||
StartFrame,
|
||||
)
|
||||
from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializerType
|
||||
|
||||
|
||||
class VonageFrameSerializer(FrameSerializer):
|
||||
"""Serializer for Vonage Voice API WebSocket protocol.
|
||||
|
||||
This serializer handles converting between Pipecat frames and Vonage's WebSocket
|
||||
voice streaming protocol. It supports audio conversion (16-bit linear PCM),
|
||||
control commands like clear and notify, and DTMF input handling.
|
||||
|
||||
The Vonage Voice API uses:
|
||||
- Binary messages for audio data (16-bit linear PCM)
|
||||
- Text messages (JSON) for control commands, events, and DTMF
|
||||
|
||||
Note: Ref docs for WebSocket API:
|
||||
https://developer.vonage.com/en/voice/voice-api/guides/websockets
|
||||
"""
|
||||
|
||||
class InputParams(BaseModel):
|
||||
"""Configuration parameters for VonageFrameSerializer.
|
||||
|
||||
Parameters:
|
||||
vonage_sample_rate: Sample rate used by Vonage, defaults to 16000 Hz.
|
||||
Common values: 8000, 16000, 24000 Hz.
|
||||
sample_rate: Optional override for pipeline input sample rate.
|
||||
"""
|
||||
|
||||
vonage_sample_rate: int = 16000
|
||||
sample_rate: Optional[int] = None
|
||||
|
||||
def __init__(self, params: Optional[InputParams] = None):
|
||||
"""Initialize the VonageFrameSerializer.
|
||||
|
||||
Args:
|
||||
params: Configuration parameters.
|
||||
"""
|
||||
self._params = params or VonageFrameSerializer.InputParams()
|
||||
|
||||
self._vonage_sample_rate = self._params.vonage_sample_rate
|
||||
self._sample_rate = 0 # Pipeline input rate
|
||||
|
||||
self._input_resampler = create_stream_resampler()
|
||||
self._output_resampler = create_stream_resampler()
|
||||
|
||||
@property
|
||||
def type(self) -> FrameSerializerType:
|
||||
"""Gets the serializer type.
|
||||
|
||||
Returns:
|
||||
The serializer type (MIXED for Vonage - supports both binary and text).
|
||||
"""
|
||||
return FrameSerializerType.MIXED
|
||||
|
||||
async def setup(self, frame: StartFrame):
|
||||
"""Sets up the serializer with pipeline configuration.
|
||||
|
||||
Args:
|
||||
frame: The StartFrame containing pipeline configuration.
|
||||
"""
|
||||
self._sample_rate = self._params.sample_rate or frame.audio_in_sample_rate
|
||||
|
||||
async def serialize(self, frame: Frame) -> str | bytes | None:
|
||||
"""Serializes a Pipecat frame to Vonage WebSocket format.
|
||||
|
||||
Handles conversion of various frame types to Vonage WebSocket messages.
|
||||
|
||||
Args:
|
||||
frame: The Pipecat frame to serialize.
|
||||
|
||||
Returns:
|
||||
Serialized data as string (JSON commands) or bytes (audio), or None if the frame isn't handled.
|
||||
"""
|
||||
if isinstance(frame, InterruptionFrame):
|
||||
# Clear the audio buffer to stop playback immediately
|
||||
answer = {"action": "clear"}
|
||||
return json.dumps(answer)
|
||||
elif isinstance(frame, AudioRawFrame):
|
||||
data = frame.audio
|
||||
|
||||
# Output: Convert PCM at frame's rate to Vonage's sample rate (16-bit linear PCM)
|
||||
serialized_data = await self._output_resampler.resample(
|
||||
data, frame.sample_rate, self._vonage_sample_rate
|
||||
)
|
||||
if serialized_data is None or len(serialized_data) == 0:
|
||||
# Ignoring in case we don't have audio
|
||||
return None
|
||||
|
||||
# Vonage expects raw binary PCM data (not base64 encoded)
|
||||
return serialized_data
|
||||
elif isinstance(frame, (OutputTransportMessageFrame, OutputTransportMessageUrgentFrame)):
|
||||
# Allow sending custom JSON commands (e.g., notify)
|
||||
return json.dumps(frame.message)
|
||||
|
||||
return None
|
||||
|
||||
async def deserialize(self, data: str | bytes) -> Frame | None:
|
||||
"""Deserializes Vonage WebSocket data to Pipecat frames.
|
||||
|
||||
Handles conversion of Vonage events to appropriate Pipecat frames.
|
||||
- Binary messages contain audio data (16-bit linear PCM)
|
||||
- Text messages contain JSON events (websocket:connected, websocket:cleared, dtmf, etc.)
|
||||
|
||||
Args:
|
||||
data: The raw WebSocket data from Vonage.
|
||||
|
||||
Returns:
|
||||
A Pipecat frame corresponding to the Vonage event, or None if unhandled.
|
||||
"""
|
||||
# Check if this is binary audio data
|
||||
if isinstance(data, bytes):
|
||||
# Binary message = audio data (16-bit linear PCM)
|
||||
payload = data
|
||||
|
||||
# Input: Convert Vonage's PCM audio to pipeline sample rate
|
||||
deserialized_data = await self._input_resampler.resample(
|
||||
payload,
|
||||
self._vonage_sample_rate,
|
||||
self._sample_rate,
|
||||
)
|
||||
if deserialized_data is None or len(deserialized_data) == 0:
|
||||
# Ignoring in case we don't have audio
|
||||
return None
|
||||
|
||||
audio_frame = InputAudioRawFrame(
|
||||
audio=deserialized_data,
|
||||
num_channels=1, # Vonage uses mono audio
|
||||
sample_rate=self._sample_rate, # Use the configured pipeline input rate
|
||||
)
|
||||
return audio_frame
|
||||
else:
|
||||
# Text message = JSON event
|
||||
try:
|
||||
message = json.loads(data)
|
||||
event = message.get("event")
|
||||
|
||||
# Handle different event types
|
||||
if event == "websocket:connected":
|
||||
logger.debug(
|
||||
f"Vonage WebSocket connected: content-type={message.get('content-type')}"
|
||||
)
|
||||
return None
|
||||
elif event == "websocket:cleared":
|
||||
logger.debug("Vonage audio buffer cleared")
|
||||
return None
|
||||
elif event == "websocket:notify":
|
||||
logger.debug(f"Vonage notify event: {message.get('payload')}")
|
||||
return None
|
||||
elif event == "websocket:dtmf":
|
||||
# Handle DTMF input
|
||||
# Vonage may send digit in different formats, try both
|
||||
digit = message.get("digit") or message.get("dtmf", {}).get("digit")
|
||||
if digit:
|
||||
logger.debug(f"Received DTMF digit: {digit}")
|
||||
try:
|
||||
return InputDTMFFrame(KeypadEntry(digit))
|
||||
except ValueError:
|
||||
logger.warning(f"Invalid DTMF digit received: {digit}")
|
||||
return None
|
||||
else:
|
||||
logger.warning(f"DTMF event received but no digit found: {message}")
|
||||
return None
|
||||
else:
|
||||
logger.debug(f"Vonage event: {event}")
|
||||
return None
|
||||
|
||||
except json.JSONDecodeError:
|
||||
logger.warning(f"Failed to parse JSON message from Vonage: {data}")
|
||||
return None
|
||||
@@ -84,17 +84,25 @@ class FastAPIWebsocketClient:
|
||||
with support for both binary and text message types.
|
||||
"""
|
||||
|
||||
def __init__(self, websocket: WebSocket, is_binary: bool, callbacks: FastAPIWebsocketCallbacks):
|
||||
def __init__(
|
||||
self,
|
||||
websocket: WebSocket,
|
||||
is_binary: bool,
|
||||
callbacks: FastAPIWebsocketCallbacks,
|
||||
mixed_mode: bool = False,
|
||||
):
|
||||
"""Initialize the WebSocket client.
|
||||
|
||||
Args:
|
||||
websocket: The FastAPI WebSocket connection.
|
||||
is_binary: Whether to use binary message format.
|
||||
callbacks: Event callback functions.
|
||||
mixed_mode: Whether to support both binary and text messages (e.g., for Vonage).
|
||||
"""
|
||||
self._websocket = websocket
|
||||
self._closing = False
|
||||
self._is_binary = is_binary
|
||||
self._mixed_mode = mixed_mode
|
||||
self._callbacks = callbacks
|
||||
self._leave_counter = 0
|
||||
|
||||
@@ -112,8 +120,28 @@ class FastAPIWebsocketClient:
|
||||
Returns:
|
||||
An async iterator yielding bytes or strings based on message type.
|
||||
"""
|
||||
if self._mixed_mode:
|
||||
# For mixed mode (e.g., Vonage), we need to handle both binary and text messages
|
||||
# We'll use iter_json() which can handle both types
|
||||
return self._receive_mixed()
|
||||
return self._websocket.iter_bytes() if self._is_binary else self._websocket.iter_text()
|
||||
|
||||
async def _receive_mixed(self):
|
||||
"""Receive both binary and text messages for mixed mode protocols."""
|
||||
while True:
|
||||
try:
|
||||
# Receive raw message to determine type
|
||||
message = await self._websocket.receive()
|
||||
if "bytes" in message:
|
||||
yield message["bytes"]
|
||||
elif "text" in message:
|
||||
yield message["text"]
|
||||
else:
|
||||
# Connection closed or other event
|
||||
break
|
||||
except Exception:
|
||||
break
|
||||
|
||||
async def send(self, data: str | bytes):
|
||||
"""Send data through the WebSocket connection.
|
||||
|
||||
@@ -122,7 +150,13 @@ class FastAPIWebsocketClient:
|
||||
"""
|
||||
try:
|
||||
if self._can_send():
|
||||
if self._is_binary:
|
||||
# In mixed mode, determine type from the data itself
|
||||
if self._mixed_mode:
|
||||
if isinstance(data, bytes):
|
||||
await self._websocket.send_bytes(data)
|
||||
else:
|
||||
await self._websocket.send_text(data)
|
||||
elif self._is_binary:
|
||||
await self._websocket.send_bytes(data)
|
||||
else:
|
||||
await self._websocket.send_text(data)
|
||||
@@ -511,9 +545,12 @@ class FastAPIWebsocketTransport(BaseTransport):
|
||||
)
|
||||
|
||||
is_binary = False
|
||||
mixed_mode = False
|
||||
if self._params.serializer:
|
||||
is_binary = self._params.serializer.type == FrameSerializerType.BINARY
|
||||
self._client = FastAPIWebsocketClient(websocket, is_binary, self._callbacks)
|
||||
serializer_type = self._params.serializer.type
|
||||
is_binary = serializer_type == FrameSerializerType.BINARY
|
||||
mixed_mode = serializer_type == FrameSerializerType.MIXED
|
||||
self._client = FastAPIWebsocketClient(websocket, is_binary, self._callbacks, mixed_mode)
|
||||
|
||||
self._input = FastAPIWebsocketInputTransport(
|
||||
self, self._client, self._params, name=self._input_name
|
||||
|
||||
Reference in New Issue
Block a user