diff --git a/src/pipecat/serializers/genesys.py b/src/pipecat/serializers/genesys.py index 31e32f4dc..d5d37d12b 100644 --- a/src/pipecat/serializers/genesys.py +++ b/src/pipecat/serializers/genesys.py @@ -9,14 +9,14 @@ Features: - Input/output variables for Architect flow integration - DTMF event support - Barge-in (interruption) events -- Pause/resume support for hold scenarios +- Pause/resume support for hold scenarios (optional) Protocol Reference: - https://developer.genesys.cloud/devapps/audiohook Audio Format: - PCMU (μ-law) at 8kHz sample rate (preferred) -- L16 (16-bit linear PCM) at 8kHz also supported +- L16 (16-bit linear PCM) at 8kHz also supported - Mono (external channel) or Stereo (external on left, internal on right) """ @@ -49,6 +49,7 @@ from pipecat.serializers.base_serializer import FrameSerializer class AudioHookMessageType(str, Enum): """AudioHook protocol message types.""" + OPEN = "open" OPENED = "opened" CLOSE = "close" @@ -65,15 +66,17 @@ class AudioHookMessageType(str, Enum): class AudioHookChannel(str, Enum): """AudioHook audio channel configuration.""" + EXTERNAL = "external" # Customer audio only (mono) INTERNAL = "internal" # Agent audio only (mono) - BOTH = "both" # Stereo: external=left, internal=right + BOTH = "both" # Stereo: external=left, internal=right class AudioHookMediaFormat(str, Enum): """Supported audio formats.""" + PCMU = "PCMU" # μ-law, 8kHz - L16 = "L16" # 16-bit linear PCM, 8kHz + L16 = "L16" # 16-bit linear PCM, 8kHz class GenesysAudioHookSerializer(FrameSerializer): @@ -81,7 +84,7 @@ class GenesysAudioHookSerializer(FrameSerializer): This serializer handles converting between Pipecat frames and Genesys AudioHook protocol messages. It supports: - + - Bidirectional audio streaming (PCMU at 8kHz) - Automatic protocol handshake (open/opened, close/closed, ping/pong) - Session lifecycle management with pause/resume support @@ -102,7 +105,7 @@ class GenesysAudioHookSerializer(FrameSerializer): selected_language="en-US", ) ) - + # Use with FastAPI WebSocket transport transport = FastAPIWebsocketTransport( websocket=websocket, @@ -113,11 +116,11 @@ class GenesysAudioHookSerializer(FrameSerializer): audio_out_fixed_packet_size=1600, # Important: prevents 429 rate limiting from Genesys ), ) - + # Access call information after connection participant = serializer.participant # ani, dnis, etc. input_vars = serializer.input_variables # Custom vars from Architect - + # Set output variables to return to Architect serializer.set_output_variables({"intent": "billing", "resolved": True}) ``` @@ -166,23 +169,23 @@ class GenesysAudioHookSerializer(FrameSerializer): """ super().__init__(**kwargs) self._params = params or GenesysAudioHookSerializer.InputParams() - + self._genesys_sample_rate = self._params.genesys_sample_rate self._sample_rate = 0 # Pipeline input rate, set in setup() self._session_id = str(uuid.uuid4()) - + # Use Pipecat's official resampler if needed (SOXR) # Only used for TTS output (16kHz → 8kHz), input goes without resampling self._input_resampler = SOXRStreamAudioResampler() self._output_resampler = SOXRStreamAudioResampler() - + # Protocol state self._client_seq = 0 self._server_seq = 0 self._is_open = False self._is_paused = False self._position = timedelta(0) - + # Session metadata self._conversation_id: Optional[str] = None self._participant: Optional[Dict[str, Any]] = None @@ -200,7 +203,6 @@ class GenesysAudioHookSerializer(FrameSerializer): self._register_event_handler("on_error") self._register_event_handler("on_dtmf") - @property def session_id(self) -> str: """Get the Genesys AudioHook session ID generated by the serializer.""" @@ -238,13 +240,13 @@ class GenesysAudioHookSerializer(FrameSerializer): def set_output_variables(self, variables: Dict[str, Any]) -> None: """Set custom output variables to send back to Genesys on close. - + These variables will be included in the 'closed' response when Genesys closes the connection, making them available in the Architect flow. - + Args: variables: Dictionary of custom variables to send to Genesys. - + Example: ```python # During the conversation, collect data and set it @@ -267,13 +269,13 @@ class GenesysAudioHookSerializer(FrameSerializer): """ self._sample_rate = self._params.sample_rate or frame.audio_in_sample_rate logger.debug(f"GenesysAudioHookSerializer setup with sample_rate={self._sample_rate}") - + def _format_position(self, position: timedelta) -> str: """Format a timedelta as ISO 8601 duration string. - + Args: position: The timedelta to format. - + Returns: ISO 8601 duration string (e.g., "PT1.5S"). """ @@ -282,10 +284,10 @@ class GenesysAudioHookSerializer(FrameSerializer): def _parse_position(self, position_str: str) -> timedelta: """Parse an ISO 8601 duration string to timedelta. - + Args: position_str: ISO 8601 duration string (e.g., "PT1.5S"). - + Returns: Corresponding timedelta. """ @@ -310,16 +312,16 @@ class GenesysAudioHookSerializer(FrameSerializer): include_position: bool = True, ) -> Dict[str, Any]: """Create a protocol message with common fields. - + Based on the Genesys AudioHook protocol, responses include: - seq: Server's sequence number (incremented per message) - clientseq: Echo of the client's last sequence number - + Args: msg_type: The message type. parameters: Optional parameters object. include_position: Whether to include position field. - + Returns: The message dictionary. """ @@ -331,13 +333,13 @@ class GenesysAudioHookSerializer(FrameSerializer): "clientseq": self._client_seq, "id": self._session_id, } - + if include_position: msg["position"] = self._format_position(self._position) - + if parameters: msg["parameters"] = parameters - + return msg def create_opened_response( @@ -347,27 +349,27 @@ class GenesysAudioHookSerializer(FrameSerializer): selected_language: Optional[str] = None, ) -> Dict[str, Any]: """Create an 'opened' response message for the client. - + This should be sent in response to an 'open' message from Genesys. - + Args: start_paused: Whether to start the session paused. supported_languages: List of supported language codes. selected_language: The selected language code. - + Returns: Dictionary of the opened response message. """ # Build channels list based on configuration channels: list[str] = [] - + if self._params.channel == AudioHookChannel.EXTERNAL: channels = ["external"] elif self._params.channel == AudioHookChannel.INTERNAL: channels = ["internal"] elif self._params.channel == AudioHookChannel.BOTH: channels = ["external", "internal"] - + parameters = { "startPaused": start_paused, "media": [ @@ -379,22 +381,22 @@ class GenesysAudioHookSerializer(FrameSerializer): } ], } - + if supported_languages: parameters["supportedLanguages"] = supported_languages if selected_language: parameters["selectedLanguage"] = selected_language - + msg = self._create_message( AudioHookMessageType.OPENED, parameters=parameters, include_position=False, # opened doesn't need position ) - + self._is_open = True logger.debug(f"AudioHook session opened: {self._session_id}") - + return msg def create_closed_response( @@ -402,17 +404,17 @@ class GenesysAudioHookSerializer(FrameSerializer): output_variables: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: """Create a 'closed' response message. - + This should be sent in response to a 'close' message from Genesys. - + Args: output_variables: Optional custom variables to pass back to Genesys. These will be available in the Architect flow after the AudioHook action completes. - + Returns: Dictionary of the closed response message. - + Example: ```python # Pass custom data back to Genesys @@ -426,25 +428,25 @@ class GenesysAudioHookSerializer(FrameSerializer): ``` """ parameters: Optional[Dict[str, Any]] = None - + if output_variables: parameters = {"outputVariables": output_variables} - + msg = self._create_message( AudioHookMessageType.CLOSED, parameters=parameters, ) - + self._is_open = False logger.debug(f"AudioHook session closed: {self._session_id}") - + return msg def create_pong_response(self) -> Dict[str, Any]: """Create a 'pong' response message. - + This should be sent in response to a 'ping' message from Genesys. - + Returns: Dictionary of the pong response message. """ @@ -453,39 +455,35 @@ class GenesysAudioHookSerializer(FrameSerializer): def create_resumed_response(self) -> Dict[str, Any]: """Create a 'resumed' response message. - + This should be sent in response to a 'pause' message when ready to resume. - + Returns: Dictionary of the resumed response message. """ msg = self._create_message(AudioHookMessageType.RESUMED) - + self._is_paused = False logger.debug(f"AudioHook session resumed: {self._session_id}") - + return msg def create_barge_in_event(self) -> Dict[str, Any]: """Create a barge-in event message. - + This notifies Genesys Cloud that the user has interrupted the bot's audio output. Genesys will stop any queued audio playback. - + Returns: Dictionary of the barge-in event message. """ msg = self._create_message( AudioHookMessageType.EVENT, - parameters={ - "entities": [ - {"type": "barge_in", "data": {}} - ] - }, + parameters={"entities": [{"type": "barge_in", "data": {}}]}, ) - + logger.debug("🔇 Barge-in event sent to Genesys") - + return msg def create_disconnect_message( @@ -496,32 +494,32 @@ class GenesysAudioHookSerializer(FrameSerializer): info: Optional[str] = None, ) -> Dict[str, Any]: """Create a 'disconnect' message to initiate session termination. - + Args: reason: Disconnect reason (e.g., "completed", "error"). action: Action to take ("transfer" to agent, "finished" if completed). output_variables: Custom output variables to pass back to Genesys. info: Optional additional information. - + Returns: Dictionary of the disconnect message. """ parameters: Dict[str, Any] = {"reason": reason} - + # Build outputVariables out_vars = {"action": action} if output_variables: out_vars.update(output_variables) parameters["outputVariables"] = out_vars - + if info: parameters["info"] = info - + msg = self._create_message( AudioHookMessageType.DISCONNECT, parameters=parameters, ) - + logger.debug(f"AudioHook disconnect: reason={reason}, action={action}") return msg @@ -532,12 +530,12 @@ class GenesysAudioHookSerializer(FrameSerializer): retryable: bool = False, ) -> Dict[str, Any]: """Create an 'error' message. - + Args: code: Error code. message: Error message. retryable: Whether the operation can be retried. - + Returns: Dictionary of the error message. """ @@ -546,12 +544,12 @@ class GenesysAudioHookSerializer(FrameSerializer): "message": message, "retryable": retryable, } - + msg = self._create_message( AudioHookMessageType.ERROR, parameters=parameters, ) - + logger.error(f"AudioHook error: {code} - {message}") return msg @@ -572,14 +570,18 @@ class GenesysAudioHookSerializer(FrameSerializer): the frame type is not handled or session is not open. """ if isinstance(frame, (EndFrame, CancelFrame)): - return json.dumps(self.create_disconnect_message(output_variables=self.output_variables, reason="completed")) - + return json.dumps( + self.create_disconnect_message( + output_variables=self.output_variables, reason="completed" + ) + ) + elif isinstance(frame, AudioRawFrame): if not self._is_open or self._is_paused: return None - + data = frame.audio - + # Convert PCM to μ-law at 8kHz for Genesys if self._params.media_format == AudioHookMediaFormat.PCMU: serialized_data = await pcm_to_ulaw( @@ -592,15 +594,15 @@ class GenesysAudioHookSerializer(FrameSerializer): # L16 format - just resample if needed logger.warning("L16 format not yet fully implemented") return None - + if serialized_data is None or len(serialized_data) == 0: return None - + return bytes(serialized_data) elif isinstance(frame, InterruptionFrame): return json.dumps(self.create_barge_in_event()) - + elif isinstance(frame, (OutputTransportMessageFrame, OutputTransportMessageUrgentFrame)): # Only pass through AudioHook protocol messages (those with "version" field) # Filter out RTVI and other non-AudioHook messages @@ -609,7 +611,7 @@ class GenesysAudioHookSerializer(FrameSerializer): else: # Not an AudioHook message, ignore return None - + # Ignore other frames - we don't need to process them here return None @@ -639,39 +641,41 @@ class GenesysAudioHookSerializer(FrameSerializer): if isinstance(data, bytes): logger.debug(f"[AUDIO IN] Received {len(data)} bytes from Genesys") return await self._deserialize_audio(data) - + # Text data = JSON control message try: message = json.loads(data) except json.JSONDecodeError as e: logger.error(f"Failed to parse AudioHook message: {e}") return None - + return await self._handle_control_message(message) async def _deserialize_audio(self, data: bytes) -> Frame | None: """Deserialize binary audio data to an InputAudioRawFrame. - + Args: data: Raw audio bytes (PCMU or L16). - + Returns: InputAudioRawFrame with PCM audio at pipeline sample rate. """ if not self._is_open or self._is_paused: return None - + audio_data = data original_len = len(data) - + # If Genesys sends stereo audio (BOTH channels), extract only the external channel (left) # Stereo audio comes interleaved: [L0, R0, L1, R1, ...] if self._params.channel == AudioHookChannel.BOTH and len(data) > 0: # For PCMU, each sample is 1 byte # Extract only bytes at even positions (left channel = external) audio_data = bytes(data[i] for i in range(0, len(data), 2)) - logger.debug(f"🔊 Stereo audio: {original_len} bytes → {len(audio_data)} bytes (external channel)") - + logger.debug( + f"🔊 Stereo audio: {original_len} bytes → {len(audio_data)} bytes (external channel)" + ) + if self._params.media_format == AudioHookMediaFormat.PCMU: # Convert μ-law at 8kHz to PCM at pipeline rate deserialized_data = await ulaw_to_pcm( @@ -684,63 +688,62 @@ class GenesysAudioHookSerializer(FrameSerializer): # L16 format logger.warning("L16 format not yet fully implemented") return None - + if deserialized_data is None or len(deserialized_data) == 0: return None - + # Always use mono for STT - ElevenLabs expects single channel num_channels = 1 - + audio_frame = InputAudioRawFrame( audio=deserialized_data, num_channels=num_channels, sample_rate=self._sample_rate, ) - + return audio_frame - async def _handle_control_message(self, message: Dict[str, Any]) -> Frame | None: """Handle a JSON control message from Genesys. - + Args: message: Parsed JSON message. - + Returns: Frame if the message should be passed to the pipeline, None otherwise. """ msg_type = message.get("type", "") self._client_seq = message.get("seq", 0) - + # Update position if provided if "position" in message: self._position = self._parse_position(message["position"]) - + if msg_type == AudioHookMessageType.OPEN.value: return await self._handle_open(message) - + elif msg_type == AudioHookMessageType.CLOSE.value: return await self._handle_close(message) - + elif msg_type == AudioHookMessageType.PING.value: return await self._handle_ping(message) - + elif msg_type == AudioHookMessageType.PAUSE.value: return await self._handle_pause(message) - + elif msg_type == AudioHookMessageType.UPDATE.value: return await self._handle_update(message) elif msg_type == AudioHookMessageType.ERROR.value: return await self._handle_error(message) - + elif msg_type == "dtmf": return await self._handle_dtmf(message) - + elif msg_type == "playback_started": logger.debug("Playback started (from Genesys)") return None - + elif msg_type == "playback_completed": logger.debug("Playback completed (from Genesys)") return None @@ -750,40 +753,42 @@ class GenesysAudioHookSerializer(FrameSerializer): async def _handle_open(self, message: Dict[str, Any]) -> Frame | None: """Handle an 'open' message from Genesys. - + This initializes the session with metadata from Genesys Cloud and automatically responds with an 'opened' message using the configured InputParams (supported_languages, selected_language, start_paused). - + Extracts and stores: - session_id: The AudioHook session identifier - conversation_id: The Genesys conversation ID - participant: Caller info (ani, dnis, etc.) - input_variables: Custom variables from Architect flow - media_info: Audio configuration from Genesys - + Args: message: The open message from Genesys. - + Returns: OutputTransportMessageUrgentFrame with the 'opened' response. """ self._session_id = message.get("id", str(uuid.uuid4())) - + params = message.get("parameters", {}) self._conversation_id = params.get("conversationId") self._participant = params.get("participant") self._custom_config = params.get("customConfig") self._media_info = params.get("media") # This is a list of media objects self._input_variables = params.get("inputVariables") # Custom vars from Genesys - + # Extract media configuration if present # media is a list like: [{"type": "audio", "format": "PCMU", "channels": ["external"], "rate": 8000}] media_list = self._media_info if media_list and isinstance(media_list, list) and len(media_list) > 0: audio_media: Dict[str, Any] = media_list[0] # Get first media entry channels = audio_media.get("channels", []) - logger.debug(f"📡 Genesys audio config: format={audio_media.get('format')}, channels={channels}, rate={audio_media.get('rate')}") + logger.debug( + f"📡 Genesys audio config: format={audio_media.get('format')}, channels={channels}, rate={audio_media.get('rate')}" + ) # channels is a list like ["external"] or ["external", "internal"] if isinstance(channels, list): if "external" in channels and "internal" in channels: @@ -795,47 +800,49 @@ class GenesysAudioHookSerializer(FrameSerializer): elif "internal" in channels: self._params.channel = AudioHookChannel.INTERNAL logger.debug("📡 Mono mode: internal channel") - + # Log participant info for debugging ani = self._participant.get("ani", "unknown") if self._participant else "unknown" logger.info( f"AudioHook open request: session={self._session_id}, " f"conversation={self._conversation_id}, ani={ani}" ) - + await self._call_event_handler("on_open", message) - return OutputTransportMessageUrgentFrame(message=self.create_opened_response( - start_paused=self._params.start_paused, - supported_languages=self._params.supported_languages, - selected_language=self._params.selected_language - )) + return OutputTransportMessageUrgentFrame( + message=self.create_opened_response( + start_paused=self._params.start_paused, + supported_languages=self._params.supported_languages, + selected_language=self._params.selected_language, + ) + ) async def _handle_close(self, message: Dict[str, Any]) -> Frame | None: """Handle a 'close' message from Genesys. - + Automatically responds with a 'closed' message. If output_variables were set via set_output_variables(), they will be included in the response and made available in the Architect flow. - + Args: message: The close message from Genesys. - + Returns: OutputTransportMessageUrgentFrame with the closed response (includes outputVariables if set). """ params = message.get("parameters", {}) reason = params.get("reason", "unknown") - + logger.info(f"🔴 Genesys closed the connection: {reason}") - + self._is_open = False logger.info(f"Sending closed response to Genesys...") await self._call_event_handler("on_close", message) - + # Return as urgent frame to be sent through pipeline immediately # Include any output variables that were set during the session return OutputTransportMessageUrgentFrame( @@ -844,12 +851,12 @@ class GenesysAudioHookSerializer(FrameSerializer): async def _handle_ping(self, message: Dict[str, Any]) -> Frame | None: """Handle a 'ping' message from Genesys. - + Automatically responds with a 'pong' message to maintain the connection. - + Args: message: The ping message from Genesys. - + Returns: OutputTransportMessageUrgentFrame with pong response. """ @@ -862,92 +869,92 @@ class GenesysAudioHookSerializer(FrameSerializer): async def _handle_pause(self, message: Dict[str, Any]) -> Frame | None: """Handle a 'pause' message from Genesys. - + This is used when audio streaming is temporarily suspended (e.g., during hold). - + Args: message: The pause message. - + Returns: None (response should be sent via create_resumed_response()). """ params = message.get("parameters", {}) reason = params.get("reason", "unknown") - + logger.info(f"AudioHook pause request: reason={reason}") - + self._is_paused = True await self._call_event_handler("on_pause", message) - + # Note: Application should call create_resumed_response() when ready return None async def _handle_update(self, message: Dict[str, Any]) -> Frame | None: """Handle an 'update' message from Genesys. - + Updates may include changes to participants or configuration. - + Args: message: The update message. - + Returns: None. """ params = message.get("parameters", {}) - + if "participant" in params: self._participant = params["participant"] - + logger.debug(f"AudioHook update received: {params}") await self._call_event_handler("on_update", message) - + return None async def _handle_error(self, message: Dict[str, Any]) -> Frame | None: """Handle an 'error' message from Genesys. - + Args: message: The error message. - + Returns: None. """ params = message.get("parameters", {}) code = params.get("code", 0) error_msg = params.get("message", "Unknown error") - + logger.error(f"AudioHook error from Genesys: {code} - {error_msg}") await self._call_event_handler("on_error", message) - + return None async def _handle_dtmf(self, message: Dict[str, Any]) -> Frame | None: """Handle a 'dtmf' message from Genesys. - + DTMF (Dual-Tone Multi-Frequency) events are sent when the user presses keys on their phone keypad. - + Args: message: The DTMF message. - + Returns: InputDTMFFrame with the pressed digit. """ params = message.get("parameters", {}) digit = params.get("digit", "") - + if not digit: logger.warning("DTMF message received without digit") return None - + logger.info(f"DTMF received: {digit}") await self._call_event_handler("on_dtmf", message) - + try: return InputDTMFFrame(KeypadEntry(digit)) except ValueError: