Merge pull request #1645 from pipecat-ai/mb/telnyx-auto-hang-up
Add auto_hang_up to Telnyx serializer
This commit is contained in:
18
CHANGELOG.md
18
CHANGELOG.md
@@ -5,6 +5,24 @@ All notable changes to **Pipecat** will be documented in this file.
|
||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
### Added
|
||||
|
||||
- Added automatic hangup logic to the Telnyx serializer. This feature hangs up
|
||||
the Telnyx call when an `EndFrame` or `CancelFrame` is received. It is
|
||||
enabled by default and is configurable via the `auto_hang_up` `InputParam`.
|
||||
|
||||
### Changed
|
||||
|
||||
- In `TwilioFrameSerializer`, `call_sid` is Optional so as to avoid a breaking
|
||||
changed. `call_sid` is required to automatically hang up.
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed an issue where `TwilioFrameSerializer` would send two hang up commands:
|
||||
one for the `EndFrame` and one for the `CancelFrame`.
|
||||
|
||||
## [0.0.64] - 2025-04-22
|
||||
|
||||
### Added
|
||||
|
||||
@@ -63,20 +63,35 @@ This project is a FastAPI-based chatbot that integrates with Telnyx to handle We
|
||||
ngrok http 8765
|
||||
```
|
||||
|
||||
2. **Update the Telnyx TeXML applications Webhook**:
|
||||
2. **Purchase a number**
|
||||
|
||||
- Go to your TeXML configuration page
|
||||
- Provide the ngrok URL to the Webhook URL field and ensure the POST method is selected
|
||||
- Click Save at the bottom of the page
|
||||
If you haven't already, purchase a number from Telnyx.
|
||||
|
||||
3. **Configure streams.xml**:
|
||||
- Log in to the Telnyx developer portal: https://portal.telnyx.com/
|
||||
- Buy a number: https://portal.telnyx.com/#/numbers/buy-numbers
|
||||
|
||||
3. **Update the Telnyx TeXML applications Webhook**:
|
||||
|
||||
- Go to your TeXML configuration page: https://portal.telnyx.com/#/call-control/texml
|
||||
- Create a new TeXML app, if one doesn't exist already:
|
||||
- Add an application name
|
||||
- Under Webhooks, select POST as the "Voice Method"
|
||||
- Select "Custom URL" under Webhook URL Method
|
||||
- Enter your ngrok URL in the "Webhook URL" field (e.g. https://your-name.ngrok.io)
|
||||
- Click "Create" to save
|
||||
Note: You'll see subsequent pages to set up SIP and Outbound, both are not required, so just skip.
|
||||
- Navigate to "Manage Numbers" (https://portal.telnyx.com/#/numbers/my-numbers) and under SIP connection, select the pencil icon to edit and select the TeXML application that you just created.
|
||||
|
||||
Now your number is ready to call.
|
||||
|
||||
4. **Configure streams.xml**:
|
||||
- Copy the template file to create your local version:
|
||||
```sh
|
||||
cp templates/streams.xml.template templates/streams.xml
|
||||
```
|
||||
- In `templates/streams.xml`, replace `<your server url>` with your ngrok URL (without `https://`)
|
||||
- The final URL should look like: `wss://abc123.ngrok.io/ws`
|
||||
- The encoding (`bidirectionalCodec`) should be `PCMU` or `PCMA` depending on your needs. Based on selected encoding, set the outbound_encoding in `server.py` when the bot is initialized.
|
||||
- The final URL should look like: `wss://abc123.ngrok.io/ws`. This needs to be the same URL that you added to your TeXML app above.
|
||||
- The encoding (`bidirectionalCodec`) should be `PCMU` or `PCMA` depending on your needs. Based on selected encoding, set the outbound_encoding in `server.py` when the bot is initialized. (No changes are required by default.)
|
||||
- The inbound encoding can be controlled from the application configuration for inbound calls and dial/transfer commands for outbound calls.
|
||||
|
||||
## Running the Application
|
||||
|
||||
@@ -33,9 +33,18 @@ logger.add(sys.stderr, level="DEBUG")
|
||||
async def run_bot(
|
||||
websocket_client,
|
||||
stream_id: str,
|
||||
call_control_id: str,
|
||||
outbound_encoding: str,
|
||||
inbound_encoding: str,
|
||||
):
|
||||
serializer = TelnyxFrameSerializer(
|
||||
stream_id=stream_id,
|
||||
outbound_encoding=outbound_encoding,
|
||||
inbound_encoding=inbound_encoding,
|
||||
call_control_id=call_control_id,
|
||||
api_key=os.getenv("TELNYX_API_KEY"),
|
||||
)
|
||||
|
||||
transport = FastAPIWebsocketTransport(
|
||||
websocket=websocket_client,
|
||||
params=FastAPIWebsocketParams(
|
||||
@@ -44,7 +53,7 @@ async def run_bot(
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
vad_audio_passthrough=True,
|
||||
serializer=TelnyxFrameSerializer(stream_id, outbound_encoding, inbound_encoding),
|
||||
serializer=serializer,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@@ -37,9 +37,10 @@ async def websocket_endpoint(websocket: WebSocket):
|
||||
call_data = json.loads(await start_data.__anext__())
|
||||
print(call_data, flush=True)
|
||||
stream_id = call_data["stream_id"]
|
||||
call_control_id = call_data["start"]["call_control_id"]
|
||||
outbound_encoding = call_data["start"]["media_format"]["encoding"]
|
||||
print("WebSocket connection accepted")
|
||||
await run_bot(websocket, stream_id, outbound_encoding, "PCMU")
|
||||
await run_bot(websocket, stream_id, call_control_id, outbound_encoding, "PCMU")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -8,6 +8,8 @@ import base64
|
||||
import json
|
||||
from typing import Optional
|
||||
|
||||
import aiohttp
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel
|
||||
|
||||
from pipecat.audio.utils import (
|
||||
@@ -19,6 +21,8 @@ from pipecat.audio.utils import (
|
||||
)
|
||||
from pipecat.frames.frames import (
|
||||
AudioRawFrame,
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
Frame,
|
||||
InputAudioRawFrame,
|
||||
InputDTMFFrame,
|
||||
@@ -30,38 +34,120 @@ from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializer
|
||||
|
||||
|
||||
class TelnyxFrameSerializer(FrameSerializer):
|
||||
"""Serializer for Telnyx WebSocket protocol.
|
||||
|
||||
This serializer handles converting between Pipecat frames and Telnyx's WebSocket
|
||||
media streams protocol. It supports audio conversion, DTMF events, and automatic
|
||||
call termination.
|
||||
|
||||
When auto_hang_up is enabled (default), the serializer will automatically terminate
|
||||
the Telnyx call when an EndFrame or CancelFrame is processed, but requires Telnyx
|
||||
credentials to be provided.
|
||||
|
||||
Attributes:
|
||||
_stream_id: The Telnyx Stream ID.
|
||||
_call_control_id: The associated Telnyx Call Control ID.
|
||||
_api_key: Telnyx API key for API access.
|
||||
_params: Configuration parameters.
|
||||
_telnyx_sample_rate: Sample rate used by Telnyx (typically 8kHz).
|
||||
_sample_rate: Input sample rate for the pipeline.
|
||||
_resampler: Audio resampler for format conversion.
|
||||
_hangup_attempted: Flag to track if hang-up has been attempted.
|
||||
"""
|
||||
|
||||
class InputParams(BaseModel):
|
||||
telnyx_sample_rate: int = 8000 # Default Telnyx rate (8kHz)
|
||||
sample_rate: Optional[int] = None # Pipeline input rate
|
||||
"""Configuration parameters for TelnyxFrameSerializer.
|
||||
|
||||
Attributes:
|
||||
telnyx_sample_rate: Sample rate used by Telnyx, defaults to 8000 Hz.
|
||||
sample_rate: Optional override for pipeline input sample rate.
|
||||
inbound_encoding: Audio encoding for data sent to Telnyx (e.g., "PCMU").
|
||||
outbound_encoding: Audio encoding for data received from Telnyx (e.g., "PCMU").
|
||||
auto_hang_up: Whether to automatically terminate call on EndFrame.
|
||||
"""
|
||||
|
||||
telnyx_sample_rate: int = 8000
|
||||
sample_rate: Optional[int] = None
|
||||
inbound_encoding: str = "PCMU"
|
||||
outbound_encoding: str = "PCMU"
|
||||
auto_hang_up: bool = True
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
stream_id: str,
|
||||
outbound_encoding: str,
|
||||
inbound_encoding: str,
|
||||
call_control_id: Optional[str] = None,
|
||||
api_key: Optional[str] = None,
|
||||
params: InputParams = InputParams(),
|
||||
):
|
||||
"""Initialize the TelnyxFrameSerializer.
|
||||
|
||||
Args:
|
||||
stream_id: The Stream ID for Telnyx.
|
||||
outbound_encoding: The encoding type for outbound audio (e.g., "PCMU").
|
||||
inbound_encoding: The encoding type for inbound audio (e.g., "PCMU").
|
||||
call_control_id: The Call Control ID for the Telnyx call (optional, but required for auto hang-up).
|
||||
api_key: Your Telnyx API key (required for auto hang-up).
|
||||
params: Configuration parameters.
|
||||
"""
|
||||
self._stream_id = stream_id
|
||||
params.outbound_encoding = outbound_encoding
|
||||
params.inbound_encoding = inbound_encoding
|
||||
self._call_control_id = call_control_id
|
||||
self._api_key = api_key
|
||||
self._params = params
|
||||
|
||||
self._telnyx_sample_rate = self._params.telnyx_sample_rate
|
||||
self._sample_rate = 0 # Pipeline input rate
|
||||
|
||||
self._resampler = create_default_resampler()
|
||||
self._hangup_attempted = False
|
||||
|
||||
@property
|
||||
def type(self) -> FrameSerializerType:
|
||||
"""Gets the serializer type.
|
||||
|
||||
Returns:
|
||||
The serializer type, either TEXT or BINARY.
|
||||
"""
|
||||
return FrameSerializerType.TEXT
|
||||
|
||||
async def setup(self, frame: StartFrame):
|
||||
"""Sets up the serializer with pipeline configuration.
|
||||
|
||||
Args:
|
||||
frame: The StartFrame containing pipeline configuration.
|
||||
"""
|
||||
self._sample_rate = self._params.sample_rate or frame.audio_in_sample_rate
|
||||
|
||||
async def serialize(self, frame: Frame) -> str | bytes | None:
|
||||
if isinstance(frame, AudioRawFrame):
|
||||
"""Serializes a Pipecat frame to Telnyx WebSocket format.
|
||||
|
||||
Handles conversion of various frame types to Telnyx WebSocket messages.
|
||||
For EndFrames and CancelFrames, initiates call termination if auto_hang_up is enabled.
|
||||
|
||||
Args:
|
||||
frame: The Pipecat frame to serialize.
|
||||
|
||||
Returns:
|
||||
Serialized data as string or bytes, or None if the frame isn't handled.
|
||||
|
||||
Raises:
|
||||
ValueError: If an unsupported encoding is specified.
|
||||
"""
|
||||
if (
|
||||
self._params.auto_hang_up
|
||||
and not self._hangup_attempted
|
||||
and isinstance(frame, (EndFrame, CancelFrame))
|
||||
):
|
||||
self._hangup_attempted = True
|
||||
await self._hang_up_call()
|
||||
return None
|
||||
elif isinstance(frame, StartInterruptionFrame):
|
||||
answer = {"event": "clear"}
|
||||
return json.dumps(answer)
|
||||
elif isinstance(frame, AudioRawFrame):
|
||||
data = frame.audio
|
||||
|
||||
# Output: Convert PCM at frame's rate to 8kHz encoded for Telnyx
|
||||
@@ -84,11 +170,58 @@ class TelnyxFrameSerializer(FrameSerializer):
|
||||
|
||||
return json.dumps(answer)
|
||||
|
||||
if isinstance(frame, StartInterruptionFrame):
|
||||
answer = {"event": "clear"}
|
||||
return json.dumps(answer)
|
||||
# Return None for unhandled frames
|
||||
return None
|
||||
|
||||
async def _hang_up_call(self):
|
||||
"""Hang up the Telnyx call using Telnyx's REST API."""
|
||||
try:
|
||||
call_control_id = self._call_control_id
|
||||
api_key = self._api_key
|
||||
|
||||
if not call_control_id or not api_key:
|
||||
logger.warning(
|
||||
"Cannot hang up Telnyx call: call_control_id and api_key must be provided"
|
||||
)
|
||||
return
|
||||
|
||||
# Telnyx API endpoint for hanging up a call
|
||||
endpoint = f"https://api.telnyx.com/v2/calls/{call_control_id}/actions/hangup"
|
||||
|
||||
# Set headers with API key
|
||||
headers = {"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"}
|
||||
|
||||
# Make the POST request to hang up the call
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(endpoint, headers=headers) as response:
|
||||
if response.status == 200:
|
||||
logger.info(f"Successfully terminated Telnyx call {call_control_id}")
|
||||
else:
|
||||
# Get the error details for better debugging
|
||||
error_text = await response.text()
|
||||
logger.error(
|
||||
f"Failed to terminate Telnyx call {call_control_id}: "
|
||||
f"Status {response.status}, Response: {error_text}"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Failed to hang up Telnyx call: {e}")
|
||||
|
||||
async def deserialize(self, data: str | bytes) -> Frame | None:
|
||||
"""Deserializes Telnyx WebSocket data to Pipecat frames.
|
||||
|
||||
Handles conversion of Telnyx media events to appropriate Pipecat frames,
|
||||
including audio data and DTMF keypresses.
|
||||
|
||||
Args:
|
||||
data: The raw WebSocket data from Telnyx.
|
||||
|
||||
Returns:
|
||||
A Pipecat frame corresponding to the Telnyx event, or None if unhandled.
|
||||
|
||||
Raises:
|
||||
ValueError: If an unsupported encoding is specified.
|
||||
"""
|
||||
message = json.loads(data)
|
||||
|
||||
if message["event"] == "media":
|
||||
|
||||
@@ -66,7 +66,7 @@ class TwilioFrameSerializer(FrameSerializer):
|
||||
def __init__(
|
||||
self,
|
||||
stream_sid: str,
|
||||
call_sid: str,
|
||||
call_sid: Optional[str] = None,
|
||||
account_sid: Optional[str] = None,
|
||||
auth_token: Optional[str] = None,
|
||||
params: InputParams = InputParams(),
|
||||
@@ -75,9 +75,9 @@ class TwilioFrameSerializer(FrameSerializer):
|
||||
|
||||
Args:
|
||||
stream_sid: The Twilio Media Stream SID.
|
||||
call_sid: The associated Twilio Call SID.
|
||||
account_sid: Twilio account SID.
|
||||
auth_token: Twilio auth token.
|
||||
call_sid: The associated Twilio Call SID (optional, but required for auto hang-up).
|
||||
account_sid: Twilio account SID (required for auto hang-up).
|
||||
auth_token: Twilio auth token (required for auto hang-up).
|
||||
params: Configuration parameters.
|
||||
"""
|
||||
self._stream_sid = stream_sid
|
||||
@@ -90,6 +90,7 @@ class TwilioFrameSerializer(FrameSerializer):
|
||||
self._sample_rate = 0 # Pipeline input rate
|
||||
|
||||
self._resampler = create_default_resampler()
|
||||
self._hangup_attempted = False
|
||||
|
||||
@property
|
||||
def type(self) -> FrameSerializerType:
|
||||
@@ -120,7 +121,12 @@ class TwilioFrameSerializer(FrameSerializer):
|
||||
Returns:
|
||||
Serialized data as string or bytes, or None if the frame isn't handled.
|
||||
"""
|
||||
if self._params.auto_hang_up and isinstance(frame, (EndFrame, CancelFrame)):
|
||||
if (
|
||||
self._params.auto_hang_up
|
||||
and not self._hangup_attempted
|
||||
and isinstance(frame, (EndFrame, CancelFrame))
|
||||
):
|
||||
self._hangup_attempted = True
|
||||
await self._hang_up_call()
|
||||
return None
|
||||
elif isinstance(frame, StartInterruptionFrame):
|
||||
@@ -154,15 +160,26 @@ class TwilioFrameSerializer(FrameSerializer):
|
||||
|
||||
account_sid = self._account_sid
|
||||
auth_token = self._auth_token
|
||||
call_sid = self._call_sid
|
||||
|
||||
if not call_sid or not account_sid or not auth_token:
|
||||
missing = []
|
||||
if not call_sid:
|
||||
missing.append("call_sid")
|
||||
if not account_sid:
|
||||
missing.append("account_sid")
|
||||
if not auth_token:
|
||||
missing.append("auth_token")
|
||||
|
||||
if not account_sid or not auth_token:
|
||||
logger.warning(
|
||||
"Cannot hang up Twilio call: account_sid and auth_token must be provided"
|
||||
f"Cannot hang up Twilio call: missing required parameters: {', '.join(missing)}"
|
||||
)
|
||||
return
|
||||
|
||||
# Twilio API endpoint for updating calls
|
||||
endpoint = f"https://api.twilio.com/2010-04-01/Accounts/{account_sid}/Calls/{self._call_sid}.json"
|
||||
endpoint = (
|
||||
f"https://api.twilio.com/2010-04-01/Accounts/{account_sid}/Calls/{call_sid}.json"
|
||||
)
|
||||
|
||||
# Create basic auth from account_sid and auth_token
|
||||
auth = aiohttp.BasicAuth(account_sid, auth_token)
|
||||
@@ -174,12 +191,12 @@ class TwilioFrameSerializer(FrameSerializer):
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(endpoint, auth=auth, data=params) as response:
|
||||
if response.status == 200:
|
||||
logger.info(f"Successfully terminated Twilio call {self._call_sid}")
|
||||
logger.info(f"Successfully terminated Twilio call {call_sid}")
|
||||
else:
|
||||
# Get the error details for better debugging
|
||||
error_text = await response.text()
|
||||
logger.error(
|
||||
f"Failed to terminate Twilio call {self._call_sid}: "
|
||||
f"Failed to terminate Twilio call {call_sid}: "
|
||||
f"Status {response.status}, Response: {error_text}"
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user