From 4b438ff7d7fba925fae4421270d283492ef87acd Mon Sep 17 00:00:00 2001 From: Filipi Fuchter Date: Fri, 4 Apr 2025 16:28:37 -0300 Subject: [PATCH 1/3] Allowing ngrok connections to the video-transform demo --- .../p2p-webrtc/video-transform/client/typescript/vite.config.js | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/p2p-webrtc/video-transform/client/typescript/vite.config.js b/examples/p2p-webrtc/video-transform/client/typescript/vite.config.js index 58f9cfaf9..bd7b7f9d0 100644 --- a/examples/p2p-webrtc/video-transform/client/typescript/vite.config.js +++ b/examples/p2p-webrtc/video-transform/client/typescript/vite.config.js @@ -4,6 +4,7 @@ import react from '@vitejs/plugin-react-swc'; export default defineConfig({ plugins: [react()], server: { + allowedHosts: true, // Allows external connections like ngrok proxy: { // Proxy /api requests to the backend server '/api': { From fe5931b8846cfde1b227ef4ae218dfc15fc3bb18 Mon Sep 17 00:00:00 2001 From: Filipi Fuchter Date: Fri, 4 Apr 2025 16:28:54 -0300 Subject: [PATCH 2/3] Updating aiortc to fix an issue where 'video/rtx' MIMEType retransmission incorrectly handled as a codec --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index f4727286d..97101e3cc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -85,7 +85,7 @@ soundfile = [ "soundfile~=0.13.0" ] tavus=[] together = [] ultravox = [ "transformers~=4.48.0", "vllm~=0.7.3" ] -webrtc = [ "aiortc~=1.10.1", "opencv-python~=4.11.0.86" ] +webrtc = [ "aiortc~=1.11.0", "opencv-python~=4.11.0.86" ] websocket = [ "websockets~=13.1", "fastapi~=0.115.6" ] whisper = [ "faster-whisper~=1.1.1" ] From b42dc836969db7f7eddec7cb69c64bd19ceec451 Mon Sep 17 00:00:00 2001 From: Filipi Fuchter Date: Fri, 4 Apr 2025 16:33:57 -0300 Subject: [PATCH 3/3] Improvements for the `SmallWebRTCTransport`: - Wait until the pipeline is ready before triggering the `connected` event. 
- Queue messages if the data channel is not ready. - Update the aiortc dependency to fix an issue where the 'video/rtx' retransmission MIME type was incorrectly handled as a codec. - Avoid initial video delays. --- CHANGELOG.md | 7 ++++ .../transports/network/webrtc_connection.py | 35 +++++++++++++++++-- 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7cd3eacea..9532a8e02 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 producer. These processors can be useful to push frames from one part of a pipeline to a different one (e.g. when using `ParallelPipeline`). +- Improvements for the `SmallWebRTCTransport`: + - Wait until the pipeline is ready before triggering the `connected` event. + - Queue messages if the data channel is not ready. + - Update the aiortc dependency to fix an issue where the 'video/rtx' retransmission MIME type + was incorrectly handled as a codec. + - Avoid initial video delays. 
+ ### Fixed - Fixed `SmallWebRTCTransport` to support dynamic values for diff --git a/src/pipecat/transports/network/webrtc_connection.py b/src/pipecat/transports/network/webrtc_connection.py index 671ff6622..a2ff6b1bb 100644 --- a/src/pipecat/transports/network/webrtc_connection.py +++ b/src/pipecat/transports/network/webrtc_connection.py @@ -16,6 +16,7 @@ from pipecat.utils.base_object import BaseObject try: from aiortc import RTCConfiguration, RTCIceServer, RTCPeerConnection, RTCSessionDescription + from aiortc.rtcrtpreceiver import RemoteStreamTrack except ModuleNotFoundError as e: logger.error(f"Exception: {e}") logger.error("In order to use the SmallWebRTC, you need to `pip install pipecat-ai[webrtc]`.") @@ -71,12 +72,21 @@ class SmallWebRTCConnection(BaseObject): self._data_channel = None self._renegotiation_in_progress = False self._last_received_time = None + self._message_queue = [] def _setup_listeners(self): @self._pc.on("datachannel") def on_datachannel(channel): self._data_channel = channel + # Flush queued messages once the data channel is open + @channel.on("open") + async def on_open(): + logger.debug("Data channel is open, flushing queued messages") + while self._message_queue: + message = self._message_queue.pop(0) + self._data_channel.send(message) + @channel.on("message") async def on_message(message): try: @@ -138,6 +148,17 @@ class SmallWebRTCConnection(BaseObject): async def initialize(self, sdp: str, type: str): await self._create_answer(sdp, type) + async def discard_old_frames(self, remote_track: RemoteStreamTrack): + if not hasattr(remote_track, "_queue") or not isinstance( + remote_track._queue, asyncio.Queue + ): + print("Warning: _queue does not exist or has changed in aiortc.") + return + logger.debug("Discarding old frames") + while not remote_track._queue.empty(): + remote_track._queue.get_nowait() # Remove the oldest frame + remote_track._queue.task_done() + async def connect(self): self._connect_invoked = True # If we already 
connected, trigger again the connected event @@ -145,6 +166,9 @@ class SmallWebRTCConnection(BaseObject): await self._call_event_handler("connected") # We are renegotiating here, because likely we have loose the first video frames # and aiortc does not handle that pretty well. + remove_video_track = self.video_input_track() + if isinstance(remove_video_track, RemoteStreamTrack): + await self.discard_old_frames(remove_video_track) self.ask_to_renegotiate() async def renegotiate(self, sdp: str, type: str, restart_pc: bool = False): @@ -203,6 +227,7 @@ class SmallWebRTCConnection(BaseObject): async def close(self): if self._pc: await self._pc.close() + self._message_queue.clear() def get_answer(self): if not self._answer: @@ -216,6 +241,9 @@ class SmallWebRTCConnection(BaseObject): async def _handle_new_connection_state(self): state = self._pc.connectionState + if state == "connected" and not self._connect_invoked: + # We are going to wait until the pipeline is ready before triggering the event + return logger.debug(f"Connection state changed to: {state}") await self._call_event_handler(state) if state == "failed": @@ -264,9 +292,12 @@ class SmallWebRTCConnection(BaseObject): return self._tracks def send_app_message(self, message: Any): - if self._data_channel: - json_message = json.dumps(message) + json_message = json.dumps(message) + if self._data_channel and self._data_channel.readyState == "open": self._data_channel.send(json_message) + else: + logger.debug("Data channel not ready, queuing message") + self._message_queue.append(json_message) def ask_to_renegotiate(self): if self._renegotiation_in_progress: