Merge pull request #2383 from pipecat-ai/aleix/riva-stt-iterator-exception

properly handle concurrent.futures.CancelledError
This commit is contained in:
Mark Backman
2025-08-07 04:39:56 -07:00
committed by GitHub
3 changed files with 24 additions and 7 deletions

View File

@@ -81,6 +81,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fixed a `DailyTransport` issue that would result in an unhandled
`concurrent.futures.CancelledError` when a future is cancelled.
- Fixed a `RivaSTTService` issue that would result in an unhandled
`concurrent.futures.CancelledError` when a future is cancelled when reading
from the audio chunks from the incoming audio stream.
- Fixed an issue in the `BaseOutputTransport`, mainly reproducible with
`FastAPIWebsocketOutputTransport` when the audio mixer was enabled, where the
loop could consume 100% CPU by continuously returning without delay, preventing

View File

@@ -7,6 +7,7 @@
"""NVIDIA Riva Speech-to-Text service implementations for real-time and batch transcription."""
import asyncio
from concurrent.futures import CancelledError as FuturesCancelledError
from typing import AsyncGenerator, List, Mapping, Optional
from loguru import logger
@@ -167,7 +168,7 @@ class RivaSTTService(STTService):
self._asr_service = riva.client.ASRService(auth)
self._queue = asyncio.Queue()
self._queue = None
self._config = None
self._thread_task = None
self._response_task = None
@@ -238,6 +239,7 @@ class RivaSTTService(STTService):
riva.client.add_custom_configuration_to_config(config, self._custom_configuration)
self._config = config
self._queue = WatchdogQueue(self.task_manager)
if not self._thread_task:
self._thread_task = self.create_task(self._thread_task_handler())
@@ -366,8 +368,12 @@ class RivaSTTService(STTService):
"""
if not self._thread_running:
raise StopIteration
future = asyncio.run_coroutine_threadsafe(self._queue.get(), self.get_event_loop())
return future.result()
try:
future = asyncio.run_coroutine_threadsafe(self._queue.get(), self.get_event_loop())
return future.result()
except FuturesCancelledError:
raise StopIteration
def __iter__(self):
"""Return iterator for audio chunk processing.

View File

@@ -13,6 +13,7 @@ real-time communication features.
import asyncio
import time
from concurrent.futures import CancelledError as FuturesCancelledError
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, Mapping, Optional
@@ -1320,10 +1321,13 @@ class DailyTransportClient(EventHandler):
def _call_async_callback(self, queue: asyncio.Queue, callback, *args):
"""Queue a callback for async execution on the event loop."""
future = asyncio.run_coroutine_threadsafe(
queue.put((callback, *args)), self._get_event_loop()
)
future.result()
try:
future = asyncio.run_coroutine_threadsafe(
queue.put((callback, *args)), self._get_event_loop()
)
future.result()
except FuturesCancelledError:
pass
async def _callback_task_handler(self, queue: asyncio.Queue):
"""Handle queued callbacks from the specified queue."""