From 434d346079e470d8b2fde4e4e54fd34c44199561 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 6 Aug 2025 22:56:08 -0700 Subject: [PATCH] RivaSTTService: handle future cancellation --- CHANGELOG.md | 4 ++++ src/pipecat/services/riva/stt.py | 9 +++++++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 46a4082eb..aa7f46d3e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -81,6 +81,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Fixed a `RivaSTTService` issue that would result in an unhandled + `concurrent.futures.CancelledError` when a future is cancelled when reading + from the audio chunks from the incoming audio stream. + - Fixed an issue in the `BaseOutputTransport`, mainly reproducible with `FastAPIWebsocketOutputTransport` when the audio mixer was enabled, where the loop could consume 100% CPU by continuously returning without delay, preventing diff --git a/src/pipecat/services/riva/stt.py b/src/pipecat/services/riva/stt.py index 7d9e8743d..ba1af4f6f 100644 --- a/src/pipecat/services/riva/stt.py +++ b/src/pipecat/services/riva/stt.py @@ -7,6 +7,7 @@ """NVIDIA Riva Speech-to-Text service implementations for real-time and batch transcription.""" import asyncio +from concurrent.futures import CancelledError as FuturesCancelledError from typing import AsyncGenerator, List, Mapping, Optional from loguru import logger @@ -366,8 +367,12 @@ class RivaSTTService(STTService): """ if not self._thread_running: raise StopIteration - future = asyncio.run_coroutine_threadsafe(self._queue.get(), self.get_event_loop()) - return future.result() + + try: + future = asyncio.run_coroutine_threadsafe(self._queue.get(), self.get_event_loop()) + return future.result() + except FuturesCancelledError: + raise StopIteration def __iter__(self): """Return iterator for audio chunk processing.