From d3780f19f7161a129fd2244d3f4cd39d44277383 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 18 Dec 2025 22:33:08 -0500 Subject: [PATCH] Add Vonage support to telephony runner utilities --- changelog/3265.changed.2.md | 1 + src/pipecat/runner/utils.py | 80 +++++++++++++++++++++++++++++++------ 2 files changed, 69 insertions(+), 12 deletions(-) create mode 100644 changelog/3265.changed.2.md diff --git a/changelog/3265.changed.2.md b/changelog/3265.changed.2.md new file mode 100644 index 000000000..51366248c --- /dev/null +++ b/changelog/3265.changed.2.md @@ -0,0 +1 @@ +- Updated the development runner utility to support parsing `VonageFrameSerializer` websocket messages and handling/passing the to and from numbers via the `RunnerArguments`. diff --git a/src/pipecat/runner/utils.py b/src/pipecat/runner/utils.py index 76a6fa82f..eda4f6cf0 100644 --- a/src/pipecat/runner/utils.py +++ b/src/pipecat/runner/utils.py @@ -88,6 +88,11 @@ def _detect_transport_type_from_message(message_data: dict) -> str: logger.trace("Auto-detected: EXOTEL") return "exotel" + # Vonage detection + if message_data.get("event") == "websocket:connected" and "content-type" in message_data: + logger.trace("Auto-detected: VONAGE") + return "vonage" + logger.trace("Auto-detection failed - unknown format") return "unknown" @@ -135,6 +140,16 @@ async def parse_telephony_websocket(websocket: WebSocket): "to": str, } + - Vonage:: + + { + "content_type": str, # e.g., "audio/l16;rate=16000" + "from": str, + "to": str, + "call_uuid": str, + "conversation_uuid": str, + } + Example usage:: transport_type, call_data = await parse_telephony_websocket(websocket) @@ -153,17 +168,23 @@ async def parse_telephony_websocket(websocket: WebSocket): except json.JSONDecodeError: first_message = {} - # Second message - second_message_raw = await start_data.__anext__() - logger.trace(f"Second message: {second_message_raw}") - try: - second_message = json.loads(second_message_raw) - except json.JSONDecodeError: - second_message = {} - - # Try auto-detection on both messages + # Try auto-detection on first message detected_type_first = _detect_transport_type_from_message(first_message) - detected_type_second = _detect_transport_type_from_message(second_message) + + # Vonage only sends one text message at start, then binary audio + # For other providers, read second message + if detected_type_first != "vonage": + second_message_raw = await start_data.__anext__() + logger.debug(f"Second message: {second_message_raw}") + try: + second_message = json.loads(second_message_raw) + except json.JSONDecodeError: + second_message = {} + + detected_type_second = _detect_transport_type_from_message(second_message) + else: + second_message = {} + detected_type_second = "unknown" # Use the successful detection if detected_type_first != "unknown": @@ -219,6 +240,21 @@ async def parse_telephony_websocket(websocket: WebSocket): "custom_parameters": start_data.get("custom_parameters", ""), } + elif transport_type == "vonage": + # Vonage sends websocket:connected event with content-type + content_type = call_data_raw.get("content-type", "audio/l16;rate=16000") + call_data = { + "content_type": content_type, + } + + # Extract phone numbers and call info from websocket state (set by server.py) + if hasattr(websocket, "state") and hasattr(websocket.state, "vonage_call_data"): + vonage_data = websocket.state.vonage_call_data + call_data["from"] = vonage_data.get("from", "") + call_data["to"] = vonage_data.get("to", "") + call_data["call_uuid"] = vonage_data.get("call_uuid", "") + call_data["conversation_uuid"] = vonage_data.get("conversation_uuid", "") + else: call_data = {} @@ -465,10 +501,24 @@ async def _create_telephony_transport( stream_sid=call_data["stream_id"], call_sid=call_data["call_id"], ) + elif transport_type == "vonage": + from pipecat.serializers.vonage import VonageFrameSerializer + + content_type = call_data.get("content_type", "audio/l16;rate=16000") + sample_rate = 16000 # Default + if "rate=" in content_type: + try: + sample_rate = int(content_type.split("rate=")[1].split(";")[0]) + except (ValueError, IndexError): + logger.warning(f"Could not parse sample rate from {content_type}, using 16000") + + params.serializer = VonageFrameSerializer( + params=VonageFrameSerializer.InputParams(vonage_sample_rate=sample_rate) + ) else: raise ValueError( f"Unsupported telephony provider: {transport_type}. " - f"Supported providers: twilio, telnyx, plivo, exotel" + f"Supported providers: twilio, telnyx, plivo, exotel, vonage" ) return FastAPIWebsocketTransport(websocket=websocket, params=params) @@ -485,7 +535,7 @@ async def create_transport( Args: runner_args: Arguments from the runner. transport_params: Dict mapping transport names to parameter factory functions. - Keys should be: "daily", "webrtc", "twilio", "telnyx", "plivo", "exotel" + Keys should be: "daily", "webrtc", "twilio", "telnyx", "plivo", "exotel", "vonage" Values should be functions that return transport parameters when called. Returns: @@ -532,6 +582,12 @@ async def create_transport( vad_analyzer=SileroVADAnalyzer(), # add_wav_header and serializer will be set automatically ), + "vonage": lambda: FastAPIWebsocketParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + # add_wav_header and serializer will be set automatically + ), } transport = await create_transport(runner_args, transport_params)