diff --git a/CHANGELOG.md b/CHANGELOG.md index 5bcb83279..c90e90984 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -81,6 +81,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Fixed a `DailyTransport` issue that would result in an unhandled + `concurrent.futures.CancelledError` when a future is cancelled. + +- Fixed a `RivaSTTService` issue that would result in an unhandled + `concurrent.futures.CancelledError` when a future is cancelled when reading + from the audio chunks from the incoming audio stream. + - Fixed an issue in the `BaseOutputTransport`, mainly reproducible with `FastAPIWebsocketOutputTransport` when the audio mixer was enabled, where the loop could consume 100% CPU by continuously returning without delay, preventing diff --git a/src/pipecat/services/riva/stt.py b/src/pipecat/services/riva/stt.py index 7d9e8743d..be90e8732 100644 --- a/src/pipecat/services/riva/stt.py +++ b/src/pipecat/services/riva/stt.py @@ -7,6 +7,7 @@ """NVIDIA Riva Speech-to-Text service implementations for real-time and batch transcription.""" import asyncio +from concurrent.futures import CancelledError as FuturesCancelledError from typing import AsyncGenerator, List, Mapping, Optional from loguru import logger @@ -167,7 +168,7 @@ class RivaSTTService(STTService): self._asr_service = riva.client.ASRService(auth) - self._queue = asyncio.Queue() + self._queue = None self._config = None self._thread_task = None self._response_task = None @@ -238,6 +239,7 @@ class RivaSTTService(STTService): riva.client.add_custom_configuration_to_config(config, self._custom_configuration) self._config = config + self._queue = WatchdogQueue(self.task_manager) if not self._thread_task: self._thread_task = self.create_task(self._thread_task_handler()) @@ -366,8 +368,12 @@ class RivaSTTService(STTService): """ if not self._thread_running: raise StopIteration - future = asyncio.run_coroutine_threadsafe(self._queue.get(), self.get_event_loop()) - return future.result() + + try: + future = asyncio.run_coroutine_threadsafe(self._queue.get(), self.get_event_loop()) + return future.result() + except FuturesCancelledError: + raise StopIteration def __iter__(self): """Return iterator for audio chunk processing. diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index 1ad443cb5..daf879be7 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -13,6 +13,7 @@ real-time communication features. import asyncio import time +from concurrent.futures import CancelledError as FuturesCancelledError from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from typing import Any, Awaitable, Callable, Dict, Mapping, Optional @@ -1320,10 +1321,13 @@ class DailyTransportClient(EventHandler): def _call_async_callback(self, queue: asyncio.Queue, callback, *args): """Queue a callback for async execution on the event loop.""" - future = asyncio.run_coroutine_threadsafe( - queue.put((callback, *args)), self._get_event_loop() - ) - future.result() + try: + future = asyncio.run_coroutine_threadsafe( + queue.put((callback, *args)), self._get_event_loop() + ) + future.result() + except FuturesCancelledError: + pass async def _callback_task_handler(self, queue: asyncio.Queue): """Handle queued callbacks from the specified queue."""