Merge pull request #2480 from pipecat-ai/aleix/replace-asyncio-waitfor

replace asyncio.wait_for for wait_for2.wait_for
This commit is contained in:
Aleix Conchillo Flaqué
2025-08-20 17:43:32 -07:00
committed by GitHub
30 changed files with 63 additions and 101 deletions

View File

@@ -9,9 +9,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added `pipecat.utils.timeout.wait_for()` which uses `wait_for2` package to
avoid `asyncio.wait_for()` issues in Python < 3.12.
- Allow passing custom pipeline sink and source processors to a
`Pipeline`. Pipeline source and sink processors are used to know and control
what's coming in and out of a `Pipeline` processor.
@@ -49,9 +46,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Replaced `asyncio.wait_for()` for Pipecat's `wait_for()`. In Python 3.10,
`asyncio.wait_for()` has issues regarding task cancellation (i.e. cancellation
is never propagated).
- Replaced `asyncio.wait_for()` for `wait_for2.wait_for()` for Python <
3.12. because of issues regarding task cancellation (i.e. cancellation is
never propagated).
See https://bugs.python.org/issue42130
- Fixed an `AudioBufferProcessor` issues that would cause audio overlap when

View File

@@ -43,7 +43,6 @@ from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.utils.asyncio.timeout import wait_for
SCRIPT_DIR = Path(__file__).resolve().parent
@@ -123,7 +122,7 @@ class EvalRunner:
logger.error(f"ERROR: Unable to run {example_file}: {e}")
try:
result = await wait_for(self._queue.get(), timeout=1.0)
result = await asyncio.wait_for(self._queue.get(), timeout=1.0)
except asyncio.TimeoutError:
result = False

View File

@@ -12,3 +12,20 @@ from loguru import logger
__version__ = version("pipecat-ai")
logger.info(f"ᓚᘏᗢ Pipecat {__version__} (Python {sys.version}) ᓚᘏᗢ")
# We replace `asyncio.wait_for()` for `wait_for2.wait_for()` for Python < 3.12.
#
# In Python 3.12, `asyncio.wait_for()` is implemented in terms of
# `asyncio.timeout()` which fixed a bunch of issues. However, this was never
# backported (because of the lack of `async.timeout()`) and there are still many
# remainig issues, specially in Python 3.10, in `async.wait_for()`.
#
# See https://github.com/python/cpython/pull/98518
import asyncio
if sys.version_info < (3, 12):
import wait_for2
# Replace asyncio.wait_for.
asyncio.wait_for = wait_for2.wait_for

View File

@@ -51,7 +51,6 @@ from pipecat.utils.asyncio.task_manager import (
TaskManager,
TaskManagerParams,
)
from pipecat.utils.asyncio.timeout import wait_for
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
from pipecat.utils.tracing.setup import is_tracing_available
from pipecat.utils.tracing.turn_trace_observer import TurnTraceObserver
@@ -655,7 +654,7 @@ class PipelineTask(BasePipelineTask):
wait_time = HEARTBEAT_MONITOR_SECONDS
while True:
try:
frame = await wait_for(self._heartbeat_queue.get(), timeout=wait_time)
frame = await asyncio.wait_for(self._heartbeat_queue.get(), timeout=wait_time)
process_time = (self._clock.get_time() - frame.timestamp) / 1_000_000_000
logger.trace(f"{self}: heartbeat frame processed in {process_time} seconds")
self._heartbeat_queue.task_done()
@@ -678,7 +677,9 @@ class PipelineTask(BasePipelineTask):
while running:
try:
frame = await wait_for(self._idle_queue.get(), timeout=self._idle_timeout_secs)
frame = await asyncio.wait_for(
self._idle_queue.get(), timeout=self._idle_timeout_secs
)
if not isinstance(frame, InputAudioRawFrame):
frame_buffer.append(frame)

View File

@@ -25,7 +25,6 @@ from pipecat.frames.frames import (
TranscriptionFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup
from pipecat.utils.asyncio.timeout import wait_for
from pipecat.utils.asyncio.watchdog_event import WatchdogEvent
from pipecat.utils.time import time_now_iso8601
@@ -135,7 +134,7 @@ class DTMFAggregator(FrameProcessor):
"""Background task that handles timeout-based flushing."""
while True:
try:
await wait_for(self._digit_event.wait(), timeout=self._idle_timeout)
await asyncio.wait_for(self._digit_event.wait(), timeout=self._idle_timeout)
self._digit_event.clear()
except asyncio.TimeoutError:
self.reset_watchdog()

View File

@@ -60,7 +60,6 @@ from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContextFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.asyncio.timeout import wait_for
from pipecat.utils.time import time_now_iso8601
@@ -671,7 +670,7 @@ class LLMUserContextAggregator(LLMContextResponseAggregator):
if self._vad_params
else self._params.turn_emulated_vad_timeout
)
await wait_for(self._aggregation_event.wait(), timeout=timeout)
await asyncio.wait_for(self._aggregation_event.wait(), timeout=timeout)
await self._maybe_emulate_user_speaking()
except asyncio.TimeoutError:
if not self._user_speaking:

View File

@@ -11,7 +11,6 @@ from typing import Awaitable, Callable, List, Optional
from pipecat.frames.frames import Frame, StartFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.asyncio.timeout import wait_for
from pipecat.utils.asyncio.watchdog_event import WatchdogEvent
@@ -86,7 +85,7 @@ class IdleFrameProcessor(FrameProcessor):
"""Handle idle timeout monitoring and callback execution."""
while True:
try:
await wait_for(self._idle_event.wait(), timeout=self._timeout)
await asyncio.wait_for(self._idle_event.wait(), timeout=self._timeout)
except asyncio.TimeoutError:
await self._callback(self)
finally:

View File

@@ -22,7 +22,6 @@ from pipecat.frames.frames import (
UserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.asyncio.timeout import wait_for
from pipecat.utils.asyncio.watchdog_event import WatchdogEvent
@@ -192,7 +191,7 @@ class UserIdleProcessor(FrameProcessor):
"""
while True:
try:
await wait_for(self._idle_event.wait(), timeout=self._timeout)
await asyncio.wait_for(self._idle_event.wait(), timeout=self._timeout)
except asyncio.TimeoutError:
if not self._interrupted:
self._retry_count += 1

View File

@@ -53,7 +53,6 @@ from pipecat.processors.aggregators.openai_llm_context import (
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.llm_service import FunctionCallFromLLM, LLMService
from pipecat.utils.asyncio.timeout import wait_for
from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
from pipecat.utils.tracing.service_decorators import traced_llm
@@ -186,7 +185,9 @@ class AnthropicLLMService(LLMService):
"""
if self._retry_on_timeout:
try:
response = await wait_for(api_call(**params), timeout=self._retry_timeout_secs)
response = await asyncio.wait_for(
api_call(**params), timeout=self._retry_timeout_secs
)
return response
except (APITimeoutError, asyncio.TimeoutError):
# Retry, this time without a timeout so we get a response

View File

@@ -31,7 +31,6 @@ from pipecat.frames.frames import (
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.stt_service import STTService
from pipecat.transcriptions.language import Language
from pipecat.utils.asyncio.timeout import wait_for
from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_stt
@@ -220,7 +219,7 @@ class AssemblyAISTTService(STTService):
await self._websocket.send(json.dumps({"type": "Terminate"}))
try:
await wait_for(self._termination_event.wait(), timeout=5.0)
await asyncio.wait_for(self._termination_event.wait(), timeout=5.0)
except asyncio.TimeoutError:
logger.warning("Timed out waiting for termination message from server")
@@ -245,7 +244,7 @@ class AssemblyAISTTService(STTService):
try:
while self._connected:
try:
message = await wait_for(self._websocket.recv(), timeout=1.0)
message = await asyncio.wait_for(self._websocket.recv(), timeout=1.0)
data = json.loads(message)
await self._handle_message(data)
except asyncio.TimeoutError:

View File

@@ -52,7 +52,6 @@ from pipecat.processors.aggregators.openai_llm_context import (
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.llm_service import LLMService
from pipecat.utils.asyncio.timeout import wait_for
from pipecat.utils.tracing.service_decorators import traced_llm
try:
@@ -802,7 +801,7 @@ class AWSBedrockLLMService(LLMService):
"""
if self._retry_on_timeout:
try:
response = await wait_for(
response = await asyncio.wait_for(
await client.converse_stream(**request_params), timeout=self._retry_timeout_secs
)
return response

View File

@@ -31,7 +31,6 @@ from pipecat.frames.frames import (
from pipecat.services.aws.utils import build_event_message, decode_event, get_presigned_url
from pipecat.services.stt_service import STTService
from pipecat.transcriptions.language import Language
from pipecat.utils.asyncio.timeout import wait_for
from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_stt
@@ -481,7 +480,7 @@ class AWSTranscribeSTTService(STTService):
break
try:
response = await wait_for(self._ws_client.recv(), timeout=1.0)
response = await asyncio.wait_for(self._ws_client.recv(), timeout=1.0)
headers, payload = decode_event(response)

View File

@@ -31,7 +31,6 @@ from pipecat.processors.frame_processor import FrameProcessorSetup
from pipecat.services.heygen.api import HeyGenApi, HeyGenSession, NewSessionRequest
from pipecat.transports.base_transport import TransportParams
from pipecat.utils.asyncio.task_manager import BaseTaskManager
from pipecat.utils.asyncio.timeout import wait_for
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
try:
@@ -232,7 +231,7 @@ class HeyGenClient:
"""Handle incoming WebSocket messages."""
while self._connected:
try:
message = await wait_for(self._websocket.recv(), timeout=1.0)
message = await asyncio.wait_for(self._websocket.recv(), timeout=1.0)
parsed_message = json.loads(message)
await self._handle_ws_server_event(parsed_message)
except asyncio.TimeoutError:

View File

@@ -40,7 +40,6 @@ from pipecat.services.ai_service import AIService
from pipecat.services.heygen.api import NewSessionRequest
from pipecat.services.heygen.client import HEY_GEN_SAMPLE_RATE, HeyGenCallbacks, HeyGenClient
from pipecat.transports.base_transport import TransportParams
from pipecat.utils.asyncio.timeout import wait_for
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
# Using the same values that we do in the BaseOutputTransport
@@ -321,7 +320,7 @@ class HeyGenVideoService(AIService):
while True:
try:
frame = await wait_for(self._queue.get(), timeout=AVATAR_VAD_STOP_SECS)
frame = await asyncio.wait_for(self._queue.get(), timeout=AVATAR_VAD_STOP_SECS)
if self._is_interrupting:
break
if isinstance(frame, TTSAudioRawFrame):

View File

@@ -39,7 +39,6 @@ from pipecat.processors.aggregators.openai_llm_context import (
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.llm_service import FunctionCallFromLLM, LLMService
from pipecat.utils.asyncio.timeout import wait_for
from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
from pipecat.utils.tracing.service_decorators import traced_llm
@@ -197,7 +196,7 @@ class BaseOpenAILLMService(LLMService):
if self._retry_on_timeout:
try:
chunks = await wait_for(
chunks = await asyncio.wait_for(
self._client.chat.completions.create(**params), timeout=self._retry_timeout_secs
)
return chunks

View File

@@ -14,7 +14,6 @@ import asyncio
import os
from typing import AsyncGenerator, Mapping, Optional
from pipecat.utils.asyncio.timeout import wait_for
from pipecat.utils.tracing.service_decorators import traced_tts
# Suppress gRPC fork warnings
@@ -169,7 +168,7 @@ class RivaTTSService(TTSService):
await asyncio.to_thread(read_audio_responses, queue)
# Wait for the thread to start.
resp = await wait_for(queue.get(), timeout=RIVA_TTS_TIMEOUT_SECS)
resp = await asyncio.wait_for(queue.get(), timeout=RIVA_TTS_TIMEOUT_SECS)
while resp:
await self.stop_ttfb_metrics()
frame = TTSAudioRawFrame(
@@ -178,7 +177,7 @@ class RivaTTSService(TTSService):
num_channels=1,
)
yield frame
resp = await wait_for(queue.get(), timeout=RIVA_TTS_TIMEOUT_SECS)
resp = await asyncio.wait_for(queue.get(), timeout=RIVA_TTS_TIMEOUT_SECS)
except asyncio.TimeoutError:
logger.error(f"{self} timeout waiting for audio response")

View File

@@ -33,7 +33,6 @@ from pipecat.frames.frames import (
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.stt_service import STTService
from pipecat.transcriptions.language import Language
from pipecat.utils.asyncio.timeout import wait_for
from pipecat.utils.tracing.service_decorators import traced_stt
try:
@@ -577,7 +576,7 @@ class SpeechmaticsSTTService(STTService):
# Disconnect the client
try:
if self._client:
await wait_for(self._client.close(), timeout=1.0)
await asyncio.wait_for(self._client.close(), timeout=1.0)
except asyncio.TimeoutError:
logger.warning("Timeout while closing Speechmatics client connection")
except Exception as e:

View File

@@ -37,7 +37,6 @@ from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_service import AIService
from pipecat.services.websocket_service import WebsocketService
from pipecat.transcriptions.language import Language
from pipecat.utils.asyncio.timeout import wait_for
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
from pipecat.utils.text.base_text_aggregator import BaseTextAggregator
from pipecat.utils.text.base_text_filter import BaseTextFilter
@@ -428,7 +427,7 @@ class TTSService(AIService):
has_started = False
while True:
try:
frame = await wait_for(
frame = await asyncio.wait_for(
self._stop_frame_queue.get(), timeout=self._stop_frame_timeout_s
)
if isinstance(frame, TTSStartedFrame):
@@ -859,7 +858,7 @@ class AudioContextWordTTSService(WebsocketWordTTSService):
running = True
while running:
try:
frame = await wait_for(queue.get(), timeout=AUDIO_CONTEXT_TIMEOUT)
frame = await asyncio.wait_for(queue.get(), timeout=AUDIO_CONTEXT_TIMEOUT)
self.reset_watchdog()
if frame:
await self.push_frame(frame)

View File

@@ -49,7 +49,6 @@ from pipecat.frames.frames import (
from pipecat.metrics.metrics import MetricsData
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transports.base_transport import TransportParams
from pipecat.utils.asyncio.timeout import wait_for
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
AUDIO_INPUT_TIMEOUT_SECS = 0.5
@@ -476,7 +475,7 @@ class BaseInputTransport(FrameProcessor):
vad_state: VADState = VADState.QUIET
while True:
try:
frame: InputAudioRawFrame = await wait_for(
frame: InputAudioRawFrame = await asyncio.wait_for(
self._audio_in_queue.get(), timeout=AUDIO_INPUT_TIMEOUT_SECS
)

View File

@@ -46,7 +46,6 @@ from pipecat.frames.frames import (
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transports.base_transport import TransportParams
from pipecat.utils.asyncio.timeout import wait_for
from pipecat.utils.asyncio.watchdog_priority_queue import WatchdogPriorityQueue
from pipecat.utils.time import nanoseconds_to_seconds
@@ -624,7 +623,9 @@ class BaseOutputTransport(FrameProcessor):
async def without_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]:
while True:
try:
frame = await wait_for(self._audio_queue.get(), timeout=vad_stop_secs)
frame = await asyncio.wait_for(
self._audio_queue.get(), timeout=vad_stop_secs
)
self._transport.reset_watchdog()
yield frame
except asyncio.TimeoutError:

View File

@@ -40,7 +40,6 @@ from pipecat.transports.base_input import BaseInputTransport
from pipecat.transports.base_output import BaseOutputTransport
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
from pipecat.utils.asyncio.timeout import wait_for
from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
try:
@@ -290,7 +289,7 @@ class SmallWebRTCClient:
continue
try:
frame = await wait_for(self._video_input_track.recv(), timeout=2.0)
frame = await asyncio.wait_for(self._video_input_track.recv(), timeout=2.0)
except asyncio.TimeoutError:
if self._webrtc_connection.is_connected():
logger.warning("Timeout: No video frame received within the specified time.")
@@ -333,7 +332,7 @@ class SmallWebRTCClient:
continue
try:
frame = await wait_for(self._audio_input_track.recv(), timeout=2.0)
frame = await asyncio.wait_for(self._audio_input_track.recv(), timeout=2.0)
except asyncio.TimeoutError:
if self._webrtc_connection.is_connected():
logger.warning("Timeout: No audio frame received within the specified time.")

View File

@@ -49,7 +49,6 @@ from pipecat.transports.base_input import BaseInputTransport
from pipecat.transports.base_output import BaseOutputTransport
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.utils.asyncio.task_manager import BaseTaskManager
from pipecat.utils.asyncio.timeout import wait_for
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
try:
@@ -695,7 +694,7 @@ class DailyTransportClient(EventHandler):
},
)
return await wait_for(future, timeout=10)
return await asyncio.wait_for(future, timeout=10)
async def leave(self):
"""Leave the Daily room and cleanup resources."""
@@ -736,7 +735,7 @@ class DailyTransportClient(EventHandler):
"""Execute the actual room leave operation."""
future = self._get_event_loop().create_future()
self._client.leave(completion=completion_callback(future))
return await wait_for(future, timeout=10)
return await asyncio.wait_for(future, timeout=10)
def _cleanup(self):
"""Cleanup the Daily client instance."""

View File

@@ -20,8 +20,6 @@ from typing import Coroutine, Dict, Optional, Sequence
from loguru import logger
from pipecat.utils.asyncio.timeout import wait_for
WATCHDOG_TIMEOUT = 5.0
@@ -287,7 +285,7 @@ class TaskManager(BaseTaskManager):
name = task.get_name()
try:
if timeout:
await wait_for(task, timeout=timeout)
await asyncio.wait_for(task, timeout=timeout)
else:
await task
except asyncio.TimeoutError:
@@ -317,7 +315,7 @@ class TaskManager(BaseTaskManager):
# Make sure to reset watchdog if a task is cancelled.
self.reset_watchdog(task)
if timeout:
await wait_for(task, timeout=timeout)
await asyncio.wait_for(task, timeout=timeout)
else:
await task
except asyncio.TimeoutError:
@@ -404,7 +402,7 @@ class TaskManager(BaseTaskManager):
break
start_time = time.time()
await wait_for(timer.wait(), timeout=watchdog_timeout)
await asyncio.wait_for(timer.wait(), timeout=watchdog_timeout)
total_time = time.time() - start_time
if enable_watchdog_logging:
logger.debug(f"{name} time between watchdog timer resets: {total_time:.20f}")

View File

@@ -1,29 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Implementation of Python's `asyncio.wait_for()`.
This module uses `wait_for2` package to implement `asyncio.wait_for()` for
Python < 3.12.
In Python 3.12, `asyncio.wait_for()` is implemented in terms of
`asyncio.timeout()` which fixed a bunch of issues. However, this was never
backported (because of the lack of `async.timeout()`) and there are still many
remainig issues, specially in Python 3.10, in `async.wait_for()`.
See https://github.com/python/cpython/pull/98518
"""
import sys
if sys.version_info >= (3, 12):
import asyncio
wait_for = asyncio.wait_for
else:
import wait_for2
wait_for = wait_for2.wait_for

View File

@@ -15,7 +15,6 @@ import asyncio
from typing import AsyncIterator, Optional
from pipecat.utils.asyncio.task_manager import BaseTaskManager
from pipecat.utils.asyncio.timeout import wait_for
class WatchdogAsyncIterator:
@@ -78,7 +77,7 @@ class WatchdogAsyncIterator:
if not self._current_anext_task:
self._current_anext_task = asyncio.create_task(self._iter.__anext__())
item = await wait_for(
item = await asyncio.wait_for(
asyncio.shield(self._current_anext_task), timeout=self._timeout
)

View File

@@ -14,9 +14,7 @@ watchdog timeouts during legitimate operations.
import asyncio
from typing import Optional
from pipecat.pipeline import task
from pipecat.utils.asyncio.task_manager import BaseTaskManager
from pipecat.utils.asyncio.timeout import wait_for
class WatchdogCoroutine:
@@ -60,7 +58,7 @@ class WatchdogCoroutine:
if not self._current_coro_task:
self._current_coro_task = asyncio.create_task(self._coroutine)
result = await wait_for(
result = await asyncio.wait_for(
asyncio.shield(self._current_coro_task), timeout=self._timeout
)

View File

@@ -14,7 +14,6 @@ watchdog timeouts during legitimate waiting periods.
import asyncio
from pipecat.utils.asyncio.task_manager import BaseTaskManager
from pipecat.utils.asyncio.timeout import wait_for
class WatchdogEvent(asyncio.Event):
@@ -56,7 +55,7 @@ class WatchdogEvent(asyncio.Event):
"""Wait for event while periodically resetting watchdog timer."""
while True:
try:
await wait_for(super().wait(), timeout=self._timeout)
await asyncio.wait_for(super().wait(), timeout=self._timeout)
self._manager.task_reset_watchdog()
return True
except asyncio.TimeoutError:

View File

@@ -17,7 +17,6 @@ from dataclasses import dataclass
from loguru import logger
from pipecat.utils.asyncio.task_manager import BaseTaskManager
from pipecat.utils.asyncio.timeout import wait_for
@dataclass
@@ -125,7 +124,7 @@ class WatchdogPriorityQueue(asyncio.PriorityQueue):
"""Get item from queue while periodically resetting watchdog timer."""
while True:
try:
item = await wait_for(super().get(), timeout=self._timeout)
item = await asyncio.wait_for(super().get(), timeout=self._timeout)
self._manager.task_reset_watchdog()
return item
except asyncio.TimeoutError:

View File

@@ -17,7 +17,6 @@ from dataclasses import dataclass
from loguru import logger
from pipecat.utils.asyncio.task_manager import BaseTaskManager
from pipecat.utils.asyncio.timeout import wait_for
@dataclass
@@ -104,7 +103,7 @@ class WatchdogQueue(asyncio.Queue):
"""Get item from queue while periodically resetting watchdog timer."""
while True:
try:
item = await wait_for(super().get(), timeout=self._timeout)
item = await asyncio.wait_for(super().get(), timeout=self._timeout)
self._manager.task_reset_watchdog()
return item
except asyncio.TimeoutError:

View File

@@ -24,7 +24,6 @@ from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.filters.identity_filter import IdentityFilter
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.tests.utils import HeartbeatsObserver, run_test
from pipecat.utils.asyncio.timeout import wait_for
class TestPipeline(unittest.IsolatedAsyncioTestCase):
@@ -251,7 +250,7 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
await task.queue_frame(TextFrame(text="Hello Downstream!"))
try:
await wait_for(
await asyncio.wait_for(
asyncio.shield(task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))),
timeout=1.0,
)
@@ -287,7 +286,7 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
await task.queue_frame(TextFrame(text="Hello!"))
try:
await wait_for(
await asyncio.wait_for(
asyncio.shield(task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))),
timeout=1.0,
)
@@ -307,7 +306,7 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
pipeline = Pipeline([identity])
task = PipelineTask(pipeline, idle_timeout_secs=0.2, cancel_on_idle_timeout=False)
try:
await wait_for(
await asyncio.wait_for(
asyncio.shield(task.run(PipelineTaskParams(loop=asyncio.get_event_loop()))),
timeout=0.3,
)