diff --git a/examples/p2p-webrtc/video-transform/server/server.py b/examples/p2p-webrtc/video-transform/server/server.py index 59f182f62..79ee75ea3 100644 --- a/examples/p2p-webrtc/video-transform/server/server.py +++ b/examples/p2p-webrtc/video-transform/server/server.py @@ -38,7 +38,7 @@ async def offer(request: dict, background_tasks: BackgroundTasks): pipecat_connection = SmallWebRTCConnection(ice_servers) await pipecat_connection.initialize(sdp=request["sdp"], type=request["type"]) - @pipecat_connection.on("closed") + @pipecat_connection.event_handler("closed") async def handle_disconnected(webrtc_connection: SmallWebRTCConnection): logger.info(f"Discarding peer connection for pc_id: {webrtc_connection.pc_id}") pcs_map.pop(webrtc_connection.pc_id, None) diff --git a/examples/p2p-webrtc/voice-agent/server.py b/examples/p2p-webrtc/voice-agent/server.py index 3e5f8dcfb..e91340a32 100644 --- a/examples/p2p-webrtc/voice-agent/server.py +++ b/examples/p2p-webrtc/voice-agent/server.py @@ -35,7 +35,7 @@ async def offer(request: dict, background_tasks: BackgroundTasks): pipecat_connection = SmallWebRTCConnection() await pipecat_connection.initialize(sdp=request["sdp"], type=request["type"]) - @pipecat_connection.on("closed") + @pipecat_connection.event_handler("closed") async def handle_disconnected(webrtc_connection: SmallWebRTCConnection): logger.info(f"Discarding peer connection for pc_id: {webrtc_connection.pc_id}") pcs_map.pop(webrtc_connection.pc_id, None) diff --git a/pyproject.toml b/pyproject.toml index 0f2e993ae..dddbf3678 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -82,6 +82,7 @@ soundfile = [ "soundfile~=0.13.0" ] tavus=[] together = [] ultravox = [ "transformers~=4.48.0", "vllm~=0.7.3" ] +webrtc = [ "aiortc~=1.10.1", "opencv-python~=4.11.0.86" ] websocket = [ "websockets~=13.1", "fastapi~=0.115.6" ] whisper = [ "faster-whisper~=1.1.1" ] diff --git a/src/pipecat/transports/network/small_webrtc.py b/src/pipecat/transports/network/small_webrtc.py index bb3044a75..987c412fd 100644 --- a/src/pipecat/transports/network/small_webrtc.py +++ b/src/pipecat/transports/network/small_webrtc.py @@ -6,22 +6,14 @@ import asyncio import fractions -import logging import time from collections import deque from typing import Any, Awaitable, Callable, Optional -import cv2 import numpy as np -from aiortc import VideoStreamTrack -from aiortc.mediastreams import AudioStreamTrack, MediaStreamError, VideoFrame -from av import AudioFrame, AudioResampler from loguru import logger from pydantic import BaseModel -# Get the logger for aiortc -# aiortc_logger = logging.getLogger("aiortc") -# aiortc_logger.setLevel(logging.DEBUG) from pipecat.frames.frames import ( CancelFrame, EndFrame, @@ -37,6 +29,16 @@ from pipecat.transports.base_output import BaseOutputTransport from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection +try: + import cv2 + from aiortc import VideoStreamTrack + from aiortc.mediastreams import AudioStreamTrack, MediaStreamError + from av import AudioFrame, AudioResampler, VideoFrame +except ModuleNotFoundError as e: + logger.error(f"Exception: {e}") + logger.error("In order to use the SmallWebRTC, you need to `pip install pipecat-ai[webrtc]`.") + raise Exception(f"Missing module: {e}") + class SmallWebRTCCallbacks(BaseModel): on_app_message: Callable[[Any], Awaitable[None]] @@ -137,7 +139,7 @@ class RawVideoTrack(VideoStreamTrack): class SmallWebRTCClient: def __init__(self, webrtc_connection: SmallWebRTCConnection, callbacks: SmallWebRTCCallbacks): - self._webrtcConnection = webrtc_connection + self._webrtc_connection = webrtc_connection self._closing = False self._callbacks = callbacks @@ -155,23 +157,23 @@ class SmallWebRTCClient: # otherwise we face issues with Silero VAD self._pipecat_resampler = AudioResampler("s16", "mono", 16000) - @self._webrtcConnection.on("connected") + @self._webrtc_connection.event_handler("connected") async def on_connected(connection: SmallWebRTCConnection): - logger.info("Peer connection established.") + logger.debug("Peer connection established.") await self._handle_client_connected() - @self._webrtcConnection.on("disconnected") + @self._webrtc_connection.event_handler("disconnected") async def on_disconnected(connection: SmallWebRTCConnection): - logger.info("Peer connection lost.") + logger.debug("Peer connection lost.") await self._handle_client_disconnected() - @self._webrtcConnection.on("closed") + @self._webrtc_connection.event_handler("closed") async def on_closed(connection: SmallWebRTCConnection): - logger.info("Client connection closed.") + logger.debug("Client connection closed.") await self._handle_client_closed() - @self._webrtcConnection.on("appMessage") - async def on_app_message(message: Any): + @self._webrtc_connection.event_handler("app-message") + async def on_app_message(connection: SmallWebRTCConnection, message: Any): await self._handle_app_message(message) async def read_video_frame(self): @@ -187,9 +189,9 @@ class SmallWebRTCClient: try: frame = await asyncio.wait_for(self._video_input_track.recv(), timeout=2.0) except asyncio.TimeoutError: - if self._webrtcConnection.is_connected(): + if self._webrtc_connection.is_connected(): logger.warning("Timeout: No video frame received within the specified time.") - # self._webrtcConnection.ask_to_renegotiate() + # self._webrtc_connection.ask_to_renegotiate() frame = None except MediaStreamError: logger.warning("Received an unexpected media stream error while reading the audio.") @@ -237,7 +239,7 @@ class SmallWebRTCClient: try: frame = await asyncio.wait_for(self._audio_input_track.recv(), timeout=2.0) except asyncio.TimeoutError: - if self._webrtcConnection.is_connected(): + if self._webrtc_connection.is_connected(): logger.warning("Timeout: No audio frame received within the specified time.") frame = None except MediaStreamError: @@ -285,56 +287,56 @@ class SmallWebRTCClient: self._params = _params async def connect(self): - if self._webrtcConnection.is_connected(): + if self._webrtc_connection.is_connected(): # already initialized return logger.info(f"Connecting to Small WebRTC") - await self._webrtcConnection.connect() + await self._webrtc_connection.connect() async def disconnect(self): if self.is_connected and not self.is_closing: logger.info(f"Disconnecting to Small WebRTC") self._closing = True - await self._webrtcConnection.close() + await self._webrtc_connection.close() await self._handle_client_disconnected() async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame): if self._can_send(): - self._webrtcConnection.send_app_message(frame.message) + self._webrtc_connection.send_app_message(frame.message) async def _handle_client_connected(self): # There is nothing to do here yet, the pipeline is still not ready if not self._params: return - self._audio_input_track = self._webrtcConnection.audio_input_track() - self._video_input_track = self._webrtcConnection.video_input_track() + self._audio_input_track = self._webrtc_connection.audio_input_track() + self._video_input_track = self._webrtc_connection.video_input_track() if self._params.audio_out_enabled: self._audio_output_track = RawAudioTrack(sample_rate=self._out_sample_rate) - self._webrtcConnection.replace_audio_track(self._audio_output_track) + self._webrtc_connection.replace_audio_track(self._audio_output_track) if self._params.camera_out_enabled: self._video_output_track = RawVideoTrack( width=self._params.camera_out_width, height=self._params.camera_out_height ) - self._webrtcConnection.replace_video_track(self._video_output_track) + self._webrtc_connection.replace_video_track(self._video_output_track) - await self._callbacks.on_client_connected(self._webrtcConnection) + await self._callbacks.on_client_connected(self._webrtc_connection) async def _handle_client_disconnected(self): self._audio_input_track = None self._video_input_track = None self._audio_output_track = None self._video_output_track = None - await self._callbacks.on_client_disconnected(self._webrtcConnection) + await self._callbacks.on_client_disconnected(self._webrtc_connection) async def _handle_client_closed(self): self._audio_input_track = None self._video_input_track = None self._audio_output_track = None self._video_output_track = None - await self._callbacks.on_client_closed(self._webrtcConnection) + await self._callbacks.on_client_closed(self._webrtc_connection) async def _handle_app_message(self, message: Any): await self._callbacks.on_app_message(message) @@ -344,7 +346,7 @@ class SmallWebRTCClient: @property def is_connected(self) -> bool: - return self._webrtcConnection.is_connected() + return self._webrtc_connection.is_connected() @property def is_closing(self) -> bool: @@ -412,7 +414,7 @@ class SmallWebRTCInputTransport(BaseInputTransport): logger.error(f"{self} exception receiving data: {e.__class__.__name__} ({e})") async def push_app_message(self, message: Any): - logger.info(f"Received app message inside SmallWebRTCInputTransport {message}") + logger.debug(f"Received app message inside SmallWebRTCInputTransport {message}") frame = TransportMessageUrgentFrame(message=message) await self.push_frame(frame) diff --git a/src/pipecat/transports/network/webrtc_connection.py b/src/pipecat/transports/network/webrtc_connection.py index e18879add..671ff6622 100644 --- a/src/pipecat/transports/network/webrtc_connection.py +++ b/src/pipecat/transports/network/webrtc_connection.py @@ -1,14 +1,25 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + import asyncio import json import time -import uuid from enum import Enum from typing import Any, Optional -from aiortc import RTCConfiguration, RTCIceServer, RTCPeerConnection, RTCSessionDescription from loguru import logger -from pipecat.utils.event_emitter import EventEmitter +from pipecat.utils.base_object import BaseObject + +try: + from aiortc import RTCConfiguration, RTCIceServer, RTCPeerConnection, RTCSessionDescription +except ModuleNotFoundError as e: + logger.error(f"Exception: {e}") + logger.error("In order to use the SmallWebRTC, you need to `pip install pipecat-ai[webrtc]`.") + raise Exception(f"Missing module: {e}") SIGNALLING_TYPE = "signalling" @@ -17,7 +28,7 @@ class SignallingMessage(Enum): RENEGOTIATE = "renegotiate" -class SmallWebRTCConnection(EventEmitter): +class SmallWebRTCConnection(BaseObject): def __init__(self, ice_servers=None): super().__init__() if ice_servers: @@ -27,13 +38,34 @@ class SmallWebRTCConnection(EventEmitter): self._connect_invoked = False self._initialize() + # Register supported handlers. The user will only be able to register + # these handlers. + self._register_event_handler("app-message") + self._register_event_handler("track-started") + self._register_event_handler("track-ended") + # connection states + self._register_event_handler("connecting") + self._register_event_handler("connected") + self._register_event_handler("disconnected") + self._register_event_handler("closed") + self._register_event_handler("failed") + self._register_event_handler("new") + + @property + def pc(self) -> RTCPeerConnection: + return self._pc + + @property + def pc_id(self) -> str: + return self._pc_id + def _initialize(self): - logger.info("Initializing new peer connection") + logger.debug("Initializing new peer connection") rtc_config = RTCConfiguration(iceServers=self.ice_servers) - self.answer: Optional[RTCSessionDescription] = None - self.pc = RTCPeerConnection(rtc_config) - self.pc_id = "PeerConnection(%s)" % uuid.uuid4() + self._answer: Optional[RTCSessionDescription] = None + self._pc = RTCPeerConnection(rtc_config) + self._pc_id = self.name self._setup_listeners() self._tracks = set() self._data_channel = None @@ -41,7 +73,7 @@ class SmallWebRTCConnection(EventEmitter): self._last_received_time = None def _setup_listeners(self): - @self.pc.on("datachannel") + @self._pc.on("datachannel") def on_datachannel(channel): self._data_channel = channel @@ -54,54 +86,54 @@ class SmallWebRTCConnection(EventEmitter): self._last_received_time = time.time() else: json_message = json.loads(message) - await self.emit("appMessage", json_message) + await self._call_event_handler("app-message", json_message) except Exception as e: logger.exception(f"Error parsing JSON message {message}, {e}") # Despite the fact that aiortc provides this listener, they don't have a status for "disconnected" # So, in case we loose connection, this event will not be triggered - @self.pc.on("connectionstatechange") + @self._pc.on("connectionstatechange") async def on_connectionstatechange(): await self._handle_new_connection_state() # Despite the fact that aiortc provides this listener, they don't have a status for "disconnected" # So, in case we loose connection, this event will not be triggered - @self.pc.on("iceconnectionstatechange") + @self._pc.on("iceconnectionstatechange") async def on_iceconnectionstatechange(): - logger.info( - f"Ice connection state is {self.pc.iceConnectionState}, connection is {self.pc.connectionState}" + logger.debug( + f"ICE connection state is {self._pc.iceConnectionState}, connection is {self._pc.connectionState}" ) - @self.pc.on("icegatheringstatechange") + @self._pc.on("icegatheringstatechange") async def on_icegatheringstatechange(): - logger.info(f"Ice gathering state is {self.pc.iceGatheringState}") + logger.debug(f"ICE gathering state is {self._pc.iceGatheringState}") - @self.pc.on("track") + @self._pc.on("track") async def on_track(track): - logger.info(f"Track {track.kind} received") + logger.debug(f"Track {track.kind} received") self._tracks.add(track) - await self.emit("track-started", track) + await self._call_event_handler("track-started", track) @track.on("ended") async def on_ended(): - logger.info(f"Track {track.kind} ended") + logger.debug(f"Track {track.kind} ended") self._tracks.discard(track) - await self.emit("track-ended", track) + await self._call_event_handler("track-ended", track) async def _create_answer(self, sdp: str, type: str): offer = RTCSessionDescription(sdp=sdp, type=type) - await self.pc.setRemoteDescription(offer) + await self._pc.setRemoteDescription(offer) # For some reason, aiortc is not respecting the SDP for the transceivers to be sendrcv # so we are basically forcing it to act this way self.force_transceivers_to_send_recv() # this answer does not contain the ice candidates, which will be gathered later, after the setLocalDescription - logger.info(f"Creating answer") - local_answer = await self.pc.createAnswer() - await self.pc.setLocalDescription(local_answer) - logger.info(f"Setting the answer after the local description is created") - self.answer = self.pc.localDescription + logger.debug(f"Creating answer") + local_answer = await self._pc.createAnswer() + await self._pc.setLocalDescription(local_answer) + logger.debug(f"Setting the answer after the local description is created") + self._answer = self._pc.localDescription async def initialize(self, sdp: str, type: str): await self._create_answer(sdp, type) @@ -110,19 +142,19 @@ class SmallWebRTCConnection(EventEmitter): self._connect_invoked = True # If we already connected, trigger again the connected event if self.is_connected(): - await self.emit("connected", self) + await self._call_event_handler("connected") # We are renegotiating here, because likely we have loose the first video frames # and aiortc does not handle that pretty well. self.ask_to_renegotiate() async def renegotiate(self, sdp: str, type: str, restart_pc: bool = False): - logger.info(f"Renegotiating {self.pc_id}") + logger.debug(f"Renegotiating {self._pc_id}") if restart_pc: - await self.emit("disconnected", self) - logger.info("Closing old peer connection") + await self._call_event_handler("disconnected") + logger.debug("Closing old peer connection") # removing the listeners to prevent the bot from closing - self.pc.remove_all_listeners() + self._pc.remove_all_listeners() await self.close() # we are initializing a new peer connection in this case. self._initialize() @@ -139,59 +171,59 @@ class SmallWebRTCConnection(EventEmitter): asyncio.create_task(delayed_task()) def force_transceivers_to_send_recv(self): - for transceiver in self.pc.getTransceivers(): + for transceiver in self._pc.getTransceivers(): transceiver.direction = "sendrecv" - # logger.info( + # logger.debug( # f"Transceiver: {transceiver}, Mid: {transceiver.mid}, Direction: {transceiver.direction}" # ) - # logger.info(f"Sender track: {transceiver.sender.track}") + # logger.debug(f"Sender track: {transceiver.sender.track}") def replace_audio_track(self, track): - logger.info(f"Replacing audio track {track.kind}") + logger.debug(f"Replacing audio track {track.kind}") # Transceivers always appear in creation-order for both peers # For now we are only considering that we are going to have 02 transceivers, # one for audio and one for video - transceivers = self.pc.getTransceivers() + transceivers = self._pc.getTransceivers() if len(transceivers) > 0 and transceivers[0].sender: transceivers[0].sender.replaceTrack(track) else: logger.warning("Audio transceiver not found. Cannot replace audio track.") def replace_video_track(self, track): - logger.info(f"Replacing video track {track.kind}") + logger.debug(f"Replacing video track {track.kind}") # Transceivers always appear in creation-order for both peers # For now we are only considering that we are going to have 02 transceivers, # one for audio and one for video - transceivers = self.pc.getTransceivers() + transceivers = self._pc.getTransceivers() if len(transceivers) > 1 and transceivers[1].sender: transceivers[1].sender.replaceTrack(track) else: logger.warning("Video transceiver not found. Cannot replace video track.") async def close(self): - if self.pc: - await self.pc.close() + if self._pc: + await self._pc.close() def get_answer(self): - if not self.answer: + if not self._answer: return None return { - "sdp": self.answer.sdp, - "type": self.answer.type, - "pc_id": self.pc_id, + "sdp": self._answer.sdp, + "type": self._answer.type, + "pc_id": self._pc_id, } async def _handle_new_connection_state(self): - state = self.pc.connectionState - logger.info(f"Connection state changed to: {state}") - await self.emit(state, self) + state = self._pc.connectionState + logger.debug(f"Connection state changed to: {state}") + await self._call_event_handler(state) if state == "failed": logger.warning("Connection failed, closing peer connection.") await self.close() # Despite the fact that aiortc provides this listener, they don't have a status for "disconnected" - # So, there is no advantage in looking at self.pc.connectionState + # So, there is no advantage in looking at self._pc.connectionState # That is why we are trying to keep our own state def is_connected(self): # If the small webrtc transport has never invoked to connect @@ -202,7 +234,7 @@ class SmallWebRTCConnection(EventEmitter): if self._last_received_time is None: # if we have never received a message, it is probably because the client has not created a data channel # so we are going to trust aiortc in this case - return self.pc.connectionState == "connected" + return self._pc.connectionState == "connected" # Checks if the last received ping was within the last 3 seconds. return (time.time() - self._last_received_time) < 3 @@ -210,7 +242,7 @@ class SmallWebRTCConnection(EventEmitter): # Transceivers always appear in creation-order for both peers # For now we are only considering that we are going to have 02 transceivers, # one for audio and one for video - transceivers = self.pc.getTransceivers() + transceivers = self._pc.getTransceivers() if len(transceivers) == 0 or not transceivers[0].receiver: logger.warning("No audio transceiver is available") return None @@ -221,7 +253,7 @@ class SmallWebRTCConnection(EventEmitter): # Transceivers always appear in creation-order for both peers # For now we are only considering that we are going to have 02 transceivers, # one for audio and one for video - transceivers = self.pc.getTransceivers() + transceivers = self._pc.getTransceivers() if len(transceivers) <= 1 or not transceivers[1].receiver: logger.warning("No video transceiver is available") return None diff --git a/src/pipecat/transports/network/websocket_client.py b/src/pipecat/transports/network/websocket_client.py index 11a000e69..bf6670883 100644 --- a/src/pipecat/transports/network/websocket_client.py +++ b/src/pipecat/transports/network/websocket_client.py @@ -4,7 +4,6 @@ # SPDX-License-Identifier: BSD 2-Clause License # - import asyncio import io import time diff --git a/src/pipecat/transports/network/websocket_server.py b/src/pipecat/transports/network/websocket_server.py index e542342a2..8947cbab3 100644 --- a/src/pipecat/transports/network/websocket_server.py +++ b/src/pipecat/transports/network/websocket_server.py @@ -6,7 +6,6 @@ import asyncio import io -import json import time import wave from typing import Awaitable, Callable, Optional diff --git a/src/pipecat/utils/event_emitter.py b/src/pipecat/utils/event_emitter.py deleted file mode 100644 index dcffd2fd1..000000000 --- a/src/pipecat/utils/event_emitter.py +++ /dev/null @@ -1,20 +0,0 @@ -class EventEmitter: - def __init__(self): - self._events = {} - - def on(self, event_name): - """Decorator to register an event handler.""" - - def decorator(func): - if event_name not in self._events: - self._events[event_name] = [] - self._events[event_name].append(func) - return func - - return decorator - - async def emit(self, event_name, *args, **kwargs): - """Trigger all handlers for a given event.""" - if event_name in self._events: - for handler in self._events[event_name]: - await handler(*args, **kwargs)