Merge pull request #1498 from pipecat-ai/aiortc_example_ios

Improvements for the SmallWebRTCTransport
This commit is contained in:
Filipi da Silva Fuchter
2025-04-04 16:39:06 -03:00
committed by GitHub
4 changed files with 42 additions and 3 deletions

View File

@@ -16,6 +16,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
producer. These processors can be useful to push frames from one part of a
pipeline to a different one (e.g. when using `ParallelPipeline`).
- Improvements for the `SmallWebRTCTransport`:
- Wait until the pipeline is ready before triggering the `connected` event.
- Queue messages if the data channel is not ready.
- Update the aiortc dependency to fix an issue where the 'video/rtx' MIME type
was incorrectly handled as a codec retransmission.
- Avoid initial video delays.
### Fixed
- Fixed `SmallWebRTCTransport` to support dynamic values for

View File

@@ -4,6 +4,7 @@ import react from '@vitejs/plugin-react-swc';
export default defineConfig({
plugins: [react()],
server: {
allowedHosts: true, // Allows external connections like ngrok
proxy: {
// Proxy /api requests to the backend server
'/api': {

View File

@@ -85,7 +85,7 @@ soundfile = [ "soundfile~=0.13.0" ]
tavus=[]
together = []
ultravox = [ "transformers~=4.48.0", "vllm~=0.7.3" ]
webrtc = [ "aiortc~=1.10.1", "opencv-python~=4.11.0.86" ]
webrtc = [ "aiortc~=1.11.0", "opencv-python~=4.11.0.86" ]
websocket = [ "websockets~=13.1", "fastapi~=0.115.6" ]
whisper = [ "faster-whisper~=1.1.1" ]

View File

@@ -16,6 +16,7 @@ from pipecat.utils.base_object import BaseObject
try:
from aiortc import RTCConfiguration, RTCIceServer, RTCPeerConnection, RTCSessionDescription
from aiortc.rtcrtpreceiver import RemoteStreamTrack
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use the SmallWebRTC, you need to `pip install pipecat-ai[webrtc]`.")
@@ -71,12 +72,21 @@ class SmallWebRTCConnection(BaseObject):
self._data_channel = None
self._renegotiation_in_progress = False
self._last_received_time = None
self._message_queue = []
def _setup_listeners(self):
@self._pc.on("datachannel")
def on_datachannel(channel):
self._data_channel = channel
# Flush queued messages once the data channel is open
@channel.on("open")
async def on_open():
logger.debug("Data channel is open, flushing queued messages")
while self._message_queue:
message = self._message_queue.pop(0)
self._data_channel.send(message)
@channel.on("message")
async def on_message(message):
try:
@@ -138,6 +148,17 @@ class SmallWebRTCConnection(BaseObject):
async def initialize(self, sdp: str, type: str) -> None:
    """Initialize the connection from a session description.

    Delegates directly to ``_create_answer`` with the given values.

    Args:
        sdp: Session description (SDP) text; presumably the remote offer,
            confirm against the caller.
        type: Session description type, forwarded unchanged to
            ``_create_answer``.
    """
    await self._create_answer(sdp, type)
async def discard_old_frames(self, remote_track: RemoteStreamTrack):
    """Drop every frame currently buffered on a remote track.

    This relies on the track's private ``_queue`` attribute. If that
    attribute is missing or is not an ``asyncio.Queue``, a warning is logged
    and nothing is discarded.

    Args:
        remote_track: The remote track whose pending frames are discarded.
    """
    # Read the private attribute once; a missing attribute yields None and
    # fails the isinstance check exactly like the previous hasattr guard.
    frame_queue = getattr(remote_track, "_queue", None)
    if not isinstance(frame_queue, asyncio.Queue):
        # Report through the module logger rather than print so the warning
        # reaches the same sink as the rest of this file's diagnostics.
        logger.warning("_queue does not exist or has changed in aiortc.")
        return

    logger.debug("Discarding old frames")
    while not frame_queue.empty():
        frame_queue.get_nowait()  # Remove the oldest frame
        # Balance the get so the queue's unfinished-task counter stays correct.
        frame_queue.task_done()
async def connect(self):
self._connect_invoked = True
# If we already connected, trigger again the connected event
@@ -145,6 +166,9 @@ class SmallWebRTCConnection(BaseObject):
await self._call_event_handler("connected")
# We are renegotiating here because we have likely lost the first video frames,
# and aiortc does not handle that very well.
remove_video_track = self.video_input_track()
if isinstance(remove_video_track, RemoteStreamTrack):
await self.discard_old_frames(remove_video_track)
self.ask_to_renegotiate()
async def renegotiate(self, sdp: str, type: str, restart_pc: bool = False):
@@ -203,6 +227,7 @@ class SmallWebRTCConnection(BaseObject):
async def close(self) -> None:
    """Close the peer connection, if one exists, and drop any queued messages."""
    if self._pc:
        await self._pc.close()
    # Messages that were queued but never sent are discarded on close.
    # NOTE(review): assumed to run even when there is no peer connection; confirm.
    self._message_queue.clear()
def get_answer(self):
if not self._answer:
@@ -216,6 +241,9 @@ class SmallWebRTCConnection(BaseObject):
async def _handle_new_connection_state(self):
state = self._pc.connectionState
if state == "connected" and not self._connect_invoked:
# We are going to wait until the pipeline is ready before triggering the event
return
logger.debug(f"Connection state changed to: {state}")
await self._call_event_handler(state)
if state == "failed":
@@ -264,9 +292,12 @@ class SmallWebRTCConnection(BaseObject):
return self._tracks
def send_app_message(self, message: Any):
    """Send an application message over the data channel, or queue it.

    The message is serialized to JSON exactly once. If a data channel exists
    and its ``readyState`` is ``"open"``, the payload is sent immediately;
    otherwise it is appended to ``self._message_queue``.

    Args:
        message: Any value accepted by ``json.dumps``.

    Raises:
        TypeError: If ``message`` is not JSON serializable.
    """
    # Serialize up front so the sent and queued payloads are identical.
    json_message = json.dumps(message)
    channel = self._data_channel
    if channel and channel.readyState == "open":
        channel.send(json_message)
    else:
        logger.debug("Data channel not ready, queuing message")
        self._message_queue.append(json_message)
def ask_to_renegotiate(self):
if self._renegotiation_in_progress: