diff --git a/changelog/3962.fixed.md b/changelog/3962.fixed.md
new file mode 100644
index 000000000..1d326cd05
--- /dev/null
+++ b/changelog/3962.fixed.md
@@ -0,0 +1 @@
+- Fixed `SmallWebRTCConnection` silently discarding messages sent before the data channel is open by queuing them and flushing once the channel is ready. A bounded queue (`MAX_MESSAGE_QUEUE_SIZE = 50`) prevents unbounded memory growth, and a 10-second timeout after connection clears the queue and falls back to discard mode if the data channel never opens.
diff --git a/src/pipecat/transports/smallwebrtc/connection.py b/src/pipecat/transports/smallwebrtc/connection.py
index b9b6538f1..3f0d6a9ee 100644
--- a/src/pipecat/transports/smallwebrtc/connection.py
+++ b/src/pipecat/transports/smallwebrtc/connection.py
@@ -41,6 +41,11 @@ AUDIO_TRANSCEIVER_INDEX = 0
 VIDEO_TRANSCEIVER_INDEX = 1
 SCREEN_VIDEO_TRANSCEIVER_INDEX = 2
 
+# Max outgoing messages queued while the data channel is not yet open; extras are discarded.
+MAX_MESSAGE_QUEUE_SIZE = 50
+# Seconds to wait for the data channel to open once connected; after that queueing is disabled.
+DATA_CHANNEL_TIMEOUT_SECS = 10
+
 
 class TrackStatusMessage(BaseModel):
     """Message for updating track enabled/disabled status.
@@ -283,8 +288,11 @@ class SmallWebRTCConnection(BaseObject):
         self._data_channel = None
         self._renegotiation_in_progress = False
         self._last_received_time = None
+        self._outgoing_messages_queue = []
+        self._data_channel_enabled = True
         self._pending_app_messages = []
         self._connecting_timeout_task = None
+        self._data_channel_timeout_task = None
 
     def _setup_listeners(self):
         """Set up event listeners for the peer connection."""
@@ -297,6 +305,7 @@ class SmallWebRTCConnection(BaseObject):
             @channel.on("open")
             async def on_open():
                 logger.debug("Data channel is open!")
+                self._flush_message_queue()
 
             @channel.on("message")
             async def on_message(message):
@@ -499,9 +508,12 @@ class SmallWebRTCConnection(BaseObject):
         self._track_map.clear()
         if self._pc:
             await self._pc.close()
+        self._outgoing_messages_queue.clear()
+        self._data_channel_enabled = True
         self._pending_app_messages.clear()
         self._track_map = {}
         self._cancel_monitoring_connecting_state()
+        self._cancel_data_channel_timeout()
 
     def get_answer(self):
         """Get the SDP answer for the current connection.
@@ -550,6 +562,44 @@ class SmallWebRTCConnection(BaseObject):
             self._connecting_timeout_task.cancel()
         self._connecting_timeout_task = None
 
+    def _start_data_channel_timeout(self) -> None:
+        """Start a timeout to detect if the data channel fails to open after connection.
+
+        Schedules a background task that fires ``DATA_CHANNEL_TIMEOUT_SECS`` seconds after
+        the peer connection reaches the *connected* state. If the data channel has not
+        opened by then, the queued messages are discarded, a warning is logged, and future
+        calls to :meth:`send_app_message` will silently drop messages instead of queuing
+        them (fall-back to "discard" mode).
+
+        The task is automatically cancelled when the data channel opens successfully (see
+        :meth:`_flush_message_queue`) or when the connection is closed (see
+        :meth:`_close`).
+        """
+
+        async def timeout_handler():
+            await asyncio.sleep(DATA_CHANNEL_TIMEOUT_SECS)
+            if not self._data_channel or self._data_channel.readyState != "open":
+                logger.warning(
+                    f"Data channel not established within {DATA_CHANNEL_TIMEOUT_SECS}s after "
+                    "connection. Clearing message queue and disabling future queueing."
+                )
+                self._outgoing_messages_queue.clear()
+                self._data_channel_enabled = False
+
+        self._data_channel_timeout_task = asyncio.create_task(timeout_handler())
+
+    def _cancel_data_channel_timeout(self) -> None:
+        """Cancel the data-channel open timeout task, if any.
+
+        Should be called when the data channel opens successfully (the timeout is no longer
+        needed) or when the connection is being torn down. If the task is still pending it
+        will be cancelled and the reference cleared.
+        """
+        if self._data_channel_timeout_task and not self._data_channel_timeout_task.done():
+            logger.debug("Cancelling the data channel timeout task")
+            self._data_channel_timeout_task.cancel()
+        self._data_channel_timeout_task = None
+
     async def _handle_new_connection_state(self):
         """Handle changes in the peer connection state."""
         state = self._pc.connectionState
@@ -558,6 +608,9 @@ class SmallWebRTCConnection(BaseObject):
         else:
             self._cancel_monitoring_connecting_state()
 
+        if state == "connected" and not self._data_channel_timeout_task:
+            self._start_data_channel_timeout()
+
         if state == "connected" and not self._connect_invoked:
             # We are going to wait until the pipeline is ready before triggering the event
             return
@@ -657,15 +710,55 @@ class SmallWebRTCConnection(BaseObject):
     def send_app_message(self, message: Any):
         """Send an application message through the data channel.
 
+        If the data channel is open the message is sent immediately. Otherwise,
+        the message is placed in an in-memory queue so it can be flushed once the
+        channel opens, subject to the following constraints:
+
+        * Queueing is only attempted when ``_data_channel_enabled`` is ``True``. It is
+          set to ``False`` when the data-channel open timeout fires (see
+          :meth:`_start_data_channel_timeout`), after which messages are silently
+          discarded.
+        * The queue will not grow beyond ``MAX_MESSAGE_QUEUE_SIZE`` entries.
+          Messages that arrive when the queue is full are discarded with a warning.
+
         Args:
             message: The message to send (will be JSON serialized).
         """
         json_message = json.dumps(message)
         if self._data_channel and self._data_channel.readyState == "open":
             self._data_channel.send(json_message)
+        elif self._data_channel_enabled:
+            if len(self._outgoing_messages_queue) < MAX_MESSAGE_QUEUE_SIZE:
+                logger.debug("Data channel not ready, queuing message")
+                self._outgoing_messages_queue.append(json_message)
+            else:
+                logger.warning(
+                    f"Message queue is full ({MAX_MESSAGE_QUEUE_SIZE} messages). Discarding message."
+                )
         else:
             # The client might choose never to create a data channel.
-            logger.trace("Data channel not ready, discarding message!")
+            logger.trace("Data channel unavailable and queueing disabled. Discarding message.")
 
+    def _flush_message_queue(self):
+        """Flush all queued messages through the now-open data channel.
+
+        Called when the data channel transitions to the *open* state. Cancels
+        the data-channel open timeout (it is no longer needed) and sends every
+        message that was buffered while the channel was unavailable, oldest
+        first. If a send raises, only the messages already sent are removed;
+        the failed message and those after it stay queued.
+        """
+        self._cancel_data_channel_timeout()
+        logger.debug("Data channel is open, flushing queued messages")
+        queue = self._outgoing_messages_queue
+        sent = 0
+        try:
+            for message in queue:
+                self._data_channel.send(message)
+                sent += 1
+        finally:
+            # Drop what was sent in one slice delete instead of repeated O(n) pop(0).
+            del queue[:sent]
+
     def ask_to_renegotiate(self):
         """Request renegotiation of the WebRTC connection."""