diff --git a/src/pipecat/transports/vonage/client.py b/src/pipecat/transports/vonage/client.py index ecdfd24bc..a53fa1b4c 100644 --- a/src/pipecat/transports/vonage/client.py +++ b/src/pipecat/transports/vonage/client.py @@ -8,12 +8,12 @@ import asyncio import itertools import threading -from collections.abc import Coroutine +from collections.abc import Awaitable, Callable, Coroutine from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, replace from datetime import datetime, timedelta from enum import StrEnum -from typing import Any, Awaitable, Callable, Optional, TypeVar +from typing import Any, Optional, TypeVar import numpy as np from loguru import logger @@ -96,8 +96,8 @@ class VonageVideoConnectorTransportParams(TransportParams): audio_in_auto_subscribe: bool = True video_in_auto_subscribe: bool = False video_connector_log_level: str = "INFO" - video_in_preferred_resolution: Optional[tuple[int, int]] = None - video_in_preferred_framerate: Optional[int] = None + video_in_preferred_resolution: tuple[int, int] | None = None + video_in_preferred_framerate: int | None = None clear_buffers_on_interruption: bool = True @@ -114,8 +114,8 @@ class SubscribeSettings: subscribe_to_audio: bool = True subscribe_to_video: bool = False - preferred_resolution: Optional[tuple[int, int]] = None - preferred_framerate: Optional[int] = None + preferred_resolution: tuple[int, int] | None = None + preferred_framerate: int | None = None class VonageException(Exception): @@ -211,7 +211,7 @@ SimpleCoroutine = Coroutine[Any, Any, None] DUMMY_CONNECTION = Connection(id="", creation_time=datetime.min) -def _to_enum(value: Optional[str], enum_cls: type[TE]) -> Optional[TE]: +def _to_enum(value: str | None, enum_cls: type[TE]) -> TE | None: """Convert a string value to the specified StrEnum type, returning None if invalid.""" try: return enum_cls(value or "") @@ -259,25 +259,25 @@ class VonageClient: self._connected: bool = False self._connection_counter: int = 0 - self._connecting_future: Optional[asyncio.Future[None]] = None - self._disconnecting_future: Optional[asyncio.Future[None]] = None + self._connecting_future: asyncio.Future[None] | None = None + self._disconnecting_future: asyncio.Future[None] | None = None self._listener_id_gen: itertools.count[int] = itertools.count() self._listeners: dict[int, VonageClientListener] = {} - self._publisher: Optional[Publisher] = None + self._publisher: Publisher | None = None self._session = Session(id=session_id) self._resampler = create_stream_resampler() - self._task_manager: Optional[BaseTaskManager] = None + self._task_manager: BaseTaskManager | None = None self._loop_thread_id = threading.get_ident() - self._event_queue: Optional[asyncio.Queue[SimpleCoroutine]] = None - self._event_task: Optional[asyncio.Task[None]] = None - self._audio_queue: Optional[asyncio.Queue[SimpleCoroutine]] = None - self._audio_task: Optional[asyncio.Task[None]] = None - self._video_queue: Optional[asyncio.Queue[SimpleCoroutine]] = None - self._video_task: Optional[asyncio.Task[None]] = None + self._event_queue: asyncio.Queue[SimpleCoroutine] | None = None + self._event_task: asyncio.Task[None] | None = None + self._audio_queue: asyncio.Queue[SimpleCoroutine] | None = None + self._audio_task: asyncio.Task[None] | None = None + self._video_queue: asyncio.Queue[SimpleCoroutine] | None = None + self._video_task: asyncio.Task[None] | None = None # used for blocking calls to connect and disconnect self._executor = ThreadPoolExecutor(max_workers=1) @@ -366,7 +366,7 @@ class VonageClient: """ self._listeners.pop(listener_id, None) - async def connect(self, frame: Optional[StartFrame] = None) -> None: + async def connect(self, frame: StartFrame | None = None) -> None: """Connect to the Vonage session. Args: @@ -698,7 +698,7 @@ class VonageClient: try: await asyncio.wait_for(async_proc(), timeout=VIDEO_CONNECTOR_TIMEOUT.total_seconds()) - except asyncio.TimeoutError as exc: + except TimeoutError as exc: logger.error(f"Timeout connecting to Vonage session {self._session_id}") raise exc @@ -715,7 +715,7 @@ class VonageClient: self._get_event_loop().run_in_executor(self._executor, disconnect_proc), timeout=VIDEO_CONNECTOR_TIMEOUT.total_seconds(), ) - except asyncio.TimeoutError: + except TimeoutError: logger.error(f"Timeout disconnecting from Vonage session {self._session_id}") raise @@ -800,7 +800,7 @@ class VonageClient: try: await asyncio.wait_for(process(), timeout=VIDEO_CONNECTOR_TIMEOUT.total_seconds()) - except asyncio.TimeoutError: + except TimeoutError: logger.error(f"Timeout subscribing to Vonage stream {stream.id}") self._session_subscriptions.pop(stream.id, None) raise @@ -856,7 +856,7 @@ class VonageClient: self._loop_thread_id = threading.get_ident() # if we allow concurrent tasks, process them as they come in if allow_concurrent: - active_tasks = set() + active_tasks: set[asyncio.Task[Any]] = set() async def wrapped_task(coroutine: SimpleCoroutine) -> None: try: @@ -907,7 +907,7 @@ class VonageClient: def _sdk_cb_to_loop( self, queue_type_name: str, - queue: Optional[asyncio.Queue[SimpleCoroutine]], + queue: asyncio.Queue[SimpleCoroutine] | None, async_task: SimpleCoroutine, ) -> None: """From an SDK thread queue a coroutine to be asynchronously executed in the task manager event loop. diff --git a/src/pipecat/transports/vonage/video_connector.py b/src/pipecat/transports/vonage/video_connector.py index f84fc3406..cca211c04 100644 --- a/src/pipecat/transports/vonage/video_connector.py +++ b/src/pipecat/transports/vonage/video_connector.py @@ -360,8 +360,8 @@ class VonageVideoConnectorTransport(BaseTransport): ) ) - self._input: Optional[VonageVideoConnectorInputTransport] = None - self._output: Optional[VonageVideoConnectorOutputTransport] = None + self._input: VonageVideoConnectorInputTransport | None = None + self._output: VonageVideoConnectorOutputTransport | None = None self._one_stream_received: bool = False def input(self) -> FrameProcessor: diff --git a/tests/test_vonage_video_connector.py b/tests/test_vonage_video_connector.py index d13575f36..f8d874fa9 100644 --- a/tests/test_vonage_video_connector.py +++ b/tests/test_vonage_video_connector.py @@ -8,10 +8,11 @@ import asyncio import inspect import sys import threading +from collections.abc import Awaitable, Callable from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from datetime import datetime, timedelta -from typing import Any, Awaitable, Callable, Optional +from typing import Any, Optional from unittest.mock import ANY, AsyncMock, MagicMock, Mock, call, patch import numpy as np @@ -76,7 +77,7 @@ class MockPublisher: @dataclass(eq=True, frozen=True) class MockSubscriber: - stream: Optional[MockStream] = None + stream: MockStream | None = None @dataclass(eq=True, frozen=True) @@ -100,9 +101,9 @@ class MockSessionVideoPublisherSettings: @dataclass(eq=True, frozen=True) class MockSessionAVSettings: - audio_publisher: Optional[MockSessionAudioSettings] = None - audio_subscribers_mix: Optional[MockSessionAudioSettings] = None - video_publisher: Optional[MockSessionVideoPublisherSettings] = None + audio_publisher: MockSessionAudioSettings | None = None + audio_subscribers_mix: MockSessionAudioSettings | None = None + video_publisher: MockSessionVideoPublisherSettings | None = None @dataclass(eq=True, frozen=True) @@ -113,8 +114,8 @@ class MockLoggingSettings: @dataclass(eq=True, frozen=True) class MockSessionSettings: enable_migration: bool = False - av: Optional[MockSessionAVSettings] = None - logging: Optional[MockLoggingSettings] = None + av: MockSessionAVSettings | None = None + logging: MockLoggingSettings | None = None @dataclass(eq=True, frozen=True) @@ -128,7 +129,7 @@ class MockPublisherSettings: name: str has_audio: bool has_video: bool - audio_settings: Optional[MockPublisherAudioSettings] = None + audio_settings: MockPublisherAudioSettings | None = None @dataclass(eq=True, frozen=True) @@ -140,15 +141,15 @@ class MockVideoFrame: @dataclass(eq=True, frozen=True) class MockSubscriberVideoSettings: - preferred_resolution: Optional[MockVideoResolution] = None - preferred_framerate: Optional[int] = None + preferred_resolution: MockVideoResolution | None = None + preferred_framerate: int | None = None @dataclass(eq=True, frozen=True) class MockSubscriberSettings: subscribe_to_audio: bool = True subscribe_to_video: bool = True - video_settings: Optional[MockSubscriberVideoSettings] = None + video_settings: MockSubscriberVideoSettings | None = None # Set up the mock module structure @@ -232,11 +233,11 @@ class TestVonageVideoConnectorTransport: self.application_id = "test-app-id" self.session_id = "test-session-id" self.token = "test-token" - self._frame_processor_setup: Optional[FrameProcessorSetup] = None + self._frame_processor_setup: FrameProcessorSetup | None = None self._executor = ThreadPoolExecutor(max_workers=1) # subscriber state - self._connect_callbacks: Optional[ConnectCallbacks] = None + self._connect_callbacks: ConnectCallbacks | None = None self._subscriber_callbacks: dict[str, SubscriberCallbacks] = {} def _get_frame_processor_setup(self) -> FrameProcessorSetup: @@ -271,7 +272,7 @@ class TestVonageVideoConnectorTransport: while not condition(): if asyncio.get_event_loop().time() - start_time > timeout_seconds: - raise asyncio.TimeoutError(f"Condition not met within {timeout}") + raise TimeoutError(f"Condition not met within {timeout}") await asyncio.sleep(check_interval_seconds) def test_vonage_client_listener_defaults(self) -> None: @@ -399,7 +400,7 @@ class TestVonageVideoConnectorTransport: async def _create_client( self, - params: Optional[VonageVideoConnectorTransportParams] = None, + params: VonageVideoConnectorTransportParams | None = None, setup_connect_mock: bool = True, ) -> VonageClient: params = params or VonageVideoConnectorTransportParams() @@ -788,7 +789,7 @@ class TestVonageVideoConnectorTransport: ) def connect_side_effect( - *_: Any, on_ready_for_audio_cb: Optional[Callable[[Any], None]] = None, **__: Any + *_: Any, on_ready_for_audio_cb: Callable[[Any], None] | None = None, **__: Any ) -> bool: assert on_ready_for_audio_cb is not None connecting_future.set_result(on_ready_for_audio_cb)