diff --git a/CHANGELOG.md b/CHANGELOG.md index 7331a709b..e937eaf05 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,6 +47,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Fixed a memory leak in `SmallWebRTCTransport`. In `aiortc`, when you receive + a `MediaStreamTrack` (audio or video), frames are produced asynchronously. If + the code never consumes these frames, they are queued in memory, causing a + memory leak. + - Fixed an issue in `AsyncAITTSService`, where `TTSTextFrames` were not being pushed. diff --git a/pyproject.toml b/pyproject.toml index a855ce4a3..c26b7744c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -106,7 +106,7 @@ tavus=[] together = [] tracing = [ "opentelemetry-sdk>=1.33.0", "opentelemetry-api>=1.33.0", "opentelemetry-instrumentation>=0.54b0" ] ultravox = [ "transformers>=4.48.0", "vllm>=0.9.0" ] -webrtc = [ "aiortc~=1.11.0", "opencv-python~=4.11.0.86" ] +webrtc = [ "aiortc~=1.13.0", "opencv-python~=4.11.0.86" ] websocket = [ "pipecat-ai[websockets-base]", "fastapi>=0.115.6,<0.117.0" ] websockets-base = [ "websockets>=13.1,<16.0" ] whisper = [ "faster-whisper~=1.1.1" ] diff --git a/scripts/mem-watch.sh b/scripts/mem-watch.sh new file mode 100755 index 000000000..0b87dd906 --- /dev/null +++ b/scripts/mem-watch.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +PID=$1 + +while true; do + # Clear the screen + clear + # Print the header + RSS in GB + ps -p "$PID" -o pid,comm,rss | \ + awk 'NR==1 {print $0, "rss_GB"} NR>1 {printf "%s %s %s %.2f\n", $1,$2,$3,$3/1024/1024}' + sleep 1 +done \ No newline at end of file diff --git a/src/pipecat/transports/smallwebrtc/connection.py b/src/pipecat/transports/smallwebrtc/connection.py index 420656f85..decd8ca58 100644 --- a/src/pipecat/transports/smallwebrtc/connection.py +++ b/src/pipecat/transports/smallwebrtc/connection.py @@ -95,15 +95,20 @@ class SmallWebRTCTrack: enable/disable control and frame discarding for audio and video streams. """ - def __init__(self, track: MediaStreamTrack): + def __init__(self, receiver): """Initialize the WebRTC track wrapper. Args: - track: The underlying MediaStreamTrack to wrap. - index: The index of the track in the transceiver (0 for mic, 1 for cam, 2 for screen) + receiver: The RemoteStreamTrack receiver instance. """ - self._track = track + self._receiver = receiver + # Configuring the receiver for not consuming the track by default to prevent memory grow + self._receiver._enabled = False + self._track = receiver.track self._enabled = True + self._last_recv_time: float = 0.0 + self._idle_task: Optional[asyncio.Task] = None + self._idle_timeout: float = 2.0 # seconds before discarding old frames def set_enabled(self, enabled: bool) -> None: """Enable or disable the track. @@ -138,13 +143,44 @@ class SmallWebRTCTrack: async def recv(self) -> Optional[Frame]: """Receive the next frame from the track. + Enables the internal receiving state and starts idle watcher. + Returns: The next frame, except for video tracks, where it returns the frame only if the track is enabled, otherwise, returns None. """ + self._receiver._enabled = True + self._last_recv_time = time.time() + + # start idle watcher if not already running + if not self._idle_task or self._idle_task.done(): + self._idle_task = asyncio.create_task(self._idle_watcher()) + if not self._enabled and self._track.kind == "video": return None return await self._track.recv() + async def _idle_watcher(self): + """Disable receiving if idle for more than _idle_timeout and monitor queue size.""" + while self._receiver._enabled: + await asyncio.sleep(self._idle_timeout) + idle_duration = time.time() - self._last_recv_time + if idle_duration >= self._idle_timeout: + # discard old frames to prevent memory growth + logger.debug( + f"Disabling receiver for {self._track.kind} track after {idle_duration:.2f}s idle" + ) + await self.discard_old_frames() + self._receiver._enabled = False + + def stop(self): + """Stop receiving frames from the track.""" + self._receiver._enabled = False + if self._idle_task: + self._idle_task.cancel() + self._idle_task = None + if self._track: + self._track.stop() + def __getattr__(self, name): """Forward attribute access to the underlying track. @@ -454,6 +490,10 @@ class SmallWebRTCConnection(BaseObject): async def _close(self): """Close the peer connection and cleanup resources.""" + for track in self._track_map.values(): + if track: + track.stop() + self._track_map.clear() if self._pc: await self._pc.close() self._message_queue.clear() @@ -526,8 +566,8 @@ class SmallWebRTCConnection(BaseObject): logger.warning("No audio transceiver is available") return None - track = transceivers[AUDIO_TRANSCEIVER_INDEX].receiver.track - audio_track = SmallWebRTCTrack(track) if track else None + receiver = transceivers[AUDIO_TRANSCEIVER_INDEX].receiver + audio_track = SmallWebRTCTrack(receiver) if receiver else None self._track_map[AUDIO_TRANSCEIVER_INDEX] = audio_track return audio_track @@ -548,8 +588,8 @@ class SmallWebRTCConnection(BaseObject): logger.warning("No video transceiver is available") return None - track = transceivers[VIDEO_TRANSCEIVER_INDEX].receiver.track - video_track = SmallWebRTCTrack(track) if track else None + receiver = transceivers[VIDEO_TRANSCEIVER_INDEX].receiver + video_track = SmallWebRTCTrack(receiver) if receiver else None self._track_map[VIDEO_TRANSCEIVER_INDEX] = video_track return video_track @@ -570,8 +610,8 @@ class SmallWebRTCConnection(BaseObject): logger.warning("No screen video transceiver is available") return None - track = transceivers[SCREEN_VIDEO_TRANSCEIVER_INDEX].receiver.track - video_track = SmallWebRTCTrack(track) if track else None + receiver = transceivers[SCREEN_VIDEO_TRANSCEIVER_INDEX].receiver + video_track = SmallWebRTCTrack(receiver) if receiver else None self._track_map[SCREEN_VIDEO_TRANSCEIVER_INDEX] = video_track return video_track diff --git a/uv.lock b/uv.lock index c2f732ad3..63381d8a7 100644 --- a/uv.lock +++ b/uv.lock @@ -215,7 +215,7 @@ wheels = [ [[package]] name = "aiortc" -version = "1.11.0" +version = "1.13.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "aioice" }, @@ -227,15 +227,9 @@ dependencies = [ { name = "pylibsrtp" }, { name = "pyopenssl" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/91/60/7bb59c28c6e65e5d74258d392f531f555f12ab519b0f467ffd6b76650c20/aiortc-1.11.0.tar.gz", hash = "sha256:50b9d86f6cba87d95ce7c6b051949208b48f8062b231837aed8f049045f11a28", size = 1179206, upload-time = "2025-03-28T10:00:50.327Z" } +sdist = { url = "https://files.pythonhosted.org/packages/62/03/bc947d74c548e0c17cf94e5d5bdacaed0ee9e5b2bb7b8b8cf1ac7a7c01ec/aiortc-1.13.0.tar.gz", hash = "sha256:5d209975c22d0910fb5a0f0e2caa828f2da966c53580f7c7170ac3a16a871620", size = 1179894, upload-time = "2025-05-27T03:23:59.017Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/17/34/5c34707ce58ca0fd3b157a3b478255a8445950bf2b87f048864eb7233f5f/aiortc-1.11.0-cp39-abi3-macosx_10_9_x86_64.whl", hash = "sha256:018b0d623c6b88b9cd4bd3b700dece943731d081c50fef1b866a43f6b46a7343", size = 1218501, upload-time = "2025-03-28T10:00:39.44Z" }, - { url = "https://files.pythonhosted.org/packages/1b/d7/cc1d483097f2ae605e07e9f7af004c473da5756af25149823de2047eb991/aiortc-1.11.0-cp39-abi3-macosx_11_0_arm64.whl", hash = "sha256:0bdd6477ac9227e9fd80ca079d6614b5b0b45c1887f214e67cddc7fde2692d95", size = 898901, upload-time = "2025-03-28T10:00:41.709Z" }, - { url = "https://files.pythonhosted.org/packages/00/64/caf7e7b3c49d492ba79256638644812d66ca68dcfa8e27307fd58f564555/aiortc-1.11.0-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:bc311672d25091061eaa9c3fe1adbb7f2ef677c6fabd2cffdff8c724c1f81ce7", size = 1750429, upload-time = "2025-03-28T10:00:43.802Z" }, - { url = "https://files.pythonhosted.org/packages/11/12/3e37c16de90ead788e45bfe10fe6fea66711919d2bf3826f663779824de0/aiortc-1.11.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f57c5804135d357291f25de65faf7a844d7595c6eb12493e0a304f4d5c34d660", size = 1867914, upload-time = "2025-03-28T10:00:45.049Z" }, - { url = "https://files.pythonhosted.org/packages/aa/a9/f0a32b3966e8bc8cf4faea558b6e40171eacfc04b14e8b077bebc6ec57e3/aiortc-1.11.0-cp39-abi3-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:43ff9f5c2a5d657fbb4ab8c9b4e4c9d2967753e03c4539eb1dd82014816ef6a0", size = 1893742, upload-time = "2025-03-28T10:00:46.393Z" }, - { url = "https://files.pythonhosted.org/packages/a5/c5/57f997af08ceca5e78a5f23e4cb93445236eff39af0c9940495ae7069de4/aiortc-1.11.0-cp39-abi3-win32.whl", hash = "sha256:5e10a50ca6df3abc32811e1c84fe131b7d20d3e5349f521ca430683ca9a96c70", size = 923160, upload-time = "2025-03-28T10:00:47.578Z" }, - { url = "https://files.pythonhosted.org/packages/b2/ce/7f969694b950f673d7bf5ec697608366bd585ff741760e107e3eff55b131/aiortc-1.11.0-cp39-abi3-win_amd64.whl", hash = "sha256:67debf5ce89fb12c64b4be24e70809b29f1bb0e635914760d0c2e1193955ff62", size = 1009541, upload-time = "2025-03-28T10:00:49.09Z" }, + { url = "https://files.pythonhosted.org/packages/87/29/765633cab5f1888890f5f172d1d53009b9b14e079cdfa01a62d9896a9ea9/aiortc-1.13.0-py3-none-any.whl", hash = "sha256:9ccccec98796f6a96bd1c3dd437a06da7e0f57521c96bd56e4b965a91b03a0a0", size = 92910, upload-time = "2025-05-27T03:23:57.344Z" }, ] [[package]] @@ -3160,12 +3154,12 @@ name = "mlx-lm" version = "0.27.1" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "jinja2" }, - { name = "mlx" }, - { name = "numpy" }, - { name = "protobuf" }, - { name = "pyyaml" }, - { name = "transformers" }, + { name = "jinja2", marker = "sys_platform == 'darwin'" }, + { name = "mlx", marker = "sys_platform == 'darwin'" }, + { name = "numpy", marker = "sys_platform == 'darwin'" }, + { name = "protobuf", marker = "sys_platform == 'darwin'" }, + { name = "pyyaml", marker = "sys_platform == 'darwin'" }, + { name = "transformers", marker = "sys_platform == 'darwin'" }, ] sdist = { url = "https://files.pythonhosted.org/packages/41/77/e8d3a82658a2070bc392a583dd08c8d24088433e920eac4905bf882255ad/mlx_lm-0.27.1.tar.gz", hash = "sha256:36640fb64c909cfd9baddf37b16e7d3b94a1a141033e6b7ea7a0ef5a965fb4ae", size = 185170, upload-time = "2025-09-04T16:06:57.949Z" } wheels = [ @@ -3658,7 +3652,7 @@ name = "nvidia-cudnn-cu12" version = "9.5.1.17" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "nvidia-cublas-cu12" }, + { name = "nvidia-cublas-cu12", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" }, ] wheels = [ { url = "https://files.pythonhosted.org/packages/2a/78/4535c9c7f859a64781e43c969a3a7e84c54634e319a996d43ef32ce46f83/nvidia_cudnn_cu12-9.5.1.17-py3-none-manylinux_2_28_x86_64.whl", hash = "sha256:30ac3869f6db17d170e0e556dd6cc5eee02647abc31ca856634d5a40f82c15b2", size = 570988386, upload-time = "2024-10-25T19:54:26.39Z" }, @@ -3669,7 +3663,7 @@ name = "nvidia-cufft-cu12" version = "11.3.0.4" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "nvidia-nvjitlink-cu12" }, + { name = "nvidia-nvjitlink-cu12", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" }, ] wheels = [ { url = "https://files.pythonhosted.org/packages/8f/16/73727675941ab8e6ffd86ca3a4b7b47065edcca7a997920b831f8147c99d/nvidia_cufft_cu12-11.3.0.4-py3-none-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:ccba62eb9cef5559abd5e0d54ceed2d9934030f51163df018532142a8ec533e5", size = 200221632, upload-time = "2024-11-20T17:41:32.357Z" }, @@ -3698,9 +3692,9 @@ name = "nvidia-cusolver-cu12" version = "11.7.1.2" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "nvidia-cublas-cu12" }, - { name = "nvidia-cusparse-cu12" }, - { name = "nvidia-nvjitlink-cu12" }, + { name = "nvidia-cublas-cu12", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" }, + { name = "nvidia-cusparse-cu12", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" }, + { name = "nvidia-nvjitlink-cu12", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" }, ] wheels = [ { url = "https://files.pythonhosted.org/packages/f0/6e/c2cf12c9ff8b872e92b4a5740701e51ff17689c4d726fca91875b07f655d/nvidia_cusolver_cu12-11.7.1.2-py3-none-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:e9e49843a7707e42022babb9bcfa33c29857a93b88020c4e4434656a655b698c", size = 158229790, upload-time = "2024-11-20T17:43:43.211Z" }, @@ -3712,7 +3706,7 @@ name = "nvidia-cusparse-cu12" version = "12.5.4.2" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "nvidia-nvjitlink-cu12" }, + { name = "nvidia-nvjitlink-cu12", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" }, ] wheels = [ { url = "https://files.pythonhosted.org/packages/06/1e/b8b7c2f4099a37b96af5c9bb158632ea9e5d9d27d7391d7eb8fc45236674/nvidia_cusparse_cu12-12.5.4.2-py3-none-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:7556d9eca156e18184b94947ade0fba5bb47d69cec46bf8660fd2c71a4b48b73", size = 216561367, upload-time = "2024-11-20T17:44:54.824Z" }, @@ -4471,7 +4465,7 @@ requires-dist = [ { name = "aioboto3", marker = "extra == 'aws'", specifier = "~=15.0.0" }, { name = "aiofiles", specifier = ">=24.1.0,<25" }, { name = "aiohttp", specifier = ">=3.11.12,<4" }, - { name = "aiortc", marker = "extra == 'webrtc'", specifier = "~=1.11.0" }, + { name = "aiortc", marker = "extra == 'webrtc'", specifier = "~=1.13.0" }, { name = "anthropic", marker = "extra == 'anthropic'", specifier = "~=0.49.0" }, { name = "audioop-lts", marker = "python_full_version >= '3.13'", specifier = "~=0.2.1" }, { name = "aws-sdk-bedrock-runtime", marker = "python_full_version >= '3.12' and extra == 'aws-nova-sonic'", specifier = "~=0.0.2" }, @@ -7169,7 +7163,7 @@ name = "triton" version = "3.3.0" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "setuptools" }, + { name = "setuptools", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" }, ] wheels = [ { url = "https://files.pythonhosted.org/packages/76/04/d54d3a6d077c646624dc9461b0059e23fd5d30e0dbe67471e3654aec81f9/triton-3.3.0-cp310-cp310-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:fad99beafc860501d7fcc1fb7045d9496cbe2c882b1674640304949165a916e7", size = 156441993, upload-time = "2025-04-09T20:27:25.107Z" }, @@ -7722,8 +7716,8 @@ name = "xformers" version = "0.0.30" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "numpy" }, - { name = "torch" }, + { name = "numpy", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" }, + { name = "torch", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" }, ] sdist = { url = "https://files.pythonhosted.org/packages/bf/f7/dd2269cce89fd1221947dd7cc3a60707ffe721ef55c1803ac3b1a1f7ae5c/xformers-0.0.30.tar.gz", hash = "sha256:a12bf3eb39e294cdbe8a7253ac9b665f41bac61d6d98df174e34ef7bdb6f2fc4", size = 10214139, upload-time = "2025-04-28T20:51:02.045Z" } wheels = [