diff --git a/CHANGELOG.md b/CHANGELOG.md index cb7e78aaf..a297b8d3f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,24 @@ All notable changes to **Pipecat** will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Added + +- Added automatic hangup logic to the Telnyx serializer. This feature hangs up + the Telnyx call when an `EndFrame` or `CancelFrame` is received. It is + enabled by default and is configurable via the `auto_hang_up` `InputParam`. + +### Changed + +- In `TwilioFrameSerializer`, `call_sid` is Optional so as to avoid a breaking + changed. `call_sid` is required to automatically hang up. + +### Fixed + +- Fixed an issue where `TwilioFrameSerializer` would send two hang up commands: + one for the `EndFrame` and one for the `CancelFrame`. + ## [0.0.64] - 2025-04-22 ### Added diff --git a/examples/telnyx-chatbot/README.md b/examples/telnyx-chatbot/README.md index e7400940a..9c934cbf0 100644 --- a/examples/telnyx-chatbot/README.md +++ b/examples/telnyx-chatbot/README.md @@ -63,20 +63,35 @@ This project is a FastAPI-based chatbot that integrates with Telnyx to handle We ngrok http 8765 ``` -2. **Update the Telnyx TeXML applications Webhook**: +2. **Purchase a number** - - Go to your TeXML configuration page - - Provide the ngrok URL to the Webhook URL field and ensure the POST method is selected - - Click Save at the bottom of the page + If you haven't already, purchase a number from Telnyx. -3. **Configure streams.xml**: + - Log in to the Telnyx developer portal: https://portal.telnyx.com/ + - Buy a number: https://portal.telnyx.com/#/numbers/buy-numbers + +3. **Update the Telnyx TeXML applications Webhook**: + + - Go to your TeXML configuration page: https://portal.telnyx.com/#/call-control/texml + - Create a new TeXML app, if one doesn't exist already: + - Add an application name + - Under Webhooks, select POST as the "Voice Method" + - Select "Custom URL" under Webhook URL Method + - Enter your ngrok URL in the "Webhook URL" field (e.g. https://your-name.ngrok.io) + - Click "Create" to save + Note: You'll see subsequent pages to set up SIP and Outbound, both are not required, so just skip. + - Navigate to "Manage Numbers" (https://portal.telnyx.com/#/numbers/my-numbers) and under SIP connection, select the pencil icon to edit and select the TeXML application that you just created. + + Now your number is ready to call. + +4. **Configure streams.xml**: - Copy the template file to create your local version: ```sh cp templates/streams.xml.template templates/streams.xml ``` - In `templates/streams.xml`, replace `` with your ngrok URL (without `https://`) - - The final URL should look like: `wss://abc123.ngrok.io/ws` - - The encoding (`bidirectionalCodec`) should be `PCMU` or `PCMA` depending on your needs. Based on selected encoding, set the outbound_encoding in `server.py` when the bot is initialized. + - The final URL should look like: `wss://abc123.ngrok.io/ws`. This needs to be the same URL that you added to your TeXML app above. + - The encoding (`bidirectionalCodec`) should be `PCMU` or `PCMA` depending on your needs. Based on selected encoding, set the outbound_encoding in `server.py` when the bot is initialized. (No changes are required by default.) - The inbound encoding can be controlled from the application configuration for inbound calls and dial/transfer commands for outbound calls. ## Running the Application diff --git a/examples/telnyx-chatbot/bot.py b/examples/telnyx-chatbot/bot.py index 861aa1c7a..ff743f26c 100644 --- a/examples/telnyx-chatbot/bot.py +++ b/examples/telnyx-chatbot/bot.py @@ -33,9 +33,18 @@ logger.add(sys.stderr, level="DEBUG") async def run_bot( websocket_client, stream_id: str, + call_control_id: str, outbound_encoding: str, inbound_encoding: str, ): + serializer = TelnyxFrameSerializer( + stream_id=stream_id, + outbound_encoding=outbound_encoding, + inbound_encoding=inbound_encoding, + call_control_id=call_control_id, + api_key=os.getenv("TELNYX_API_KEY"), + ) + transport = FastAPIWebsocketTransport( websocket=websocket_client, params=FastAPIWebsocketParams( @@ -44,7 +53,7 @@ async def run_bot( vad_enabled=True, vad_analyzer=SileroVADAnalyzer(), vad_audio_passthrough=True, - serializer=TelnyxFrameSerializer(stream_id, outbound_encoding, inbound_encoding), + serializer=serializer, ), ) diff --git a/examples/telnyx-chatbot/server.py b/examples/telnyx-chatbot/server.py index 7bc1c0615..23cb9f065 100644 --- a/examples/telnyx-chatbot/server.py +++ b/examples/telnyx-chatbot/server.py @@ -37,9 +37,10 @@ async def websocket_endpoint(websocket: WebSocket): call_data = json.loads(await start_data.__anext__()) print(call_data, flush=True) stream_id = call_data["stream_id"] + call_control_id = call_data["start"]["call_control_id"] outbound_encoding = call_data["start"]["media_format"]["encoding"] print("WebSocket connection accepted") - await run_bot(websocket, stream_id, outbound_encoding, "PCMU") + await run_bot(websocket, stream_id, call_control_id, outbound_encoding, "PCMU") if __name__ == "__main__": diff --git a/src/pipecat/serializers/telnyx.py b/src/pipecat/serializers/telnyx.py index cbf16a156..61a89c7f0 100644 --- a/src/pipecat/serializers/telnyx.py +++ b/src/pipecat/serializers/telnyx.py @@ -8,6 +8,8 @@ import base64 import json from typing import Optional +import aiohttp +from loguru import logger from pydantic import BaseModel from pipecat.audio.utils import ( @@ -19,6 +21,8 @@ from pipecat.audio.utils import ( ) from pipecat.frames.frames import ( AudioRawFrame, + CancelFrame, + EndFrame, Frame, InputAudioRawFrame, InputDTMFFrame, @@ -30,38 +34,120 @@ from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializer class TelnyxFrameSerializer(FrameSerializer): + """Serializer for Telnyx WebSocket protocol. + + This serializer handles converting between Pipecat frames and Telnyx's WebSocket + media streams protocol. It supports audio conversion, DTMF events, and automatic + call termination. + + When auto_hang_up is enabled (default), the serializer will automatically terminate + the Telnyx call when an EndFrame or CancelFrame is processed, but requires Telnyx + credentials to be provided. + + Attributes: + _stream_id: The Telnyx Stream ID. + _call_control_id: The associated Telnyx Call Control ID. + _api_key: Telnyx API key for API access. + _params: Configuration parameters. + _telnyx_sample_rate: Sample rate used by Telnyx (typically 8kHz). + _sample_rate: Input sample rate for the pipeline. + _resampler: Audio resampler for format conversion. + _hangup_attempted: Flag to track if hang-up has been attempted. + """ + class InputParams(BaseModel): - telnyx_sample_rate: int = 8000 # Default Telnyx rate (8kHz) - sample_rate: Optional[int] = None # Pipeline input rate + """Configuration parameters for TelnyxFrameSerializer. + + Attributes: + telnyx_sample_rate: Sample rate used by Telnyx, defaults to 8000 Hz. + sample_rate: Optional override for pipeline input sample rate. + inbound_encoding: Audio encoding for data sent to Telnyx (e.g., "PCMU"). + outbound_encoding: Audio encoding for data received from Telnyx (e.g., "PCMU"). + auto_hang_up: Whether to automatically terminate call on EndFrame. + """ + + telnyx_sample_rate: int = 8000 + sample_rate: Optional[int] = None inbound_encoding: str = "PCMU" outbound_encoding: str = "PCMU" + auto_hang_up: bool = True def __init__( self, stream_id: str, outbound_encoding: str, inbound_encoding: str, + call_control_id: Optional[str] = None, + api_key: Optional[str] = None, params: InputParams = InputParams(), ): + """Initialize the TelnyxFrameSerializer. + + Args: + stream_id: The Stream ID for Telnyx. + outbound_encoding: The encoding type for outbound audio (e.g., "PCMU"). + inbound_encoding: The encoding type for inbound audio (e.g., "PCMU"). + call_control_id: The Call Control ID for the Telnyx call (optional, but required for auto hang-up). + api_key: Your Telnyx API key (required for auto hang-up). + params: Configuration parameters. + """ self._stream_id = stream_id params.outbound_encoding = outbound_encoding params.inbound_encoding = inbound_encoding + self._call_control_id = call_control_id + self._api_key = api_key self._params = params self._telnyx_sample_rate = self._params.telnyx_sample_rate self._sample_rate = 0 # Pipeline input rate self._resampler = create_default_resampler() + self._hangup_attempted = False @property def type(self) -> FrameSerializerType: + """Gets the serializer type. + + Returns: + The serializer type, either TEXT or BINARY. + """ return FrameSerializerType.TEXT async def setup(self, frame: StartFrame): + """Sets up the serializer with pipeline configuration. + + Args: + frame: The StartFrame containing pipeline configuration. + """ self._sample_rate = self._params.sample_rate or frame.audio_in_sample_rate async def serialize(self, frame: Frame) -> str | bytes | None: - if isinstance(frame, AudioRawFrame): + """Serializes a Pipecat frame to Telnyx WebSocket format. + + Handles conversion of various frame types to Telnyx WebSocket messages. + For EndFrames and CancelFrames, initiates call termination if auto_hang_up is enabled. + + Args: + frame: The Pipecat frame to serialize. + + Returns: + Serialized data as string or bytes, or None if the frame isn't handled. + + Raises: + ValueError: If an unsupported encoding is specified. + """ + if ( + self._params.auto_hang_up + and not self._hangup_attempted + and isinstance(frame, (EndFrame, CancelFrame)) + ): + self._hangup_attempted = True + await self._hang_up_call() + return None + elif isinstance(frame, StartInterruptionFrame): + answer = {"event": "clear"} + return json.dumps(answer) + elif isinstance(frame, AudioRawFrame): data = frame.audio # Output: Convert PCM at frame's rate to 8kHz encoded for Telnyx @@ -84,11 +170,58 @@ class TelnyxFrameSerializer(FrameSerializer): return json.dumps(answer) - if isinstance(frame, StartInterruptionFrame): - answer = {"event": "clear"} - return json.dumps(answer) + # Return None for unhandled frames + return None + + async def _hang_up_call(self): + """Hang up the Telnyx call using Telnyx's REST API.""" + try: + call_control_id = self._call_control_id + api_key = self._api_key + + if not call_control_id or not api_key: + logger.warning( + "Cannot hang up Telnyx call: call_control_id and api_key must be provided" + ) + return + + # Telnyx API endpoint for hanging up a call + endpoint = f"https://api.telnyx.com/v2/calls/{call_control_id}/actions/hangup" + + # Set headers with API key + headers = {"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"} + + # Make the POST request to hang up the call + async with aiohttp.ClientSession() as session: + async with session.post(endpoint, headers=headers) as response: + if response.status == 200: + logger.info(f"Successfully terminated Telnyx call {call_control_id}") + else: + # Get the error details for better debugging + error_text = await response.text() + logger.error( + f"Failed to terminate Telnyx call {call_control_id}: " + f"Status {response.status}, Response: {error_text}" + ) + + except Exception as e: + logger.exception(f"Failed to hang up Telnyx call: {e}") async def deserialize(self, data: str | bytes) -> Frame | None: + """Deserializes Telnyx WebSocket data to Pipecat frames. + + Handles conversion of Telnyx media events to appropriate Pipecat frames, + including audio data and DTMF keypresses. + + Args: + data: The raw WebSocket data from Telnyx. + + Returns: + A Pipecat frame corresponding to the Telnyx event, or None if unhandled. + + Raises: + ValueError: If an unsupported encoding is specified. + """ message = json.loads(data) if message["event"] == "media": diff --git a/src/pipecat/serializers/twilio.py b/src/pipecat/serializers/twilio.py index f1fc34c02..b169955ce 100644 --- a/src/pipecat/serializers/twilio.py +++ b/src/pipecat/serializers/twilio.py @@ -66,7 +66,7 @@ class TwilioFrameSerializer(FrameSerializer): def __init__( self, stream_sid: str, - call_sid: str, + call_sid: Optional[str] = None, account_sid: Optional[str] = None, auth_token: Optional[str] = None, params: InputParams = InputParams(), @@ -75,9 +75,9 @@ class TwilioFrameSerializer(FrameSerializer): Args: stream_sid: The Twilio Media Stream SID. - call_sid: The associated Twilio Call SID. - account_sid: Twilio account SID. - auth_token: Twilio auth token. + call_sid: The associated Twilio Call SID (optional, but required for auto hang-up). + account_sid: Twilio account SID (required for auto hang-up). + auth_token: Twilio auth token (required for auto hang-up). params: Configuration parameters. """ self._stream_sid = stream_sid @@ -90,6 +90,7 @@ class TwilioFrameSerializer(FrameSerializer): self._sample_rate = 0 # Pipeline input rate self._resampler = create_default_resampler() + self._hangup_attempted = False @property def type(self) -> FrameSerializerType: @@ -120,7 +121,12 @@ class TwilioFrameSerializer(FrameSerializer): Returns: Serialized data as string or bytes, or None if the frame isn't handled. """ - if self._params.auto_hang_up and isinstance(frame, (EndFrame, CancelFrame)): + if ( + self._params.auto_hang_up + and not self._hangup_attempted + and isinstance(frame, (EndFrame, CancelFrame)) + ): + self._hangup_attempted = True await self._hang_up_call() return None elif isinstance(frame, StartInterruptionFrame): @@ -154,15 +160,26 @@ class TwilioFrameSerializer(FrameSerializer): account_sid = self._account_sid auth_token = self._auth_token + call_sid = self._call_sid + + if not call_sid or not account_sid or not auth_token: + missing = [] + if not call_sid: + missing.append("call_sid") + if not account_sid: + missing.append("account_sid") + if not auth_token: + missing.append("auth_token") - if not account_sid or not auth_token: logger.warning( - "Cannot hang up Twilio call: account_sid and auth_token must be provided" + f"Cannot hang up Twilio call: missing required parameters: {', '.join(missing)}" ) return # Twilio API endpoint for updating calls - endpoint = f"https://api.twilio.com/2010-04-01/Accounts/{account_sid}/Calls/{self._call_sid}.json" + endpoint = ( + f"https://api.twilio.com/2010-04-01/Accounts/{account_sid}/Calls/{call_sid}.json" + ) # Create basic auth from account_sid and auth_token auth = aiohttp.BasicAuth(account_sid, auth_token) @@ -174,12 +191,12 @@ class TwilioFrameSerializer(FrameSerializer): async with aiohttp.ClientSession() as session: async with session.post(endpoint, auth=auth, data=params) as response: if response.status == 200: - logger.info(f"Successfully terminated Twilio call {self._call_sid}") + logger.info(f"Successfully terminated Twilio call {call_sid}") else: # Get the error details for better debugging error_text = await response.text() logger.error( - f"Failed to terminate Twilio call {self._call_sid}: " + f"Failed to terminate Twilio call {call_sid}: " f"Status {response.status}, Response: {error_text}" )