Merge pull request #1468 from pipecat-ai/aleix/smallwebrtc-updates

transports(webrtc): some SmallWebRTC updates
This commit is contained in:
Aleix Conchillo Flaqué
2025-03-28 14:41:45 -07:00
committed by GitHub
8 changed files with 122 additions and 109 deletions

View File

@@ -38,7 +38,7 @@ async def offer(request: dict, background_tasks: BackgroundTasks):
pipecat_connection = SmallWebRTCConnection(ice_servers)
await pipecat_connection.initialize(sdp=request["sdp"], type=request["type"])
@pipecat_connection.on("closed")
@pipecat_connection.event_handler("closed")
async def handle_disconnected(webrtc_connection: SmallWebRTCConnection):
logger.info(f"Discarding peer connection for pc_id: {webrtc_connection.pc_id}")
pcs_map.pop(webrtc_connection.pc_id, None)

View File

@@ -35,7 +35,7 @@ async def offer(request: dict, background_tasks: BackgroundTasks):
pipecat_connection = SmallWebRTCConnection()
await pipecat_connection.initialize(sdp=request["sdp"], type=request["type"])
@pipecat_connection.on("closed")
@pipecat_connection.event_handler("closed")
async def handle_disconnected(webrtc_connection: SmallWebRTCConnection):
logger.info(f"Discarding peer connection for pc_id: {webrtc_connection.pc_id}")
pcs_map.pop(webrtc_connection.pc_id, None)

View File

@@ -82,6 +82,7 @@ soundfile = [ "soundfile~=0.13.0" ]
tavus=[]
together = []
ultravox = [ "transformers~=4.48.0", "vllm~=0.7.3" ]
webrtc = [ "aiortc~=1.10.1", "opencv-python~=4.11.0.86" ]
websocket = [ "websockets~=13.1", "fastapi~=0.115.6" ]
whisper = [ "faster-whisper~=1.1.1" ]

View File

@@ -6,22 +6,14 @@
import asyncio
import fractions
import logging
import time
from collections import deque
from typing import Any, Awaitable, Callable, Optional
import cv2
import numpy as np
from aiortc import VideoStreamTrack
from aiortc.mediastreams import AudioStreamTrack, MediaStreamError, VideoFrame
from av import AudioFrame, AudioResampler
from loguru import logger
from pydantic import BaseModel
# Get the logger for aiortc
# aiortc_logger = logging.getLogger("aiortc")
# aiortc_logger.setLevel(logging.DEBUG)
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
@@ -37,6 +29,16 @@ from pipecat.transports.base_output import BaseOutputTransport
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
try:
import cv2
from aiortc import VideoStreamTrack
from aiortc.mediastreams import AudioStreamTrack, MediaStreamError
from av import AudioFrame, AudioResampler, VideoFrame
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use the SmallWebRTC, you need to `pip install pipecat-ai[webrtc]`.")
raise Exception(f"Missing module: {e}")
class SmallWebRTCCallbacks(BaseModel):
on_app_message: Callable[[Any], Awaitable[None]]
@@ -137,7 +139,7 @@ class RawVideoTrack(VideoStreamTrack):
class SmallWebRTCClient:
def __init__(self, webrtc_connection: SmallWebRTCConnection, callbacks: SmallWebRTCCallbacks):
self._webrtcConnection = webrtc_connection
self._webrtc_connection = webrtc_connection
self._closing = False
self._callbacks = callbacks
@@ -155,23 +157,23 @@ class SmallWebRTCClient:
# otherwise we face issues with Silero VAD
self._pipecat_resampler = AudioResampler("s16", "mono", 16000)
@self._webrtcConnection.on("connected")
@self._webrtc_connection.event_handler("connected")
async def on_connected(connection: SmallWebRTCConnection):
logger.info("Peer connection established.")
logger.debug("Peer connection established.")
await self._handle_client_connected()
@self._webrtcConnection.on("disconnected")
@self._webrtc_connection.event_handler("disconnected")
async def on_disconnected(connection: SmallWebRTCConnection):
logger.info("Peer connection lost.")
logger.debug("Peer connection lost.")
await self._handle_client_disconnected()
@self._webrtcConnection.on("closed")
@self._webrtc_connection.event_handler("closed")
async def on_closed(connection: SmallWebRTCConnection):
logger.info("Client connection closed.")
logger.debug("Client connection closed.")
await self._handle_client_closed()
@self._webrtcConnection.on("appMessage")
async def on_app_message(message: Any):
@self._webrtc_connection.event_handler("app-message")
async def on_app_message(connection: SmallWebRTCConnection, message: Any):
await self._handle_app_message(message)
async def read_video_frame(self):
@@ -187,9 +189,9 @@ class SmallWebRTCClient:
try:
frame = await asyncio.wait_for(self._video_input_track.recv(), timeout=2.0)
except asyncio.TimeoutError:
if self._webrtcConnection.is_connected():
if self._webrtc_connection.is_connected():
logger.warning("Timeout: No video frame received within the specified time.")
# self._webrtcConnection.ask_to_renegotiate()
# self._webrtc_connection.ask_to_renegotiate()
frame = None
except MediaStreamError:
logger.warning("Received an unexpected media stream error while reading the audio.")
@@ -237,7 +239,7 @@ class SmallWebRTCClient:
try:
frame = await asyncio.wait_for(self._audio_input_track.recv(), timeout=2.0)
except asyncio.TimeoutError:
if self._webrtcConnection.is_connected():
if self._webrtc_connection.is_connected():
logger.warning("Timeout: No audio frame received within the specified time.")
frame = None
except MediaStreamError:
@@ -285,56 +287,56 @@ class SmallWebRTCClient:
self._params = _params
async def connect(self):
if self._webrtcConnection.is_connected():
if self._webrtc_connection.is_connected():
# already initialized
return
logger.info(f"Connecting to Small WebRTC")
await self._webrtcConnection.connect()
await self._webrtc_connection.connect()
async def disconnect(self):
if self.is_connected and not self.is_closing:
logger.info(f"Disconnecting to Small WebRTC")
self._closing = True
await self._webrtcConnection.close()
await self._webrtc_connection.close()
await self._handle_client_disconnected()
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
if self._can_send():
self._webrtcConnection.send_app_message(frame.message)
self._webrtc_connection.send_app_message(frame.message)
async def _handle_client_connected(self):
# There is nothing to do here yet, the pipeline is still not ready
if not self._params:
return
self._audio_input_track = self._webrtcConnection.audio_input_track()
self._video_input_track = self._webrtcConnection.video_input_track()
self._audio_input_track = self._webrtc_connection.audio_input_track()
self._video_input_track = self._webrtc_connection.video_input_track()
if self._params.audio_out_enabled:
self._audio_output_track = RawAudioTrack(sample_rate=self._out_sample_rate)
self._webrtcConnection.replace_audio_track(self._audio_output_track)
self._webrtc_connection.replace_audio_track(self._audio_output_track)
if self._params.camera_out_enabled:
self._video_output_track = RawVideoTrack(
width=self._params.camera_out_width, height=self._params.camera_out_height
)
self._webrtcConnection.replace_video_track(self._video_output_track)
self._webrtc_connection.replace_video_track(self._video_output_track)
await self._callbacks.on_client_connected(self._webrtcConnection)
await self._callbacks.on_client_connected(self._webrtc_connection)
async def _handle_client_disconnected(self):
self._audio_input_track = None
self._video_input_track = None
self._audio_output_track = None
self._video_output_track = None
await self._callbacks.on_client_disconnected(self._webrtcConnection)
await self._callbacks.on_client_disconnected(self._webrtc_connection)
async def _handle_client_closed(self):
self._audio_input_track = None
self._video_input_track = None
self._audio_output_track = None
self._video_output_track = None
await self._callbacks.on_client_closed(self._webrtcConnection)
await self._callbacks.on_client_closed(self._webrtc_connection)
async def _handle_app_message(self, message: Any):
await self._callbacks.on_app_message(message)
@@ -344,7 +346,7 @@ class SmallWebRTCClient:
@property
def is_connected(self) -> bool:
return self._webrtcConnection.is_connected()
return self._webrtc_connection.is_connected()
@property
def is_closing(self) -> bool:
@@ -412,7 +414,7 @@ class SmallWebRTCInputTransport(BaseInputTransport):
logger.error(f"{self} exception receiving data: {e.__class__.__name__} ({e})")
async def push_app_message(self, message: Any):
logger.info(f"Received app message inside SmallWebRTCInputTransport {message}")
logger.debug(f"Received app message inside SmallWebRTCInputTransport {message}")
frame = TransportMessageUrgentFrame(message=message)
await self.push_frame(frame)

View File

@@ -1,14 +1,25 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import json
import time
import uuid
from enum import Enum
from typing import Any, Optional
from aiortc import RTCConfiguration, RTCIceServer, RTCPeerConnection, RTCSessionDescription
from loguru import logger
from pipecat.utils.event_emitter import EventEmitter
from pipecat.utils.base_object import BaseObject
try:
from aiortc import RTCConfiguration, RTCIceServer, RTCPeerConnection, RTCSessionDescription
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use the SmallWebRTC, you need to `pip install pipecat-ai[webrtc]`.")
raise Exception(f"Missing module: {e}")
SIGNALLING_TYPE = "signalling"
@@ -17,7 +28,7 @@ class SignallingMessage(Enum):
RENEGOTIATE = "renegotiate"
class SmallWebRTCConnection(EventEmitter):
class SmallWebRTCConnection(BaseObject):
def __init__(self, ice_servers=None):
super().__init__()
if ice_servers:
@@ -27,13 +38,34 @@ class SmallWebRTCConnection(EventEmitter):
self._connect_invoked = False
self._initialize()
# Register supported handlers. The user will only be able to register
# these handlers.
self._register_event_handler("app-message")
self._register_event_handler("track-started")
self._register_event_handler("track-ended")
# connection states
self._register_event_handler("connecting")
self._register_event_handler("connected")
self._register_event_handler("disconnected")
self._register_event_handler("closed")
self._register_event_handler("failed")
self._register_event_handler("new")
@property
def pc(self) -> RTCPeerConnection:
return self._pc
@property
def pc_id(self) -> str:
return self._pc_id
def _initialize(self):
logger.info("Initializing new peer connection")
logger.debug("Initializing new peer connection")
rtc_config = RTCConfiguration(iceServers=self.ice_servers)
self.answer: Optional[RTCSessionDescription] = None
self.pc = RTCPeerConnection(rtc_config)
self.pc_id = "PeerConnection(%s)" % uuid.uuid4()
self._answer: Optional[RTCSessionDescription] = None
self._pc = RTCPeerConnection(rtc_config)
self._pc_id = self.name
self._setup_listeners()
self._tracks = set()
self._data_channel = None
@@ -41,7 +73,7 @@ class SmallWebRTCConnection(EventEmitter):
self._last_received_time = None
def _setup_listeners(self):
@self.pc.on("datachannel")
@self._pc.on("datachannel")
def on_datachannel(channel):
self._data_channel = channel
@@ -54,54 +86,54 @@ class SmallWebRTCConnection(EventEmitter):
self._last_received_time = time.time()
else:
json_message = json.loads(message)
await self.emit("appMessage", json_message)
await self._call_event_handler("app-message", json_message)
except Exception as e:
logger.exception(f"Error parsing JSON message {message}, {e}")
# Although aiortc provides this listener, it has no status for "disconnected".
# So, in case we lose the connection, this event will not be triggered.
@self.pc.on("connectionstatechange")
@self._pc.on("connectionstatechange")
async def on_connectionstatechange():
await self._handle_new_connection_state()
# Although aiortc provides this listener, it has no status for "disconnected".
# So, in case we lose the connection, this event will not be triggered.
@self.pc.on("iceconnectionstatechange")
@self._pc.on("iceconnectionstatechange")
async def on_iceconnectionstatechange():
logger.info(
f"Ice connection state is {self.pc.iceConnectionState}, connection is {self.pc.connectionState}"
logger.debug(
f"ICE connection state is {self._pc.iceConnectionState}, connection is {self._pc.connectionState}"
)
@self.pc.on("icegatheringstatechange")
@self._pc.on("icegatheringstatechange")
async def on_icegatheringstatechange():
logger.info(f"Ice gathering state is {self.pc.iceGatheringState}")
logger.debug(f"ICE gathering state is {self._pc.iceGatheringState}")
@self.pc.on("track")
@self._pc.on("track")
async def on_track(track):
logger.info(f"Track {track.kind} received")
logger.debug(f"Track {track.kind} received")
self._tracks.add(track)
await self.emit("track-started", track)
await self._call_event_handler("track-started", track)
@track.on("ended")
async def on_ended():
logger.info(f"Track {track.kind} ended")
logger.debug(f"Track {track.kind} ended")
self._tracks.discard(track)
await self.emit("track-ended", track)
await self._call_event_handler("track-ended", track)
async def _create_answer(self, sdp: str, type: str):
offer = RTCSessionDescription(sdp=sdp, type=type)
await self.pc.setRemoteDescription(offer)
await self._pc.setRemoteDescription(offer)
# For some reason, aiortc is not respecting the SDP for the transceivers to be sendrecv,
# so we are basically forcing it to act this way
self.force_transceivers_to_send_recv()
# this answer does not contain the ice candidates, which will be gathered later, after the setLocalDescription
logger.info(f"Creating answer")
local_answer = await self.pc.createAnswer()
await self.pc.setLocalDescription(local_answer)
logger.info(f"Setting the answer after the local description is created")
self.answer = self.pc.localDescription
logger.debug(f"Creating answer")
local_answer = await self._pc.createAnswer()
await self._pc.setLocalDescription(local_answer)
logger.debug(f"Setting the answer after the local description is created")
self._answer = self._pc.localDescription
async def initialize(self, sdp: str, type: str):
await self._create_answer(sdp, type)
@@ -110,19 +142,19 @@ class SmallWebRTCConnection(EventEmitter):
self._connect_invoked = True
# If we already connected, trigger again the connected event
if self.is_connected():
await self.emit("connected", self)
await self._call_event_handler("connected")
# We are renegotiating here because we have likely lost the first video frames,
# and aiortc does not handle that very well.
self.ask_to_renegotiate()
async def renegotiate(self, sdp: str, type: str, restart_pc: bool = False):
logger.info(f"Renegotiating {self.pc_id}")
logger.debug(f"Renegotiating {self._pc_id}")
if restart_pc:
await self.emit("disconnected", self)
logger.info("Closing old peer connection")
await self._call_event_handler("disconnected")
logger.debug("Closing old peer connection")
# removing the listeners to prevent the bot from closing
self.pc.remove_all_listeners()
self._pc.remove_all_listeners()
await self.close()
# we are initializing a new peer connection in this case.
self._initialize()
@@ -139,59 +171,59 @@ class SmallWebRTCConnection(EventEmitter):
asyncio.create_task(delayed_task())
def force_transceivers_to_send_recv(self):
for transceiver in self.pc.getTransceivers():
for transceiver in self._pc.getTransceivers():
transceiver.direction = "sendrecv"
# logger.info(
# logger.debug(
# f"Transceiver: {transceiver}, Mid: {transceiver.mid}, Direction: {transceiver.direction}"
# )
# logger.info(f"Sender track: {transceiver.sender.track}")
# logger.debug(f"Sender track: {transceiver.sender.track}")
def replace_audio_track(self, track):
logger.info(f"Replacing audio track {track.kind}")
logger.debug(f"Replacing audio track {track.kind}")
# Transceivers always appear in creation order for both peers.
# For now we assume there are only two transceivers,
# one for audio and one for video
transceivers = self.pc.getTransceivers()
transceivers = self._pc.getTransceivers()
if len(transceivers) > 0 and transceivers[0].sender:
transceivers[0].sender.replaceTrack(track)
else:
logger.warning("Audio transceiver not found. Cannot replace audio track.")
def replace_video_track(self, track):
logger.info(f"Replacing video track {track.kind}")
logger.debug(f"Replacing video track {track.kind}")
# Transceivers always appear in creation order for both peers.
# For now we assume there are only two transceivers,
# one for audio and one for video
transceivers = self.pc.getTransceivers()
transceivers = self._pc.getTransceivers()
if len(transceivers) > 1 and transceivers[1].sender:
transceivers[1].sender.replaceTrack(track)
else:
logger.warning("Video transceiver not found. Cannot replace video track.")
async def close(self):
if self.pc:
await self.pc.close()
if self._pc:
await self._pc.close()
def get_answer(self):
if not self.answer:
if not self._answer:
return None
return {
"sdp": self.answer.sdp,
"type": self.answer.type,
"pc_id": self.pc_id,
"sdp": self._answer.sdp,
"type": self._answer.type,
"pc_id": self._pc_id,
}
async def _handle_new_connection_state(self):
state = self.pc.connectionState
logger.info(f"Connection state changed to: {state}")
await self.emit(state, self)
state = self._pc.connectionState
logger.debug(f"Connection state changed to: {state}")
await self._call_event_handler(state)
if state == "failed":
logger.warning("Connection failed, closing peer connection.")
await self.close()
# Despite the fact that aiortc provides this listener, they don't have a status for "disconnected"
# So, there is no advantage in looking at self.pc.connectionState
# So, there is no advantage in looking at self._pc.connectionState
# That is why we are trying to keep our own state
def is_connected(self):
# If the small webrtc transport has never invoked to connect
@@ -202,7 +234,7 @@ class SmallWebRTCConnection(EventEmitter):
if self._last_received_time is None:
# if we have never received a message, it is probably because the client has not created a data channel
# so we are going to trust aiortc in this case
return self.pc.connectionState == "connected"
return self._pc.connectionState == "connected"
# Checks if the last received ping was within the last 3 seconds.
return (time.time() - self._last_received_time) < 3
@@ -210,7 +242,7 @@ class SmallWebRTCConnection(EventEmitter):
# Transceivers always appear in creation order for both peers.
# For now we assume there are only two transceivers,
# one for audio and one for video
transceivers = self.pc.getTransceivers()
transceivers = self._pc.getTransceivers()
if len(transceivers) == 0 or not transceivers[0].receiver:
logger.warning("No audio transceiver is available")
return None
@@ -221,7 +253,7 @@ class SmallWebRTCConnection(EventEmitter):
# Transceivers always appear in creation order for both peers.
# For now we assume there are only two transceivers,
# one for audio and one for video
transceivers = self.pc.getTransceivers()
transceivers = self._pc.getTransceivers()
if len(transceivers) <= 1 or not transceivers[1].receiver:
logger.warning("No video transceiver is available")
return None

View File

@@ -4,7 +4,6 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import io
import time

View File

@@ -6,7 +6,6 @@
import asyncio
import io
import json
import time
import wave
from typing import Awaitable, Callable, Optional

View File

@@ -1,20 +0,0 @@
class EventEmitter:
    """Minimal event emitter with decorator-based handler registration.

    Handlers are stored per event name and invoked by :meth:`emit` in the
    order in which they were registered.
    """

    def __init__(self):
        # Maps an event name to the list of handlers registered for it.
        self._events = {}

    def on(self, event_name):
        """Decorator to register an event handler.

        Args:
            event_name: Name of the event the decorated callable handles.

        Returns:
            A decorator that records the callable under ``event_name`` and
            returns it unchanged, so the decorated name stays usable.
        """

        def decorator(func):
            self._events.setdefault(event_name, []).append(func)
            return func

        return decorator

    async def emit(self, event_name, *args, **kwargs):
        """Trigger all handlers for a given event.

        Handlers are called one after another with ``*args`` and
        ``**kwargs``. A handler's result is awaited only when it is
        awaitable, so both ``async def`` and plain functions can be
        registered. Emitting an event with no handlers does nothing. An
        exception raised by a handler propagates to the caller and the
        remaining handlers are not run.

        Args:
            event_name: Name of the event to emit.
            *args: Positional arguments forwarded to every handler.
            **kwargs: Keyword arguments forwarded to every handler.
        """
        # Imported locally so this block stays self-contained.
        import inspect

        for handler in self._events.get(event_name, ()):
            result = handler(*args, **kwargs)
            # Plain functions return a non-awaitable (usually None); awaiting
            # it unconditionally would raise TypeError.
            if inspect.isawaitable(result):
                await result