diff --git a/CHANGELOG.md b/CHANGELOG.md index 6f979a6f8..8e13653b4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,9 +9,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- Added `pipecat.utils.timeout.wait_for()` which uses `wait_for2` package to - avoid `asyncio.wait_for()` issues in Python < 3.12. - - Allow passing custom pipeline sink and source processors to a `Pipeline`. Pipeline source and sink processors are used to know and control what's coming in and out of a `Pipeline` processor. @@ -49,9 +46,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed -- Replaced `asyncio.wait_for()` for Pipecat's `wait_for()`. In Python 3.10, - `asyncio.wait_for()` has issues regarding task cancellation (i.e. cancellation - is never propagated). +- Replaced `asyncio.wait_for()` for `wait_for2.wait_for()` for Python < + 3.12. because of issues regarding task cancellation (i.e. cancellation is + never propagated). See https://bugs.python.org/issue42130 - Fixed an `AudioBufferProcessor` issues that would cause audio overlap when diff --git a/scripts/evals/eval.py b/scripts/evals/eval.py index b78b98bca..b91f27c6e 100644 --- a/scripts/evals/eval.py +++ b/scripts/evals/eval.py @@ -43,7 +43,6 @@ from pipecat.services.deepgram.stt import DeepgramSTTService from pipecat.services.llm_service import FunctionCallParams from pipecat.services.openai.llm import OpenAILLMService from pipecat.transports.services.daily import DailyParams, DailyTransport -from pipecat.utils.asyncio.timeout import wait_for SCRIPT_DIR = Path(__file__).resolve().parent @@ -123,7 +122,7 @@ class EvalRunner: logger.error(f"ERROR: Unable to run {example_file}: {e}") try: - result = await wait_for(self._queue.get(), timeout=1.0) + result = await asyncio.wait_for(self._queue.get(), timeout=1.0) except asyncio.TimeoutError: result = False diff --git a/src/pipecat/__init__.py b/src/pipecat/__init__.py index de89130cc..1975e1654 100644 --- a/src/pipecat/__init__.py +++ b/src/pipecat/__init__.py @@ -12,3 +12,20 @@ from loguru import logger __version__ = version("pipecat-ai") logger.info(f"ᓚᘏᗢ Pipecat {__version__} (Python {sys.version}) ᓚᘏᗢ") + +# We replace `asyncio.wait_for()` for `wait_for2.wait_for()` for Python < 3.12. +# +# In Python 3.12, `asyncio.wait_for()` is implemented in terms of +# `asyncio.timeout()` which fixed a bunch of issues. However, this was never +# backported (because of the lack of `async.timeout()`) and there are still many +# remainig issues, specially in Python 3.10, in `async.wait_for()`. +# +# See https://github.com/python/cpython/pull/98518 + +import asyncio + +if sys.version_info < (3, 12): + import wait_for2 + + # Replace asyncio.wait_for. + asyncio.wait_for = wait_for2.wait_for diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index 92cda3039..7ff924531 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -51,7 +51,6 @@ from pipecat.utils.asyncio.task_manager import ( TaskManager, TaskManagerParams, ) -from pipecat.utils.asyncio.timeout import wait_for from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue from pipecat.utils.tracing.setup import is_tracing_available from pipecat.utils.tracing.turn_trace_observer import TurnTraceObserver @@ -655,7 +654,7 @@ class PipelineTask(BasePipelineTask): wait_time = HEARTBEAT_MONITOR_SECONDS while True: try: - frame = await wait_for(self._heartbeat_queue.get(), timeout=wait_time) + frame = await asyncio.wait_for(self._heartbeat_queue.get(), timeout=wait_time) process_time = (self._clock.get_time() - frame.timestamp) / 1_000_000_000 logger.trace(f"{self}: heartbeat frame processed in {process_time} seconds") self._heartbeat_queue.task_done() @@ -678,7 +677,9 @@ class PipelineTask(BasePipelineTask): while running: try: - frame = await wait_for(self._idle_queue.get(), timeout=self._idle_timeout_secs) + frame = await asyncio.wait_for( + self._idle_queue.get(), timeout=self._idle_timeout_secs + ) if not isinstance(frame, InputAudioRawFrame): frame_buffer.append(frame) diff --git a/src/pipecat/processors/aggregators/dtmf_aggregator.py b/src/pipecat/processors/aggregators/dtmf_aggregator.py index 3676824fd..75cded7fb 100644 --- a/src/pipecat/processors/aggregators/dtmf_aggregator.py +++ b/src/pipecat/processors/aggregators/dtmf_aggregator.py @@ -25,7 +25,6 @@ from pipecat.frames.frames import ( TranscriptionFrame, ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup -from pipecat.utils.asyncio.timeout import wait_for from pipecat.utils.asyncio.watchdog_event import WatchdogEvent from pipecat.utils.time import time_now_iso8601 @@ -135,7 +134,7 @@ class DTMFAggregator(FrameProcessor): """Background task that handles timeout-based flushing.""" while True: try: - await wait_for(self._digit_event.wait(), timeout=self._idle_timeout) + await asyncio.wait_for(self._digit_event.wait(), timeout=self._idle_timeout) self._digit_event.clear() except asyncio.TimeoutError: self.reset_watchdog() diff --git a/src/pipecat/processors/aggregators/llm_response.py b/src/pipecat/processors/aggregators/llm_response.py index 1d251d495..a5b746467 100644 --- a/src/pipecat/processors/aggregators/llm_response.py +++ b/src/pipecat/processors/aggregators/llm_response.py @@ -60,7 +60,6 @@ from pipecat.processors.aggregators.openai_llm_context import ( OpenAILLMContextFrame, ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor -from pipecat.utils.asyncio.timeout import wait_for from pipecat.utils.time import time_now_iso8601 @@ -671,7 +670,7 @@ class LLMUserContextAggregator(LLMContextResponseAggregator): if self._vad_params else self._params.turn_emulated_vad_timeout ) - await wait_for(self._aggregation_event.wait(), timeout=timeout) + await asyncio.wait_for(self._aggregation_event.wait(), timeout=timeout) await self._maybe_emulate_user_speaking() except asyncio.TimeoutError: if not self._user_speaking: diff --git a/src/pipecat/processors/idle_frame_processor.py b/src/pipecat/processors/idle_frame_processor.py index 5357b2f64..b8839124a 100644 --- a/src/pipecat/processors/idle_frame_processor.py +++ b/src/pipecat/processors/idle_frame_processor.py @@ -11,7 +11,6 @@ from typing import Awaitable, Callable, List, Optional from pipecat.frames.frames import Frame, StartFrame from pipecat.processors.frame_processor import FrameDirection, FrameProcessor -from pipecat.utils.asyncio.timeout import wait_for from pipecat.utils.asyncio.watchdog_event import WatchdogEvent @@ -86,7 +85,7 @@ class IdleFrameProcessor(FrameProcessor): """Handle idle timeout monitoring and callback execution.""" while True: try: - await wait_for(self._idle_event.wait(), timeout=self._timeout) + await asyncio.wait_for(self._idle_event.wait(), timeout=self._timeout) except asyncio.TimeoutError: await self._callback(self) finally: diff --git a/src/pipecat/processors/user_idle_processor.py b/src/pipecat/processors/user_idle_processor.py index a38e3f17f..5f6b25b95 100644 --- a/src/pipecat/processors/user_idle_processor.py +++ b/src/pipecat/processors/user_idle_processor.py @@ -22,7 +22,6 @@ from pipecat.frames.frames import ( UserStoppedSpeakingFrame, ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor -from pipecat.utils.asyncio.timeout import wait_for from pipecat.utils.asyncio.watchdog_event import WatchdogEvent @@ -192,7 +191,7 @@ class UserIdleProcessor(FrameProcessor): """ while True: try: - await wait_for(self._idle_event.wait(), timeout=self._timeout) + await asyncio.wait_for(self._idle_event.wait(), timeout=self._timeout) except asyncio.TimeoutError: if not self._interrupted: self._retry_count += 1 diff --git a/src/pipecat/services/anthropic/llm.py b/src/pipecat/services/anthropic/llm.py index 93d39e265..e351ecab2 100644 --- a/src/pipecat/services/anthropic/llm.py +++ b/src/pipecat/services/anthropic/llm.py @@ -53,7 +53,6 @@ from pipecat.processors.aggregators.openai_llm_context import ( ) from pipecat.processors.frame_processor import FrameDirection from pipecat.services.llm_service import FunctionCallFromLLM, LLMService -from pipecat.utils.asyncio.timeout import wait_for from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator from pipecat.utils.tracing.service_decorators import traced_llm @@ -186,7 +185,9 @@ class AnthropicLLMService(LLMService): """ if self._retry_on_timeout: try: - response = await wait_for(api_call(**params), timeout=self._retry_timeout_secs) + response = await asyncio.wait_for( + api_call(**params), timeout=self._retry_timeout_secs + ) return response except (APITimeoutError, asyncio.TimeoutError): # Retry, this time without a timeout so we get a response diff --git a/src/pipecat/services/assemblyai/stt.py b/src/pipecat/services/assemblyai/stt.py index c999a3eed..f25a41661 100644 --- a/src/pipecat/services/assemblyai/stt.py +++ b/src/pipecat/services/assemblyai/stt.py @@ -31,7 +31,6 @@ from pipecat.frames.frames import ( from pipecat.processors.frame_processor import FrameDirection from pipecat.services.stt_service import STTService from pipecat.transcriptions.language import Language -from pipecat.utils.asyncio.timeout import wait_for from pipecat.utils.time import time_now_iso8601 from pipecat.utils.tracing.service_decorators import traced_stt @@ -220,7 +219,7 @@ class AssemblyAISTTService(STTService): await self._websocket.send(json.dumps({"type": "Terminate"})) try: - await wait_for(self._termination_event.wait(), timeout=5.0) + await asyncio.wait_for(self._termination_event.wait(), timeout=5.0) except asyncio.TimeoutError: logger.warning("Timed out waiting for termination message from server") @@ -245,7 +244,7 @@ class AssemblyAISTTService(STTService): try: while self._connected: try: - message = await wait_for(self._websocket.recv(), timeout=1.0) + message = await asyncio.wait_for(self._websocket.recv(), timeout=1.0) data = json.loads(message) await self._handle_message(data) except asyncio.TimeoutError: diff --git a/src/pipecat/services/aws/llm.py b/src/pipecat/services/aws/llm.py index cdf144d26..c935c141d 100644 --- a/src/pipecat/services/aws/llm.py +++ b/src/pipecat/services/aws/llm.py @@ -52,7 +52,6 @@ from pipecat.processors.aggregators.openai_llm_context import ( ) from pipecat.processors.frame_processor import FrameDirection from pipecat.services.llm_service import LLMService -from pipecat.utils.asyncio.timeout import wait_for from pipecat.utils.tracing.service_decorators import traced_llm try: @@ -802,7 +801,7 @@ class AWSBedrockLLMService(LLMService): """ if self._retry_on_timeout: try: - response = await wait_for( + response = await asyncio.wait_for( await client.converse_stream(**request_params), timeout=self._retry_timeout_secs ) return response diff --git a/src/pipecat/services/aws/stt.py b/src/pipecat/services/aws/stt.py index 752866efa..f67f0979f 100644 --- a/src/pipecat/services/aws/stt.py +++ b/src/pipecat/services/aws/stt.py @@ -31,7 +31,6 @@ from pipecat.frames.frames import ( from pipecat.services.aws.utils import build_event_message, decode_event, get_presigned_url from pipecat.services.stt_service import STTService from pipecat.transcriptions.language import Language -from pipecat.utils.asyncio.timeout import wait_for from pipecat.utils.time import time_now_iso8601 from pipecat.utils.tracing.service_decorators import traced_stt @@ -481,7 +480,7 @@ class AWSTranscribeSTTService(STTService): break try: - response = await wait_for(self._ws_client.recv(), timeout=1.0) + response = await asyncio.wait_for(self._ws_client.recv(), timeout=1.0) headers, payload = decode_event(response) diff --git a/src/pipecat/services/heygen/client.py b/src/pipecat/services/heygen/client.py index a91f5e675..2a464f949 100644 --- a/src/pipecat/services/heygen/client.py +++ b/src/pipecat/services/heygen/client.py @@ -31,7 +31,6 @@ from pipecat.processors.frame_processor import FrameProcessorSetup from pipecat.services.heygen.api import HeyGenApi, HeyGenSession, NewSessionRequest from pipecat.transports.base_transport import TransportParams from pipecat.utils.asyncio.task_manager import BaseTaskManager -from pipecat.utils.asyncio.timeout import wait_for from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue try: @@ -232,7 +231,7 @@ class HeyGenClient: """Handle incoming WebSocket messages.""" while self._connected: try: - message = await wait_for(self._websocket.recv(), timeout=1.0) + message = await asyncio.wait_for(self._websocket.recv(), timeout=1.0) parsed_message = json.loads(message) await self._handle_ws_server_event(parsed_message) except asyncio.TimeoutError: diff --git a/src/pipecat/services/heygen/video.py b/src/pipecat/services/heygen/video.py index f46ec7d76..96c684641 100644 --- a/src/pipecat/services/heygen/video.py +++ b/src/pipecat/services/heygen/video.py @@ -40,7 +40,6 @@ from pipecat.services.ai_service import AIService from pipecat.services.heygen.api import NewSessionRequest from pipecat.services.heygen.client import HEY_GEN_SAMPLE_RATE, HeyGenCallbacks, HeyGenClient from pipecat.transports.base_transport import TransportParams -from pipecat.utils.asyncio.timeout import wait_for from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue # Using the same values that we do in the BaseOutputTransport @@ -321,7 +320,7 @@ class HeyGenVideoService(AIService): while True: try: - frame = await wait_for(self._queue.get(), timeout=AVATAR_VAD_STOP_SECS) + frame = await asyncio.wait_for(self._queue.get(), timeout=AVATAR_VAD_STOP_SECS) if self._is_interrupting: break if isinstance(frame, TTSAudioRawFrame): diff --git a/src/pipecat/services/openai/base_llm.py b/src/pipecat/services/openai/base_llm.py index fba80f238..ecde66eec 100644 --- a/src/pipecat/services/openai/base_llm.py +++ b/src/pipecat/services/openai/base_llm.py @@ -39,7 +39,6 @@ from pipecat.processors.aggregators.openai_llm_context import ( ) from pipecat.processors.frame_processor import FrameDirection from pipecat.services.llm_service import FunctionCallFromLLM, LLMService -from pipecat.utils.asyncio.timeout import wait_for from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator from pipecat.utils.tracing.service_decorators import traced_llm @@ -197,7 +196,7 @@ class BaseOpenAILLMService(LLMService): if self._retry_on_timeout: try: - chunks = await wait_for( + chunks = await asyncio.wait_for( self._client.chat.completions.create(**params), timeout=self._retry_timeout_secs ) return chunks diff --git a/src/pipecat/services/riva/tts.py b/src/pipecat/services/riva/tts.py index a9f5f77d8..1889c19dd 100644 --- a/src/pipecat/services/riva/tts.py +++ b/src/pipecat/services/riva/tts.py @@ -14,7 +14,6 @@ import asyncio import os from typing import AsyncGenerator, Mapping, Optional -from pipecat.utils.asyncio.timeout import wait_for from pipecat.utils.tracing.service_decorators import traced_tts # Suppress gRPC fork warnings @@ -169,7 +168,7 @@ class RivaTTSService(TTSService): await asyncio.to_thread(read_audio_responses, queue) # Wait for the thread to start. - resp = await wait_for(queue.get(), timeout=RIVA_TTS_TIMEOUT_SECS) + resp = await asyncio.wait_for(queue.get(), timeout=RIVA_TTS_TIMEOUT_SECS) while resp: await self.stop_ttfb_metrics() frame = TTSAudioRawFrame( @@ -178,7 +177,7 @@ class RivaTTSService(TTSService): num_channels=1, ) yield frame - resp = await wait_for(queue.get(), timeout=RIVA_TTS_TIMEOUT_SECS) + resp = await asyncio.wait_for(queue.get(), timeout=RIVA_TTS_TIMEOUT_SECS) except asyncio.TimeoutError: logger.error(f"{self} timeout waiting for audio response") diff --git a/src/pipecat/services/speechmatics/stt.py b/src/pipecat/services/speechmatics/stt.py index 523c2b89d..59145742e 100644 --- a/src/pipecat/services/speechmatics/stt.py +++ b/src/pipecat/services/speechmatics/stt.py @@ -33,7 +33,6 @@ from pipecat.frames.frames import ( from pipecat.processors.frame_processor import FrameDirection from pipecat.services.stt_service import STTService from pipecat.transcriptions.language import Language -from pipecat.utils.asyncio.timeout import wait_for from pipecat.utils.tracing.service_decorators import traced_stt try: @@ -577,7 +576,7 @@ class SpeechmaticsSTTService(STTService): # Disconnect the client try: if self._client: - await wait_for(self._client.close(), timeout=1.0) + await asyncio.wait_for(self._client.close(), timeout=1.0) except asyncio.TimeoutError: logger.warning("Timeout while closing Speechmatics client connection") except Exception as e: diff --git a/src/pipecat/services/tts_service.py b/src/pipecat/services/tts_service.py index 9e0bb61f6..208abaef9 100644 --- a/src/pipecat/services/tts_service.py +++ b/src/pipecat/services/tts_service.py @@ -37,7 +37,6 @@ from pipecat.processors.frame_processor import FrameDirection from pipecat.services.ai_service import AIService from pipecat.services.websocket_service import WebsocketService from pipecat.transcriptions.language import Language -from pipecat.utils.asyncio.timeout import wait_for from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue from pipecat.utils.text.base_text_aggregator import BaseTextAggregator from pipecat.utils.text.base_text_filter import BaseTextFilter @@ -428,7 +427,7 @@ class TTSService(AIService): has_started = False while True: try: - frame = await wait_for( + frame = await asyncio.wait_for( self._stop_frame_queue.get(), timeout=self._stop_frame_timeout_s ) if isinstance(frame, TTSStartedFrame): @@ -859,7 +858,7 @@ class AudioContextWordTTSService(WebsocketWordTTSService): running = True while running: try: - frame = await wait_for(queue.get(), timeout=AUDIO_CONTEXT_TIMEOUT) + frame = await asyncio.wait_for(queue.get(), timeout=AUDIO_CONTEXT_TIMEOUT) self.reset_watchdog() if frame: await self.push_frame(frame) diff --git a/src/pipecat/transports/base_input.py b/src/pipecat/transports/base_input.py index dfa0e1c2f..6891bef00 100644 --- a/src/pipecat/transports/base_input.py +++ b/src/pipecat/transports/base_input.py @@ -49,7 +49,6 @@ from pipecat.frames.frames import ( from pipecat.metrics.metrics import MetricsData from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.transports.base_transport import TransportParams -from pipecat.utils.asyncio.timeout import wait_for from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue AUDIO_INPUT_TIMEOUT_SECS = 0.5 @@ -476,7 +475,7 @@ class BaseInputTransport(FrameProcessor): vad_state: VADState = VADState.QUIET while True: try: - frame: InputAudioRawFrame = await wait_for( + frame: InputAudioRawFrame = await asyncio.wait_for( self._audio_in_queue.get(), timeout=AUDIO_INPUT_TIMEOUT_SECS ) diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index 5975a9be7..afb5abb0b 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -46,7 +46,6 @@ from pipecat.frames.frames import ( ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.transports.base_transport import TransportParams -from pipecat.utils.asyncio.timeout import wait_for from pipecat.utils.asyncio.watchdog_priority_queue import WatchdogPriorityQueue from pipecat.utils.time import nanoseconds_to_seconds @@ -624,7 +623,9 @@ class BaseOutputTransport(FrameProcessor): async def without_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]: while True: try: - frame = await wait_for(self._audio_queue.get(), timeout=vad_stop_secs) + frame = await asyncio.wait_for( + self._audio_queue.get(), timeout=vad_stop_secs + ) self._transport.reset_watchdog() yield frame except asyncio.TimeoutError: diff --git a/src/pipecat/transports/network/small_webrtc.py b/src/pipecat/transports/network/small_webrtc.py index 5d0429982..ddb8339c9 100644 --- a/src/pipecat/transports/network/small_webrtc.py +++ b/src/pipecat/transports/network/small_webrtc.py @@ -40,7 +40,6 @@ from pipecat.transports.base_input import BaseInputTransport from pipecat.transports.base_output import BaseOutputTransport from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection -from pipecat.utils.asyncio.timeout import wait_for from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator try: @@ -290,7 +289,7 @@ class SmallWebRTCClient: continue try: - frame = await wait_for(self._video_input_track.recv(), timeout=2.0) + frame = await asyncio.wait_for(self._video_input_track.recv(), timeout=2.0) except asyncio.TimeoutError: if self._webrtc_connection.is_connected(): logger.warning("Timeout: No video frame received within the specified time.") @@ -333,7 +332,7 @@ class SmallWebRTCClient: continue try: - frame = await wait_for(self._audio_input_track.recv(), timeout=2.0) + frame = await asyncio.wait_for(self._audio_input_track.recv(), timeout=2.0) except asyncio.TimeoutError: if self._webrtc_connection.is_connected(): logger.warning("Timeout: No audio frame received within the specified time.") diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index 77ac20bc1..77f7f5e29 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -49,7 +49,6 @@ from pipecat.transports.base_input import BaseInputTransport from pipecat.transports.base_output import BaseOutputTransport from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.utils.asyncio.task_manager import BaseTaskManager -from pipecat.utils.asyncio.timeout import wait_for from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue try: @@ -695,7 +694,7 @@ class DailyTransportClient(EventHandler): }, ) - return await wait_for(future, timeout=10) + return await asyncio.wait_for(future, timeout=10) async def leave(self): """Leave the Daily room and cleanup resources.""" @@ -736,7 +735,7 @@ class DailyTransportClient(EventHandler): """Execute the actual room leave operation.""" future = self._get_event_loop().create_future() self._client.leave(completion=completion_callback(future)) - return await wait_for(future, timeout=10) + return await asyncio.wait_for(future, timeout=10) def _cleanup(self): """Cleanup the Daily client instance.""" diff --git a/src/pipecat/utils/asyncio/task_manager.py b/src/pipecat/utils/asyncio/task_manager.py index 58e337034..3bc3453ac 100644 --- a/src/pipecat/utils/asyncio/task_manager.py +++ b/src/pipecat/utils/asyncio/task_manager.py @@ -20,8 +20,6 @@ from typing import Coroutine, Dict, Optional, Sequence from loguru import logger -from pipecat.utils.asyncio.timeout import wait_for - WATCHDOG_TIMEOUT = 5.0 @@ -287,7 +285,7 @@ class TaskManager(BaseTaskManager): name = task.get_name() try: if timeout: - await wait_for(task, timeout=timeout) + await asyncio.wait_for(task, timeout=timeout) else: await task except asyncio.TimeoutError: @@ -317,7 +315,7 @@ class TaskManager(BaseTaskManager): # Make sure to reset watchdog if a task is cancelled. self.reset_watchdog(task) if timeout: - await wait_for(task, timeout=timeout) + await asyncio.wait_for(task, timeout=timeout) else: await task except asyncio.TimeoutError: @@ -404,7 +402,7 @@ class TaskManager(BaseTaskManager): break start_time = time.time() - await wait_for(timer.wait(), timeout=watchdog_timeout) + await asyncio.wait_for(timer.wait(), timeout=watchdog_timeout) total_time = time.time() - start_time if enable_watchdog_logging: logger.debug(f"{name} time between watchdog timer resets: {total_time:.20f}") diff --git a/src/pipecat/utils/asyncio/timeout.py b/src/pipecat/utils/asyncio/timeout.py deleted file mode 100644 index 65141ffe5..000000000 --- a/src/pipecat/utils/asyncio/timeout.py +++ /dev/null @@ -1,29 +0,0 @@ -# -# Copyright (c) 2024–2025, Daily -# -# SPDX-License-Identifier: BSD 2-Clause License -# - -"""Implementation of Python's `asyncio.wait_for()`. - -This module uses `wait_for2` package to implement `asyncio.wait_for()` for -Python < 3.12. - -In Python 3.12, `asyncio.wait_for()` is implemented in terms of -`asyncio.timeout()` which fixed a bunch of issues. However, this was never -backported (because of the lack of `async.timeout()`) and there are still many -remainig issues, specially in Python 3.10, in `async.wait_for()`. - -See https://github.com/python/cpython/pull/98518 -""" - -import sys - -if sys.version_info >= (3, 12): - import asyncio - - wait_for = asyncio.wait_for -else: - import wait_for2 - - wait_for = wait_for2.wait_for diff --git a/src/pipecat/utils/asyncio/watchdog_async_iterator.py b/src/pipecat/utils/asyncio/watchdog_async_iterator.py index 4594568e0..410584f49 100644 --- a/src/pipecat/utils/asyncio/watchdog_async_iterator.py +++ b/src/pipecat/utils/asyncio/watchdog_async_iterator.py @@ -15,7 +15,6 @@ import asyncio from typing import AsyncIterator, Optional from pipecat.utils.asyncio.task_manager import BaseTaskManager -from pipecat.utils.asyncio.timeout import wait_for class WatchdogAsyncIterator: @@ -78,7 +77,7 @@ class WatchdogAsyncIterator: if not self._current_anext_task: self._current_anext_task = asyncio.create_task(self._iter.__anext__()) - item = await wait_for( + item = await asyncio.wait_for( asyncio.shield(self._current_anext_task), timeout=self._timeout ) diff --git a/src/pipecat/utils/asyncio/watchdog_coroutine.py b/src/pipecat/utils/asyncio/watchdog_coroutine.py index 7827eb779..99a5f8974 100644 --- a/src/pipecat/utils/asyncio/watchdog_coroutine.py +++ b/src/pipecat/utils/asyncio/watchdog_coroutine.py @@ -14,9 +14,7 @@ watchdog timeouts during legitimate operations. import asyncio from typing import Optional -from pipecat.pipeline import task from pipecat.utils.asyncio.task_manager import BaseTaskManager -from pipecat.utils.asyncio.timeout import wait_for class WatchdogCoroutine: @@ -60,7 +58,7 @@ class WatchdogCoroutine: if not self._current_coro_task: self._current_coro_task = asyncio.create_task(self._coroutine) - result = await wait_for( + result = await asyncio.wait_for( asyncio.shield(self._current_coro_task), timeout=self._timeout ) diff --git a/src/pipecat/utils/asyncio/watchdog_event.py b/src/pipecat/utils/asyncio/watchdog_event.py index cc5c6c19a..c48356c50 100644 --- a/src/pipecat/utils/asyncio/watchdog_event.py +++ b/src/pipecat/utils/asyncio/watchdog_event.py @@ -14,7 +14,6 @@ watchdog timeouts during legitimate waiting periods. import asyncio from pipecat.utils.asyncio.task_manager import BaseTaskManager -from pipecat.utils.asyncio.timeout import wait_for class WatchdogEvent(asyncio.Event): @@ -56,7 +55,7 @@ class WatchdogEvent(asyncio.Event): """Wait for event while periodically resetting watchdog timer.""" while True: try: - await wait_for(super().wait(), timeout=self._timeout) + await asyncio.wait_for(super().wait(), timeout=self._timeout) self._manager.task_reset_watchdog() return True except asyncio.TimeoutError: diff --git a/src/pipecat/utils/asyncio/watchdog_priority_queue.py b/src/pipecat/utils/asyncio/watchdog_priority_queue.py index 384af195c..c9b8fe171 100644 --- a/src/pipecat/utils/asyncio/watchdog_priority_queue.py +++ b/src/pipecat/utils/asyncio/watchdog_priority_queue.py @@ -17,7 +17,6 @@ from dataclasses import dataclass from loguru import logger from pipecat.utils.asyncio.task_manager import BaseTaskManager -from pipecat.utils.asyncio.timeout import wait_for @dataclass @@ -125,7 +124,7 @@ class WatchdogPriorityQueue(asyncio.PriorityQueue): """Get item from queue while periodically resetting watchdog timer.""" while True: try: - item = await wait_for(super().get(), timeout=self._timeout) + item = await asyncio.wait_for(super().get(), timeout=self._timeout) self._manager.task_reset_watchdog() return item except asyncio.TimeoutError: diff --git a/src/pipecat/utils/asyncio/watchdog_queue.py b/src/pipecat/utils/asyncio/watchdog_queue.py index 6968a4903..45bcaff66 100644 --- a/src/pipecat/utils/asyncio/watchdog_queue.py +++ b/src/pipecat/utils/asyncio/watchdog_queue.py @@ -17,7 +17,6 @@ from dataclasses import dataclass from loguru import logger from pipecat.utils.asyncio.task_manager import BaseTaskManager -from pipecat.utils.asyncio.timeout import wait_for @dataclass @@ -104,7 +103,7 @@ class WatchdogQueue(asyncio.Queue): """Get item from queue while periodically resetting watchdog timer.""" while True: try: - item = await wait_for(super().get(), timeout=self._timeout) + item = await asyncio.wait_for(super().get(), timeout=self._timeout) self._manager.task_reset_watchdog() return item except asyncio.TimeoutError: diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 293cae9f0..4b2c34828 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -24,7 +24,6 @@ from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.filters.identity_filter import IdentityFilter from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.tests.utils import HeartbeatsObserver, run_test -from pipecat.utils.asyncio.timeout import wait_for class TestPipeline(unittest.IsolatedAsyncioTestCase): @@ -251,7 +250,7 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase): await task.queue_frame(TextFrame(text="Hello Downstream!")) try: - await wait_for( + await asyncio.wait_for( asyncio.shield(task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))), timeout=1.0, ) @@ -287,7 +286,7 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase): await task.queue_frame(TextFrame(text="Hello!")) try: - await wait_for( + await asyncio.wait_for( asyncio.shield(task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))), timeout=1.0, ) @@ -307,7 +306,7 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase): pipeline = Pipeline([identity]) task = PipelineTask(pipeline, idle_timeout_secs=0.2, cancel_on_idle_timeout=False) try: - await wait_for( + await asyncio.wait_for( asyncio.shield(task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))), timeout=0.3, )