From 434d346079e470d8b2fde4e4e54fd34c44199561 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 6 Aug 2025 22:56:08 -0700 Subject: [PATCH 1/3] RivaSTTService: handle future cancellation --- CHANGELOG.md | 4 ++++ src/pipecat/services/riva/stt.py | 9 +++++++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 46a4082eb..aa7f46d3e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -81,6 +81,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Fixed a `RivaSTTService` issue that would result in an unhandled + `concurrent.futures.CancelledError` when a future is cancelled when reading + from the audio chunks from the incoming audio stream. + - Fixed an issue in the `BaseOutputTransport`, mainly reproducible with `FastAPIWebsocketOutputTransport` when the audio mixer was enabled, where the loop could consume 100% CPU by continuously returning without delay, preventing diff --git a/src/pipecat/services/riva/stt.py b/src/pipecat/services/riva/stt.py index 7d9e8743d..ba1af4f6f 100644 --- a/src/pipecat/services/riva/stt.py +++ b/src/pipecat/services/riva/stt.py @@ -7,6 +7,7 @@ """NVIDIA Riva Speech-to-Text service implementations for real-time and batch transcription.""" import asyncio +from concurrent.futures import CancelledError as FuturesCancelledError from typing import AsyncGenerator, List, Mapping, Optional from loguru import logger @@ -366,8 +367,12 @@ class RivaSTTService(STTService): """ if not self._thread_running: raise StopIteration - future = asyncio.run_coroutine_threadsafe(self._queue.get(), self.get_event_loop()) - return future.result() + + try: + future = asyncio.run_coroutine_threadsafe(self._queue.get(), self.get_event_loop()) + return future.result() + except FuturesCancelledError: + raise StopIteration def __iter__(self): """Return iterator for audio chunk processing. From c97643c79757a0fd7a1ac5f7c2b1d10832919b75 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 6 Aug 2025 23:00:03 -0700 Subject: [PATCH 2/3] RivaSTTService: always use WatchdogQueue --- src/pipecat/services/riva/stt.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/pipecat/services/riva/stt.py b/src/pipecat/services/riva/stt.py index ba1af4f6f..be90e8732 100644 --- a/src/pipecat/services/riva/stt.py +++ b/src/pipecat/services/riva/stt.py @@ -168,7 +168,7 @@ class RivaSTTService(STTService): self._asr_service = riva.client.ASRService(auth) - self._queue = asyncio.Queue() + self._queue = None self._config = None self._thread_task = None self._response_task = None @@ -239,6 +239,7 @@ class RivaSTTService(STTService): riva.client.add_custom_configuration_to_config(config, self._custom_configuration) self._config = config + self._queue = WatchdogQueue(self.task_manager) if not self._thread_task: self._thread_task = self.create_task(self._thread_task_handler()) From 86c6141580f26666f59c7674edc17d38b0f645b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 6 Aug 2025 23:01:41 -0700 Subject: [PATCH 3/3] DailyTransport: handle future cancellation --- CHANGELOG.md | 3 +++ src/pipecat/transports/services/daily.py | 12 ++++++++---- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index aa7f46d3e..f54ba1ab9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -81,6 +81,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Fixed a `DailyTransport` issue that would result in an unhandled + `concurrent.futures.CancelledError` when a future is cancelled. + - Fixed a `RivaSTTService` issue that would result in an unhandled `concurrent.futures.CancelledError` when a future is cancelled when reading from the audio chunks from the incoming audio stream. diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index 1ad443cb5..daf879be7 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -13,6 +13,7 @@ real-time communication features. import asyncio import time +from concurrent.futures import CancelledError as FuturesCancelledError from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from typing import Any, Awaitable, Callable, Dict, Mapping, Optional @@ -1320,10 +1321,13 @@ class DailyTransportClient(EventHandler): def _call_async_callback(self, queue: asyncio.Queue, callback, *args): """Queue a callback for async execution on the event loop.""" - future = asyncio.run_coroutine_threadsafe( - queue.put((callback, *args)), self._get_event_loop() - ) - future.result() + try: + future = asyncio.run_coroutine_threadsafe( + queue.put((callback, *args)), self._get_event_loop() + ) + future.result() + except FuturesCancelledError: + pass async def _callback_task_handler(self, queue: asyncio.Queue): """Handle queued callbacks from the specified queue."""