Merge pull request #3962 from pipecat-ai/filipi/smallwebrtc_queue

Queuing the messages received before the data channel is ready
This commit is contained in:
Filipi da Silva Fuchter
2026-03-09 10:29:05 -04:00
committed by GitHub
2 changed files with 87 additions and 1 deletions

1
changelog/3962.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed `SmallWebRTCConnection` silently discarding messages sent before the data channel is open by queuing them and flushing once the channel is ready. A bounded queue (`MAX_MESSAGE_QUEUE_SIZE = 50`) prevents unbounded memory growth, and a 10-second timeout after connection clears the queue and falls back to discard mode if the data channel never opens.

View File

@@ -41,6 +41,11 @@ AUDIO_TRANSCEIVER_INDEX = 0
VIDEO_TRANSCEIVER_INDEX = 1
SCREEN_VIDEO_TRANSCEIVER_INDEX = 2
# Maximum number of messages to queue while the data channel is not yet open.
MAX_MESSAGE_QUEUE_SIZE = 50
# Seconds to wait for the data channel to open after the peer connection is established.
DATA_CHANNEL_TIMEOUT_SECS = 10
class TrackStatusMessage(BaseModel):
"""Message for updating track enabled/disabled status.
@@ -283,8 +288,11 @@ class SmallWebRTCConnection(BaseObject):
self._data_channel = None
self._renegotiation_in_progress = False
self._last_received_time = None
self._outgoing_messages_queue = []
self._data_channel_enabled = True
self._pending_app_messages = []
self._connecting_timeout_task = None
self._data_channel_timeout_task = None
def _setup_listeners(self):
"""Set up event listeners for the peer connection."""
@@ -297,6 +305,7 @@ class SmallWebRTCConnection(BaseObject):
@channel.on("open")
async def on_open():
logger.debug("Data channel is open!")
self._flush_message_queue()
@channel.on("message")
async def on_message(message):
@@ -499,9 +508,12 @@ class SmallWebRTCConnection(BaseObject):
self._track_map.clear()
if self._pc:
await self._pc.close()
self._outgoing_messages_queue.clear()
self._data_channel_enabled = True
self._pending_app_messages.clear()
self._track_map = {}
self._cancel_monitoring_connecting_state()
self._cancel_data_channel_timeout()
def get_answer(self):
"""Get the SDP answer for the current connection.
@@ -550,6 +562,44 @@ class SmallWebRTCConnection(BaseObject):
self._connecting_timeout_task.cancel()
self._connecting_timeout_task = None
def _start_data_channel_timeout(self) -> None:
"""Start a timeout to detect if the data channel fails to open after connection.
Schedules a background task that fires ``DATA_CHANNEL_TIMEOUT_SECS`` seconds after
the peer connection reaches the *connected* state. If the data channel has not
opened by then, the queued messages are discarded, a warning is logged, and future
calls to :meth:`send_app_message` will silently drop messages instead of queuing
them (fall-back to "discard" mode).
The task is automatically cancelled when the data channel opens successfully (see
:meth:`_flush_message_queue`) or when the connection is closed (see
:meth:`_close`).
"""
async def timeout_handler():
await asyncio.sleep(DATA_CHANNEL_TIMEOUT_SECS)
if not self._data_channel or self._data_channel.readyState != "open":
logger.warning(
f"Data channel not established within {DATA_CHANNEL_TIMEOUT_SECS}s after "
"connection. Clearing message queue and disabling future queueing."
)
self._outgoing_messages_queue.clear()
self._data_channel_enabled = False
self._data_channel_timeout_task = asyncio.create_task(timeout_handler())
def _cancel_data_channel_timeout(self) -> None:
"""Cancel the data-channel open timeout task, if any.
Should be called when the data channel opens successfully (the timeout is no longer
needed) or when the connection is being torn down. If the task is still pending it
will be cancelled and the reference cleared.
"""
if self._data_channel_timeout_task and not self._data_channel_timeout_task.done():
logger.debug("Cancelling the data channel timeout task")
self._data_channel_timeout_task.cancel()
self._data_channel_timeout_task = None
async def _handle_new_connection_state(self):
"""Handle changes in the peer connection state."""
state = self._pc.connectionState
@@ -558,6 +608,9 @@ class SmallWebRTCConnection(BaseObject):
else:
self._cancel_monitoring_connecting_state()
if state == "connected" and not self._data_channel_timeout_task:
self._start_data_channel_timeout()
if state == "connected" and not self._connect_invoked:
# We are going to wait until the pipeline is ready before triggering the event
return
@@ -657,15 +710,47 @@ class SmallWebRTCConnection(BaseObject):
def send_app_message(self, message: Any):
"""Send an application message through the data channel.
If the data channel is open the message is sent immediately. Otherwise,
the message is placed in an in-memory queue so it can be flushed once the
channel opens, subject to the following constraints:
* Queueing is only attempted when ``_data_channel_enabled`` is ``True``. It is
set to ``False`` when the data-channel open timeout fires (see
:meth:`_start_data_channel_timeout`), after which messages are silently
discarded.
* The queue will not grow beyond ``MAX_MESSAGE_QUEUE_SIZE`` entries.
Messages that arrive when the queue is full are discarded with a warning.
Args:
message: The message to send (will be JSON serialized).
"""
json_message = json.dumps(message)
if self._data_channel and self._data_channel.readyState == "open":
self._data_channel.send(json_message)
elif self._data_channel_enabled:
if len(self._outgoing_messages_queue) < MAX_MESSAGE_QUEUE_SIZE:
logger.debug("Data channel not ready, queuing message")
self._outgoing_messages_queue.append(json_message)
else:
logger.warning(
f"Message queue is full ({MAX_MESSAGE_QUEUE_SIZE} messages). Discarding message."
)
else:
# The client might choose never to create a data channel.
logger.trace("Data channel not ready, discarding message!")
logger.trace("Data channel unavailable and queueing disabled. Discarding message.")
def _flush_message_queue(self):
"""Flush all queued messages through the now-open data channel.
Called when the data channel transitions to the *open* state. Cancels
the data-channel open timeout (it is no longer needed) and sends every
message that was buffered while the channel was unavailable.
"""
self._cancel_data_channel_timeout()
logger.debug("Data channel is open, flushing queued messages")
while self._outgoing_messages_queue:
message = self._outgoing_messages_queue.pop(0)
self._data_channel.send(message)
def ask_to_renegotiate(self):
"""Request renegotiation of the WebRTC connection."""