Merge pull request #2682 from pipecat-ai/filipi/smallwebrtc_leak

Smallwebrtc memory leak
This commit is contained in:
Filipi da Silva Fuchter
2025-09-18 18:56:08 -03:00
committed by GitHub
5 changed files with 87 additions and 36 deletions

View File

@@ -47,6 +47,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fixed a memory leak in `SmallWebRTCTransport`. In `aiortc`, when you receive
a `MediaStreamTrack` (audio or video), frames are produced asynchronously. If
the code never consumes these frames, they are queued in memory, causing a
memory leak.
- Fixed an issue in `AsyncAITTSService`, where `TTSTextFrames` were not being
pushed.

View File

@@ -106,7 +106,7 @@ tavus=[]
together = []
tracing = [ "opentelemetry-sdk>=1.33.0", "opentelemetry-api>=1.33.0", "opentelemetry-instrumentation>=0.54b0" ]
ultravox = [ "transformers>=4.48.0", "vllm>=0.9.0" ]
webrtc = [ "aiortc~=1.11.0", "opencv-python~=4.11.0.86" ]
webrtc = [ "aiortc~=1.13.0", "opencv-python~=4.11.0.86" ]
websocket = [ "pipecat-ai[websockets-base]", "fastapi>=0.115.6,<0.117.0" ]
websockets-base = [ "websockets>=13.1,<16.0" ]
whisper = [ "faster-whisper~=1.1.1" ]

12
scripts/mem-watch.sh Executable file
View File

@@ -0,0 +1,12 @@
#!/bin/bash
PID=$1
while true; do
# Clear the screen
clear
# Print the header + RSS in GB
ps -p "$PID" -o pid,comm,rss | \
awk 'NR==1 {print $0, "rss_GB"} NR>1 {printf "%s %s %s %.2f\n", $1,$2,$3,$3/1024/1024}'
sleep 1
done

View File

@@ -95,15 +95,20 @@ class SmallWebRTCTrack:
enable/disable control and frame discarding for audio and video streams.
"""
def __init__(self, track: MediaStreamTrack):
def __init__(self, receiver):
"""Initialize the WebRTC track wrapper.
Args:
track: The underlying MediaStreamTrack to wrap.
index: The index of the track in the transceiver (0 for mic, 1 for cam, 2 for screen)
receiver: The RemoteStreamTrack receiver instance.
"""
self._track = track
self._receiver = receiver
# Configuring the receiver for not consuming the track by default to prevent memory grow
self._receiver._enabled = False
self._track = receiver.track
self._enabled = True
self._last_recv_time: float = 0.0
self._idle_task: Optional[asyncio.Task] = None
self._idle_timeout: float = 2.0 # seconds before discarding old frames
def set_enabled(self, enabled: bool) -> None:
"""Enable or disable the track.
@@ -138,13 +143,44 @@ class SmallWebRTCTrack:
async def recv(self) -> Optional[Frame]:
"""Receive the next frame from the track.
Enables the internal receiving state and starts idle watcher.
Returns:
The next frame, except for video tracks, where it returns the frame only if the track is enabled, otherwise, returns None.
"""
self._receiver._enabled = True
self._last_recv_time = time.time()
# start idle watcher if not already running
if not self._idle_task or self._idle_task.done():
self._idle_task = asyncio.create_task(self._idle_watcher())
if not self._enabled and self._track.kind == "video":
return None
return await self._track.recv()
async def _idle_watcher(self):
"""Disable receiving if idle for more than _idle_timeout and monitor queue size."""
while self._receiver._enabled:
await asyncio.sleep(self._idle_timeout)
idle_duration = time.time() - self._last_recv_time
if idle_duration >= self._idle_timeout:
# discard old frames to prevent memory growth
logger.debug(
f"Disabling receiver for {self._track.kind} track after {idle_duration:.2f}s idle"
)
await self.discard_old_frames()
self._receiver._enabled = False
def stop(self):
"""Stop receiving frames from the track."""
self._receiver._enabled = False
if self._idle_task:
self._idle_task.cancel()
self._idle_task = None
if self._track:
self._track.stop()
def __getattr__(self, name):
"""Forward attribute access to the underlying track.
@@ -454,6 +490,10 @@ class SmallWebRTCConnection(BaseObject):
async def _close(self):
"""Close the peer connection and cleanup resources."""
for track in self._track_map.values():
if track:
track.stop()
self._track_map.clear()
if self._pc:
await self._pc.close()
self._message_queue.clear()
@@ -526,8 +566,8 @@ class SmallWebRTCConnection(BaseObject):
logger.warning("No audio transceiver is available")
return None
track = transceivers[AUDIO_TRANSCEIVER_INDEX].receiver.track
audio_track = SmallWebRTCTrack(track) if track else None
receiver = transceivers[AUDIO_TRANSCEIVER_INDEX].receiver
audio_track = SmallWebRTCTrack(receiver) if receiver else None
self._track_map[AUDIO_TRANSCEIVER_INDEX] = audio_track
return audio_track
@@ -548,8 +588,8 @@ class SmallWebRTCConnection(BaseObject):
logger.warning("No video transceiver is available")
return None
track = transceivers[VIDEO_TRANSCEIVER_INDEX].receiver.track
video_track = SmallWebRTCTrack(track) if track else None
receiver = transceivers[VIDEO_TRANSCEIVER_INDEX].receiver
video_track = SmallWebRTCTrack(receiver) if receiver else None
self._track_map[VIDEO_TRANSCEIVER_INDEX] = video_track
return video_track
@@ -570,8 +610,8 @@ class SmallWebRTCConnection(BaseObject):
logger.warning("No screen video transceiver is available")
return None
track = transceivers[SCREEN_VIDEO_TRANSCEIVER_INDEX].receiver.track
video_track = SmallWebRTCTrack(track) if track else None
receiver = transceivers[SCREEN_VIDEO_TRANSCEIVER_INDEX].receiver
video_track = SmallWebRTCTrack(receiver) if receiver else None
self._track_map[SCREEN_VIDEO_TRANSCEIVER_INDEX] = video_track
return video_track

44
uv.lock generated
View File

@@ -215,7 +215,7 @@ wheels = [
[[package]]
name = "aiortc"
version = "1.11.0"
version = "1.13.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "aioice" },
@@ -227,15 +227,9 @@ dependencies = [
{ name = "pylibsrtp" },
{ name = "pyopenssl" },
]
sdist = { url = "https://files.pythonhosted.org/packages/91/60/7bb59c28c6e65e5d74258d392f531f555f12ab519b0f467ffd6b76650c20/aiortc-1.11.0.tar.gz", hash = "sha256:50b9d86f6cba87d95ce7c6b051949208b48f8062b231837aed8f049045f11a28", size = 1179206, upload-time = "2025-03-28T10:00:50.327Z" }
sdist = { url = "https://files.pythonhosted.org/packages/62/03/bc947d74c548e0c17cf94e5d5bdacaed0ee9e5b2bb7b8b8cf1ac7a7c01ec/aiortc-1.13.0.tar.gz", hash = "sha256:5d209975c22d0910fb5a0f0e2caa828f2da966c53580f7c7170ac3a16a871620", size = 1179894, upload-time = "2025-05-27T03:23:59.017Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/17/34/5c34707ce58ca0fd3b157a3b478255a8445950bf2b87f048864eb7233f5f/aiortc-1.11.0-cp39-abi3-macosx_10_9_x86_64.whl", hash = "sha256:018b0d623c6b88b9cd4bd3b700dece943731d081c50fef1b866a43f6b46a7343", size = 1218501, upload-time = "2025-03-28T10:00:39.44Z" },
{ url = "https://files.pythonhosted.org/packages/1b/d7/cc1d483097f2ae605e07e9f7af004c473da5756af25149823de2047eb991/aiortc-1.11.0-cp39-abi3-macosx_11_0_arm64.whl", hash = "sha256:0bdd6477ac9227e9fd80ca079d6614b5b0b45c1887f214e67cddc7fde2692d95", size = 898901, upload-time = "2025-03-28T10:00:41.709Z" },
{ url = "https://files.pythonhosted.org/packages/00/64/caf7e7b3c49d492ba79256638644812d66ca68dcfa8e27307fd58f564555/aiortc-1.11.0-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:bc311672d25091061eaa9c3fe1adbb7f2ef677c6fabd2cffdff8c724c1f81ce7", size = 1750429, upload-time = "2025-03-28T10:00:43.802Z" },
{ url = "https://files.pythonhosted.org/packages/11/12/3e37c16de90ead788e45bfe10fe6fea66711919d2bf3826f663779824de0/aiortc-1.11.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f57c5804135d357291f25de65faf7a844d7595c6eb12493e0a304f4d5c34d660", size = 1867914, upload-time = "2025-03-28T10:00:45.049Z" },
{ url = "https://files.pythonhosted.org/packages/aa/a9/f0a32b3966e8bc8cf4faea558b6e40171eacfc04b14e8b077bebc6ec57e3/aiortc-1.11.0-cp39-abi3-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:43ff9f5c2a5d657fbb4ab8c9b4e4c9d2967753e03c4539eb1dd82014816ef6a0", size = 1893742, upload-time = "2025-03-28T10:00:46.393Z" },
{ url = "https://files.pythonhosted.org/packages/a5/c5/57f997af08ceca5e78a5f23e4cb93445236eff39af0c9940495ae7069de4/aiortc-1.11.0-cp39-abi3-win32.whl", hash = "sha256:5e10a50ca6df3abc32811e1c84fe131b7d20d3e5349f521ca430683ca9a96c70", size = 923160, upload-time = "2025-03-28T10:00:47.578Z" },
{ url = "https://files.pythonhosted.org/packages/b2/ce/7f969694b950f673d7bf5ec697608366bd585ff741760e107e3eff55b131/aiortc-1.11.0-cp39-abi3-win_amd64.whl", hash = "sha256:67debf5ce89fb12c64b4be24e70809b29f1bb0e635914760d0c2e1193955ff62", size = 1009541, upload-time = "2025-03-28T10:00:49.09Z" },
{ url = "https://files.pythonhosted.org/packages/87/29/765633cab5f1888890f5f172d1d53009b9b14e079cdfa01a62d9896a9ea9/aiortc-1.13.0-py3-none-any.whl", hash = "sha256:9ccccec98796f6a96bd1c3dd437a06da7e0f57521c96bd56e4b965a91b03a0a0", size = 92910, upload-time = "2025-05-27T03:23:57.344Z" },
]
[[package]]
@@ -3160,12 +3154,12 @@ name = "mlx-lm"
version = "0.27.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "jinja2" },
{ name = "mlx" },
{ name = "numpy" },
{ name = "protobuf" },
{ name = "pyyaml" },
{ name = "transformers" },
{ name = "jinja2", marker = "sys_platform == 'darwin'" },
{ name = "mlx", marker = "sys_platform == 'darwin'" },
{ name = "numpy", marker = "sys_platform == 'darwin'" },
{ name = "protobuf", marker = "sys_platform == 'darwin'" },
{ name = "pyyaml", marker = "sys_platform == 'darwin'" },
{ name = "transformers", marker = "sys_platform == 'darwin'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/41/77/e8d3a82658a2070bc392a583dd08c8d24088433e920eac4905bf882255ad/mlx_lm-0.27.1.tar.gz", hash = "sha256:36640fb64c909cfd9baddf37b16e7d3b94a1a141033e6b7ea7a0ef5a965fb4ae", size = 185170, upload-time = "2025-09-04T16:06:57.949Z" }
wheels = [
@@ -3658,7 +3652,7 @@ name = "nvidia-cudnn-cu12"
version = "9.5.1.17"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "nvidia-cublas-cu12" },
{ name = "nvidia-cublas-cu12", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" },
]
wheels = [
{ url = "https://files.pythonhosted.org/packages/2a/78/4535c9c7f859a64781e43c969a3a7e84c54634e319a996d43ef32ce46f83/nvidia_cudnn_cu12-9.5.1.17-py3-none-manylinux_2_28_x86_64.whl", hash = "sha256:30ac3869f6db17d170e0e556dd6cc5eee02647abc31ca856634d5a40f82c15b2", size = 570988386, upload-time = "2024-10-25T19:54:26.39Z" },
@@ -3669,7 +3663,7 @@ name = "nvidia-cufft-cu12"
version = "11.3.0.4"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "nvidia-nvjitlink-cu12" },
{ name = "nvidia-nvjitlink-cu12", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" },
]
wheels = [
{ url = "https://files.pythonhosted.org/packages/8f/16/73727675941ab8e6ffd86ca3a4b7b47065edcca7a997920b831f8147c99d/nvidia_cufft_cu12-11.3.0.4-py3-none-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:ccba62eb9cef5559abd5e0d54ceed2d9934030f51163df018532142a8ec533e5", size = 200221632, upload-time = "2024-11-20T17:41:32.357Z" },
@@ -3698,9 +3692,9 @@ name = "nvidia-cusolver-cu12"
version = "11.7.1.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "nvidia-cublas-cu12" },
{ name = "nvidia-cusparse-cu12" },
{ name = "nvidia-nvjitlink-cu12" },
{ name = "nvidia-cublas-cu12", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" },
{ name = "nvidia-cusparse-cu12", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" },
{ name = "nvidia-nvjitlink-cu12", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" },
]
wheels = [
{ url = "https://files.pythonhosted.org/packages/f0/6e/c2cf12c9ff8b872e92b4a5740701e51ff17689c4d726fca91875b07f655d/nvidia_cusolver_cu12-11.7.1.2-py3-none-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:e9e49843a7707e42022babb9bcfa33c29857a93b88020c4e4434656a655b698c", size = 158229790, upload-time = "2024-11-20T17:43:43.211Z" },
@@ -3712,7 +3706,7 @@ name = "nvidia-cusparse-cu12"
version = "12.5.4.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "nvidia-nvjitlink-cu12" },
{ name = "nvidia-nvjitlink-cu12", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" },
]
wheels = [
{ url = "https://files.pythonhosted.org/packages/06/1e/b8b7c2f4099a37b96af5c9bb158632ea9e5d9d27d7391d7eb8fc45236674/nvidia_cusparse_cu12-12.5.4.2-py3-none-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:7556d9eca156e18184b94947ade0fba5bb47d69cec46bf8660fd2c71a4b48b73", size = 216561367, upload-time = "2024-11-20T17:44:54.824Z" },
@@ -4471,7 +4465,7 @@ requires-dist = [
{ name = "aioboto3", marker = "extra == 'aws'", specifier = "~=15.0.0" },
{ name = "aiofiles", specifier = ">=24.1.0,<25" },
{ name = "aiohttp", specifier = ">=3.11.12,<4" },
{ name = "aiortc", marker = "extra == 'webrtc'", specifier = "~=1.11.0" },
{ name = "aiortc", marker = "extra == 'webrtc'", specifier = "~=1.13.0" },
{ name = "anthropic", marker = "extra == 'anthropic'", specifier = "~=0.49.0" },
{ name = "audioop-lts", marker = "python_full_version >= '3.13'", specifier = "~=0.2.1" },
{ name = "aws-sdk-bedrock-runtime", marker = "python_full_version >= '3.12' and extra == 'aws-nova-sonic'", specifier = "~=0.0.2" },
@@ -7169,7 +7163,7 @@ name = "triton"
version = "3.3.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "setuptools" },
{ name = "setuptools", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" },
]
wheels = [
{ url = "https://files.pythonhosted.org/packages/76/04/d54d3a6d077c646624dc9461b0059e23fd5d30e0dbe67471e3654aec81f9/triton-3.3.0-cp310-cp310-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:fad99beafc860501d7fcc1fb7045d9496cbe2c882b1674640304949165a916e7", size = 156441993, upload-time = "2025-04-09T20:27:25.107Z" },
@@ -7722,8 +7716,8 @@ name = "xformers"
version = "0.0.30"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "numpy" },
{ name = "torch" },
{ name = "numpy", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" },
{ name = "torch", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" },
]
sdist = { url = "https://files.pythonhosted.org/packages/bf/f7/dd2269cce89fd1221947dd7cc3a60707ffe721ef55c1803ac3b1a1f7ae5c/xformers-0.0.30.tar.gz", hash = "sha256:a12bf3eb39e294cdbe8a7253ac9b665f41bac61d6d98df174e34ef7bdb6f2fc4", size = 10214139, upload-time = "2025-04-28T20:51:02.045Z" }
wheels = [