Files
pipecat/tests/test_vonage_video_connector.py
2026-05-18 14:40:56 +02:00

3105 lines
123 KiB
Python

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import inspect
import sys
import threading
from collections.abc import Awaitable, Callable
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Optional
from unittest.mock import ANY, AsyncMock, MagicMock, Mock, call, patch
import numpy as np
import pytest
from pipecat.clocks.system_clock import SystemClock
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
InputAudioRawFrame,
InterruptionFrame,
OutputAudioRawFrame,
OutputImageRawFrame,
StartFrame,
UserImageRawFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessorSetup
from pipecat.utils.asyncio.task_manager import TaskManager, TaskManagerParams
# Mock the vonage_video module since it's not available in test environment
vonage_video_mock = MagicMock()
vonage_video_mock.VonageVideoClient = MagicMock()
vonage_video_mock.models = MagicMock()
# Create mock classes that match the expected interface
@dataclass(eq=True, frozen=True)
class MockAudioData:
sample_buffer: memoryview
sample_rate: int
number_of_channels: int
number_of_frames: int
@dataclass(eq=True, frozen=True)
class MockSession:
id: str
@dataclass(eq=True, frozen=True)
class MockConnection:
id: str
creation_time: datetime
data: str = ""
DUMMY_CONNECTION = MockConnection(id="dummy", creation_time=datetime.min)
@dataclass(eq=True, frozen=True)
class MockStream:
id: str
connection: MockConnection
@dataclass(eq=True, frozen=True)
class MockPublisher:
stream: MockStream
@dataclass(eq=True, frozen=True)
class MockSubscriber:
stream: MockStream | None = None
@dataclass(eq=True, frozen=True)
class MockSessionAudioSettings:
sample_rate: int = 48000
number_of_channels: int = 2
@dataclass(eq=True, frozen=True)
class MockVideoResolution:
width: int = 640
height: int = 480
@dataclass(eq=True, frozen=True)
class MockSessionVideoPublisherSettings:
resolution: MockVideoResolution
fps: int = 30
format: str = "YUV420P"
@dataclass(eq=True, frozen=True)
class MockSessionAVSettings:
audio_publisher: MockSessionAudioSettings | None = None
audio_subscribers_mix: MockSessionAudioSettings | None = None
video_publisher: MockSessionVideoPublisherSettings | None = None
@dataclass(eq=True, frozen=True)
class MockLoggingSettings:
level: str = "WARN"
@dataclass(eq=True, frozen=True)
class MockSessionSettings:
enable_migration: bool = False
av: MockSessionAVSettings | None = None
logging: MockLoggingSettings | None = None
@dataclass(eq=True, frozen=True)
class MockPublisherAudioSettings:
enable_stereo_mode: bool = True
enable_opus_dtx: bool = False
@dataclass(eq=True, frozen=True)
class MockPublisherSettings:
name: str
has_audio: bool
has_video: bool
audio_settings: MockPublisherAudioSettings | None = None
@dataclass(eq=True, frozen=True)
class MockVideoFrame:
frame_buffer: memoryview
resolution: MockVideoResolution
format: str = "YUV420P"
@dataclass(eq=True, frozen=True)
class MockSubscriberVideoSettings:
preferred_resolution: MockVideoResolution | None = None
preferred_framerate: int | None = None
@dataclass(eq=True, frozen=True)
class MockSubscriberSettings:
subscribe_to_audio: bool = True
subscribe_to_video: bool = True
video_settings: MockSubscriberVideoSettings | None = None
# Set up the mock module structure
vonage_video_mock.models.AudioData = MockAudioData
vonage_video_mock.models.Session = MockSession
vonage_video_mock.models.Connection = MockConnection
vonage_video_mock.models.Stream = MockStream
vonage_video_mock.models.Publisher = MockPublisher
vonage_video_mock.models.Subscriber = MockSubscriber
vonage_video_mock.models.LoggingSettings = MockLoggingSettings
vonage_video_mock.models.SessionAVSettings = MockSessionAVSettings
vonage_video_mock.models.SessionSettings = MockSessionSettings
vonage_video_mock.models.SessionAudioSettings = MockSessionAudioSettings
vonage_video_mock.models.PublisherAudioSettings = MockPublisherAudioSettings
vonage_video_mock.models.PublisherSettings = MockPublisherSettings
vonage_video_mock.models.SessionVideoPublisherSettings = MockSessionVideoPublisherSettings
vonage_video_mock.models.SubscriberSettings = MockSubscriberSettings
vonage_video_mock.models.SubscriberVideoSettings = MockSubscriberVideoSettings
vonage_video_mock.models.VideoResolution = MockVideoResolution
vonage_video_mock.models.VideoFrame = MockVideoFrame
# Mock the module in sys.modules so imports work
sys.modules["vonage_video_connector"] = vonage_video_mock
sys.modules["vonage_video_connector.models"] = vonage_video_mock.models
# Now we can import the transport classes since the vonage_video module is mocked
from pipecat.transports.vonage.client import (
VonageClient,
VonageClientListener,
)
from pipecat.transports.vonage.utils import (
AudioProps,
ImageFormat,
check_audio_data,
image_colorspace_conversion,
process_audio,
process_audio_channels,
)
from pipecat.transports.vonage.video_connector import (
SubscribeSettings,
VonageException,
VonageVideoConnectorInputTransport,
VonageVideoConnectorOutputTransport,
VonageVideoConnectorTransport,
VonageVideoConnectorTransportParams,
)
@dataclass(frozen=True)
class SubscriberCallbacks:
on_error_cb: Callable[[MockSubscriber, str, int], None]
on_connected_cb: Callable[[MockSubscriber], None]
on_disconnected_cb: Callable[[MockSubscriber], None]
on_render_frame_cb: Callable[[MockSubscriber, MockVideoFrame], None]
@dataclass(frozen=True)
class ConnectCallbacks:
on_error_cb: Callable[[MockSession, str, int], None]
on_disconnected_cb: Callable[[MockSession], None]
on_ready_for_audio_cb: Callable[[MockSession], None]
class TestVonageVideoConnectorTransport:
"""Test cases for Vonage Video Connector transport classes."""
def setup_method(self) -> None:
"""Set up test fixtures."""
self.VonageClient = VonageClient
self.VonageClientListener = VonageClientListener
self.VonageVideoConnectorInputTransport = VonageVideoConnectorInputTransport
self.VonageVideoConnectorOutputTransport = VonageVideoConnectorOutputTransport
self.VonageVideoConnectorTransport = VonageVideoConnectorTransport
self.VonageVideoConnectorTransportParams = VonageVideoConnectorTransportParams
# Mock client instance
self.mock_client_instance = Mock()
vonage_video_mock.VonageVideoClient.return_value = self.mock_client_instance
# Common test data
self.application_id = "test-app-id"
self.session_id = "test-session-id"
self.token = "test-token"
self._frame_processor_setup: FrameProcessorSetup | None = None
self._executor = ThreadPoolExecutor(max_workers=1)
# subscriber state
self._connect_callbacks: ConnectCallbacks | None = None
self._subscriber_callbacks: dict[str, SubscriberCallbacks] = {}
def _get_frame_processor_setup(self) -> FrameProcessorSetup:
if self._frame_processor_setup is not None:
return self._frame_processor_setup
clock: SystemClock = SystemClock() # type: ignore[no-untyped-call]
task_manager = TaskManager()
task_manager.setup(TaskManagerParams(loop=asyncio.get_running_loop()))
self._frame_processor_setup = FrameProcessorSetup(clock=clock, task_manager=task_manager)
return self._frame_processor_setup
async def _wait_for_condition(
self,
condition: Callable[[], bool],
timeout: timedelta = timedelta(seconds=1),
check_interval: timedelta = timedelta(milliseconds=10),
) -> None:
"""Wait for a condition to become true with timeout.
Args:
condition: Callable that returns True when condition is met.
timeout: Maximum time to wait.
check_interval: How often to check the condition.
Raises:
asyncio.TimeoutError: If condition is not met within timeout.
"""
start_time = asyncio.get_event_loop().time()
timeout_seconds = timeout.total_seconds()
check_interval_seconds = check_interval.total_seconds()
while not condition():
if asyncio.get_event_loop().time() - start_time > timeout_seconds:
raise TimeoutError(f"Condition not met within {timeout}")
await asyncio.sleep(check_interval_seconds)
def test_vonage_client_listener_defaults(self) -> None:
"""Test VonageClientListener default values."""
listener = self.VonageClientListener()
assert listener.on_connected is not None
assert listener.on_disconnected is not None
assert listener.on_error is not None
assert listener.on_audio_in is not None
assert listener.on_stream_received is not None
assert listener.on_stream_dropped is not None
assert listener.on_subscriber_connected is not None
assert listener.on_subscriber_disconnected is not None
def test_vonage_transport_params_defaults(self) -> None:
"""Test VonageVideoConnectorTransportParams default values."""
params = self.VonageVideoConnectorTransportParams()
assert params.publisher_name == "Bot"
assert params.publisher_enable_opus_dtx is False
assert params.session_enable_migration is False
def test_vonage_client_initialization(self) -> None:
"""Test VonageClient initialization."""
# Reset the mock for this specific test
vonage_video_mock.VonageVideoClient.reset_mock()
params = self.VonageVideoConnectorTransportParams(audio_in_enabled=True)
client = self.VonageClient(self.application_id, self.session_id, self.token, params)
assert client._application_id == self.application_id
assert client._session_id == self.session_id
assert client._token == self.token
assert client._params == params
assert client._connected is False
assert client._connection_counter == 0
vonage_video_mock.VonageVideoClient.assert_called_once()
# check getting the event loop before setup raises error
with pytest.raises(Exception) as exc_info:
_ = client._get_event_loop()
assert "missing task manager" in str(exc_info.value)
# check pushing events before setup raises error
async def mock_coro() -> None:
pass
mock_task = mock_coro()
with pytest.raises(Exception) as exc_info:
client._sdk_event_cb_to_loop(mock_task)
mock_task.close()
assert "missing event queue" in str(exc_info.value)
def test_vonage_client_add_remove_listener(self) -> None:
"""Test adding and removing listeners from VonageClient."""
params = self.VonageVideoConnectorTransportParams()
client = self.VonageClient(self.application_id, self.session_id, self.token, params)
listener = self.VonageClientListener()
listener_id = client.add_listener(listener)
assert isinstance(listener_id, int)
assert listener_id in client._listeners
assert client._listeners[listener_id] == listener
client.remove_listener(listener_id)
assert listener_id not in client._listeners
def _setup_audio_ready_callback(self, client: VonageClient, call_ready_for_audio: bool) -> None:
"""Helper to set up the audio ready callback."""
def connect_side_effect(
*_: Any,
on_error_cb: Callable[[MockSession, str, int], None],
on_disconnected_cb: Callable[[MockSession], None],
on_ready_for_audio_cb: Callable[[MockSession], None],
**__: Any,
) -> bool:
if call_ready_for_audio:
on_ready_for_audio_cb(vonage_video_mock.models.Session(id="session"))
self._connect_callbacks = ConnectCallbacks(
on_error_cb=on_error_cb,
on_disconnected_cb=on_disconnected_cb,
on_ready_for_audio_cb=on_ready_for_audio_cb,
)
return True
self.mock_client_instance.connect = MagicMock(side_effect=connect_side_effect)
def _setup_subscriber_callbacks(self, client: VonageClient) -> None:
def subscribe_side_effect(
stream: MockStream,
on_error_cb: Callable[[MockSubscriber, str, int], None],
on_connected_cb: Callable[[MockSubscriber], None],
on_disconnected_cb: Callable[[MockSubscriber], None],
on_render_frame_cb: Callable[[MockSubscriber, MockVideoFrame], None],
**__: Any,
) -> bool:
self._subscriber_callbacks[stream.id] = SubscriberCallbacks(
on_error_cb=on_error_cb,
on_connected_cb=on_connected_cb,
on_disconnected_cb=on_disconnected_cb,
on_render_frame_cb=on_render_frame_cb,
)
return True
self.mock_client_instance.subscribe = MagicMock(side_effect=subscribe_side_effect)
async def _subscribe_n_handle_callbacks(
self,
client: VonageClient,
stream_id: str,
params: SubscribeSettings,
callback: Callable[[SubscriberCallbacks], None],
) -> None:
task = asyncio.create_task(client.subscribe_to_stream(stream_id, params))
await self._wait_for_condition(
lambda: stream_id in self._subscriber_callbacks,
timeout=timedelta(seconds=2),
)
callback(self._subscriber_callbacks[stream_id])
await task
async def _create_client(
self,
params: VonageVideoConnectorTransportParams | None = None,
setup_connect_mock: bool = True,
) -> VonageClient:
params = params or VonageVideoConnectorTransportParams()
client = self.VonageClient(self.application_id, self.session_id, self.token, params)
if setup_connect_mock:
self._setup_audio_ready_callback(client, call_ready_for_audio=True)
self._setup_subscriber_callbacks(client)
await client.setup(self._get_frame_processor_setup())
return client
async def _run_in_thread(self, callback: Callable[[], Any]) -> Any:
"""Helper to run a coroutine in a separate thread."""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(self._executor, callback)
async def _wait_client_async_tasks(self, client: VonageClient) -> None:
"""Helper to wait for all async tasks in the client to complete."""
# Wait for any pending tasks in the client's task manager
drain_event = asyncio.Event()
async def set_event_when_drained() -> None:
# Wait for all queues to be joined (all tasks processed)
if client._event_queue:
await client._event_queue.join()
if client._audio_queue:
await client._audio_queue.join()
if client._video_queue:
await client._video_queue.join()
drain_event.set()
# Schedule the drain coroutine in the event loop
asyncio.create_task(set_event_when_drained())
await drain_event.wait()
async def _create_output_transport(
self, params: VonageVideoConnectorTransportParams
) -> VonageVideoConnectorOutputTransport:
client = self.VonageClient(
self.application_id,
self.session_id,
self.token,
params,
)
transport = self.VonageVideoConnectorOutputTransport(client, params)
await transport.setup(self._get_frame_processor_setup())
return transport
async def _create_input_transport(
self, params: VonageVideoConnectorTransportParams
) -> VonageVideoConnectorInputTransport:
client = self.VonageClient(
self.application_id,
self.session_id,
self.token,
params,
)
transport = self.VonageVideoConnectorInputTransport(client, params)
await transport.setup(self._get_frame_processor_setup())
return transport
async def _create_transport(
self, params: VonageVideoConnectorTransportParams
) -> VonageVideoConnectorTransport:
transport = VonageVideoConnectorTransport(
self.application_id,
self.session_id,
self.token,
params,
)
await transport.input().setup(self._get_frame_processor_setup())
await transport.output().setup(self._get_frame_processor_setup())
return transport
@pytest.mark.asyncio
async def test_vonage_client_setup_n_cleanup(self) -> None:
"""Test VonageClient setup and cleanup methods."""
params = self.VonageVideoConnectorTransportParams()
client = self.VonageClient(self.application_id, self.session_id, self.token, params)
# Before setup, task manager and queues should be None
assert client._task_manager is None
assert client._event_queue is None
assert client._event_task is None
assert client._audio_queue is None
assert client._audio_task is None
assert client._video_queue is None
assert client._video_task is None
# Setup the client
setup = self._get_frame_processor_setup()
await client.setup(setup)
# Mock connection
self.mock_client_instance.connect.return_value = True
client._connected = True
client._connection_counter = 1
# After setup, task manager and queues should be initialized
assert client._task_manager is not None
assert client._task_manager == setup.task_manager
assert client._event_queue is not None
assert client._event_task is not None
assert client._audio_queue is not None
assert client._audio_task is not None
assert client._video_queue is not None
assert client._video_task is not None
# Test that calling setup again doesn't recreate the task manager
old_task_manager = client._task_manager
old_event_queue = client._event_queue
old_event_task = client._event_task
await client.setup(setup)
assert client._task_manager == old_task_manager
assert client._event_queue == old_event_queue
assert client._event_task == old_event_task
# Test cleanup without being connected
await client.cleanup()
# After cleanup, tasks should be cancelled
assert client._event_task is None
assert client._audio_task is None
assert client._video_task is None
# Verify disconnect was called
self.mock_client_instance.disconnect.assert_called()
@pytest.mark.asyncio
@pytest.mark.parametrize(
"has_audio, has_video",
[
(True, False),
(False, True),
(True, True),
(False, False),
],
)
async def test_vonage_client_connect_first_time(self, has_audio: bool, has_video: bool) -> None:
"""Test VonageClient connect method for first connection."""
params = self.VonageVideoConnectorTransportParams()
# make changes to params depending on the configuration to check the right value
# goes to the right destination
params.audio_in_channels = 1 if has_video else 2
params.audio_out_channels = 2 if has_video else 1
params.audio_in_sample_rate = 44100 if has_video else 22050
params.audio_out_sample_rate = 22050 if has_video else 44100
params.session_enable_migration = has_video
params.video_out_color_format = "YCbCr" if has_audio else "RGB"
params.video_out_framerate = 30 if has_audio else 15
params.video_out_width = 1280 if has_audio else 640
params.video_out_height = 720 if has_audio else 480
params.audio_in_enabled = has_audio
params.audio_out_enabled = has_audio
params.video_in_enabled = has_video
params.video_out_enabled = has_video
params.video_connector_log_level = "WARN" if has_audio else "ERROR"
client = await self._create_client(params)
# Mock the connect method to return True
self.mock_client_instance.connect.return_value = True
listener = self.VonageClientListener()
listener.on_connected = AsyncMock()
# only set this callback if we have audio enabled
self._setup_audio_ready_callback(client, has_audio)
listener_id = client.add_listener(listener)
await client.connect()
assert isinstance(listener_id, int)
self.mock_client_instance.connect.assert_called_once()
# Verify connect was called with correct parameters
call_args = self.mock_client_instance.connect.call_args
assert call_args[1]["application_id"] == self.application_id
assert call_args[1]["session_id"] == self.session_id
assert call_args[1]["token"] == self.token
assert call_args[1]["session_settings"] == MockSessionSettings(
av=MockSessionAVSettings(
audio_publisher=MockSessionAudioSettings(
sample_rate=params.audio_out_sample_rate,
number_of_channels=params.audio_out_channels,
),
audio_subscribers_mix=MockSessionAudioSettings(
sample_rate=params.audio_in_sample_rate,
number_of_channels=params.audio_in_channels,
),
video_publisher=MockSessionVideoPublisherSettings(
resolution=MockVideoResolution(
width=params.video_out_width,
height=params.video_out_height,
),
fps=params.video_out_framerate,
format=client._video_out_color_format_vonage,
),
),
enable_migration=params.session_enable_migration,
logging=MockLoggingSettings(level=params.video_connector_log_level),
)
assert self._connect_callbacks is not None
assert call_args[1]["on_audio_data_cb"] == client._on_session_audio_data_cb
assert call_args[1]["on_error_cb"] == self._connect_callbacks.on_error_cb
assert call_args[1]["on_connected_cb"] == client._on_session_connected_cb
assert call_args[1]["on_disconnected_cb"] == self._connect_callbacks.on_disconnected_cb
assert (
call_args[1]["on_ready_for_audio_cb"] == self._connect_callbacks.on_ready_for_audio_cb
)
assert call_args[1]["on_stream_received_cb"] == client._on_stream_received_cb
assert call_args[1]["on_stream_dropped_cb"] == client._on_stream_dropped_cb
listener.on_connected.assert_called_once()
@pytest.mark.asyncio
@pytest.mark.parametrize(
"has_audio, has_video",
[
(True, False),
(False, True),
(True, True),
(False, False),
],
)
async def test_vonage_client_publish_after_connect(
self, has_audio: bool, has_video: bool
) -> None:
"""Test VonageClient publishes after being connected method for first connection."""
params = self.VonageVideoConnectorTransportParams()
# make changes to params depending on the configuration to check the right value
# goes to the right destination
params.audio_in_enabled = has_audio
params.audio_out_enabled = has_audio
params.video_in_enabled = has_video
params.video_out_enabled = has_video
params.audio_out_channels = 2 if has_video else 1
params.publisher_enable_opus_dtx = not has_video
params.publisher_name = "test-audio" if has_audio else "test-video"
client = await self._create_client(params)
await client.connect()
self.mock_client_instance.connect.assert_called_once()
# trigger the _on_session_connected_cb to simulate being connected
await self._run_in_thread(
lambda: client._on_session_connected_cb(vonage_video_mock.models.Session(id="session"))
)
await self._wait_client_async_tasks(client)
# if no audio and no video, publish should not be called
if not has_audio and not has_video:
self.mock_client_instance.publish.assert_not_called()
return
# Verify publish was called with correct parameters
self.mock_client_instance.publish.assert_called_once()
call_args = self.mock_client_instance.publish.call_args
assert call_args[1]["settings"] == MockPublisherSettings(
name=params.publisher_name,
audio_settings=MockPublisherAudioSettings(
enable_stereo_mode=params.audio_out_channels == 2,
enable_opus_dtx=params.publisher_enable_opus_dtx,
),
has_audio=has_audio,
has_video=has_video,
)
assert call_args[1]["on_error_cb"] == client._on_publisher_error_cb
assert call_args[1]["on_stream_created_cb"] == client._on_publisher_stream_created_cb
assert call_args[1]["on_stream_destroyed_cb"] == client._on_publisher_stream_destroyed_cb
@pytest.mark.asyncio
async def test_vonage_client_connect_already_connected(self) -> None:
"""Test VonageClient connect when already connected."""
params = self.VonageVideoConnectorTransportParams(audio_in_enabled=True)
client = await self._create_client(params)
# add some listeners, tests multiple listeners are notified, test no notifiaction after removal too
listener1 = self.VonageClientListener()
listener1.on_connected = AsyncMock()
client.add_listener(listener1)
listener2 = self.VonageClientListener()
listener2.on_connected = AsyncMock()
client.add_listener(listener2)
listener3 = self.VonageClientListener()
listener3.on_connected = AsyncMock()
listener_id3 = client.add_listener(listener3)
client.remove_listener(listener_id3)
# First connection, connection is performed
await client.connect()
self.mock_client_instance.connect.assert_called_once()
listener1.on_connected.assert_called_once()
listener2.on_connected.assert_called_once()
# Second connection, should not trigger a new connect call or raised any events
await client.connect()
self.mock_client_instance.connect.assert_called_once()
listener1.on_connected.assert_called_once()
listener2.on_connected.assert_called_once()
# the removed listener should not have received any events
listener3.on_connected.assert_not_called()
@pytest.mark.asyncio
async def test_vonage_client_connect_while_disconnecting(self) -> None:
"""Test VonageClient waits for disconnect to complete before connecting."""
params = self.VonageVideoConnectorTransportParams(audio_in_enabled=True)
client = await self._create_client(params)
self.mock_client_instance.disconnect = MagicMock()
# Simulate disconnect in progress
disconnect_future = asyncio.get_running_loop().create_future()
client._disconnecting_future = disconnect_future
# Start connect task - it should block waiting for disconnect
connect_task = asyncio.create_task(client.connect())
# Give control to the event loop to let connect task start
await asyncio.sleep(0.2)
self.mock_client_instance.connect.assert_not_called()
# Resolve the disconnect future to unblock connect
disconnect_future.set_result(None)
# Wait for connect to complete
await connect_task
self.mock_client_instance.connect.assert_called_once()
# Verify client state
assert client._connected is True
assert client._connection_counter == 1
@pytest.mark.asyncio
async def test_vonage_client_timeout_while_connecting(self) -> None:
"""Test VonageClient handles timeout during connection."""
params = self.VonageVideoConnectorTransportParams(audio_in_enabled=True)
client = await self._create_client(params, setup_connect_mock=False)
# Create an event that will block but can be interrupted
stop_event = threading.Event()
# Mock the SDK connect method to block until interrupted
def connect_blocks_forever(*args: Any, **kwargs: Any) -> bool:
stop_event.wait(timeout=10) # Wait max 10 seconds but can be interrupted
return True
self.mock_client_instance.connect.side_effect = connect_blocks_forever
try:
# Patch the timeout to be very short for fast test execution
with patch(
"pipecat.transports.vonage.client.VIDEO_CONNECTOR_TIMEOUT",
timedelta(seconds=0.1),
):
# Attempt to connect, should timeout
with pytest.raises(asyncio.TimeoutError):
await client.connect()
# Verify client state after timeout
assert client._connected is False
assert client._connection_counter == 0
assert client._connecting_future is None
finally:
# Stop the blocking thread
stop_event.set()
@pytest.mark.asyncio
async def test_vonage_client_concurrent_connects(self) -> None:
"""Test VonageClient concurrent connects."""
params = self.VonageVideoConnectorTransportParams(audio_in_enabled=True)
client = await self._create_client(params)
# Mock the connect method to return True and store the callback
connecting_future: asyncio.Future[Callable[[Any], None]] = (
asyncio.get_running_loop().create_future()
)
def connect_side_effect(
*_: Any, on_ready_for_audio_cb: Callable[[Any], None] | None = None, **__: Any
) -> bool:
assert on_ready_for_audio_cb is not None
connecting_future.set_result(on_ready_for_audio_cb)
return True
self.mock_client_instance.connect = MagicMock(side_effect=connect_side_effect)
# create a listener
listener = self.VonageClientListener()
listener.on_connected = AsyncMock()
client.add_listener(listener)
# send two parallel connect calls and let them get stuck awaiting
connect1_task = asyncio.create_task(client.connect())
connect2_task = asyncio.create_task(client.connect())
audio_ready_cb = await connecting_future
# Now both connects are waiting on the same promise, we can set it to complete
audio_ready_cb(vonage_video_mock.models.Session(id="session"))
# await for the connections to now complete
await asyncio.gather(connect1_task, connect2_task)
# SDK connect should only be called once
self.mock_client_instance.connect.assert_called_once()
listener.on_connected.assert_called_once()
@pytest.mark.asyncio
async def test_vonage_client_connect_failure(self) -> None:
"""Test VonageClient connect method when connection fails."""
client = await self._create_client(setup_connect_mock=False)
# Mock the connect method to return False
self.mock_client_instance.connect.return_value = False
with pytest.raises(Exception) as exc_info:
await client.connect()
assert "Could not connect to session" in str(exc_info.value)
@pytest.mark.asyncio
async def test_vonage_client_disconnect_before_connecting(self) -> None:
"""Test VonageClient disconnect method before connecting."""
client = await self._create_client()
listener = self.VonageClientListener()
listener.on_disconnected = AsyncMock()
client.add_listener(listener)
await client.disconnect()
self.mock_client_instance.disconnect.assert_not_called()
listener.on_disconnected.assert_not_called()
@pytest.mark.asyncio
async def test_vonage_client_disconnect(self) -> None:
"""Test VonageClient disconnect method."""
client = await self._create_client()
# create a listener
listener = self.VonageClientListener()
listener.on_disconnected = AsyncMock()
client.add_listener(listener)
# send two parallel connect calls and let them get stuck awaiting
connect_promise1 = client.connect()
connect_promise2 = client.connect()
# await for the connections to now complete
await asyncio.gather(connect_promise1, connect_promise2)
# Add some items to the queues before disconnect
assert client._event_queue is not None
assert client._audio_queue is not None
assert client._video_queue is not None
async def mock_event_task() -> None:
pass
async def mock_audio_task() -> None:
pass
async def mock_video_task() -> None:
pass
await client._event_queue.put(mock_event_task())
await client._audio_queue.put(mock_audio_task())
await client._video_queue.put(mock_video_task())
# Verify queues have items
assert client._event_queue.qsize() == 1
assert client._audio_queue.qsize() == 1
assert client._video_queue.qsize() == 1
# Mock the client's clear_media_buffers method
self.mock_client_instance.clear_media_buffers = MagicMock()
# send the first disconnect call, we should still be connected
await client.disconnect()
self.mock_client_instance.disconnect.assert_not_called()
self.mock_client_instance.clear_media_buffers.assert_not_called()
listener.on_disconnected.assert_not_called()
# Queues should still have items since we didn't actually disconnect
assert client._event_queue.qsize() == 1
assert client._audio_queue.qsize() == 1
assert client._video_queue.qsize() == 1
# check the second disconnect now disconnects for real
await client.disconnect()
self.mock_client_instance.disconnect.assert_called_once()
self.mock_client_instance.clear_media_buffers.assert_called_once()
listener.on_disconnected.assert_called_once()
# Verify queues are now empty after disconnect
assert client._event_queue.qsize() == 0
assert client._audio_queue.qsize() == 0
assert client._video_queue.qsize() == 0
# an extra disconnect should not do anything
await client.disconnect()
self.mock_client_instance.disconnect.assert_called_once()
self.mock_client_instance.clear_media_buffers.assert_called_once()
listener.on_disconnected.assert_called_once()
@pytest.mark.asyncio
async def test_vonage_client_disconnect_while_connecting(self) -> None:
"""Test VonageClient waits for connect to complete before disconnecting."""
params = self.VonageVideoConnectorTransportParams(audio_in_enabled=True)
client = await self._create_client(params)
client._connected = True
client._connection_counter = 1
self.mock_client_instance.connect = MagicMock()
# Simulate connect in progress
connect_future = asyncio.get_running_loop().create_future()
client._connecting_future = connect_future
# Start disconnect task - it should block waiting for disconnect
disconnect_task = asyncio.create_task(client.disconnect())
# Give control to the event loop to let disconnect task start
await asyncio.sleep(0.2)
self.mock_client_instance.disconnect.assert_not_called()
# Resolve the disconnect future to unblock connect
connect_future.set_result(None)
# Wait for connect to complete
await disconnect_task
self.mock_client_instance.disconnect.assert_called_once()
# Verify client state
assert client._connected is False
assert client._connection_counter == 0
@pytest.mark.asyncio
async def test_vonage_client_timeout_while_disconnecting(self) -> None:
"""Test VonageClient handles timeout during disconnection."""
params = self.VonageVideoConnectorTransportParams(audio_in_enabled=True)
client = await self._create_client(params, setup_connect_mock=False)
await client.connect()
assert client._connection_counter == 1
# Create an event that will block but can be interrupted
stop_event = threading.Event()
# Mock the SDK disconnect method to block until interrupted
def disconnect_blocks_forever(*args: Any, **kwargs: Any) -> bool:
stop_event.wait(timeout=10) # Wait max 10 seconds but can be interrupted
return True
self.mock_client_instance.disconnect.side_effect = disconnect_blocks_forever
try:
# Patch the timeout to be very short for fast test execution
with patch(
"pipecat.transports.vonage.client.VIDEO_CONNECTOR_TIMEOUT",
timedelta(seconds=0.1),
):
# Attempt to connect, should timeout
with pytest.raises(asyncio.TimeoutError):
await client.disconnect()
# Verify client state after timeout
assert client._connected is True
assert client._connection_counter == 1
assert client._disconnecting_future is None
finally:
# Stop the blocking thread
stop_event.set()
@pytest.mark.asyncio
async def test_vonage_client_clear_media_buffers(self) -> None:
"""Test VonageClient clear_media_buffers method."""
params = self.VonageVideoConnectorTransportParams(
audio_out_channels=2, audio_out_sample_rate=48000
)
client = await self._create_client(params)
# Add some items to the audio and video queues
assert client._audio_queue is not None
assert client._video_queue is not None
# Create mock coroutines to add to queues
async def mock_audio_task() -> None:
pass
async def mock_video_task() -> None:
pass
# Put some items in the queues
await client._audio_queue.put(mock_audio_task())
await client._audio_queue.put(mock_audio_task())
await client._video_queue.put(mock_video_task())
# Verify queues have items
assert client._audio_queue.qsize() == 2
assert client._video_queue.qsize() == 1
# Mock the client's clear_media_buffers method
self.mock_client_instance.clear_media_buffers = MagicMock()
# Clear the buffers
client.clear_media_buffers()
# Verify queues are now empty
assert client._audio_queue.qsize() == 0
assert client._video_queue.qsize() == 0
# Verify the SDK client's clear_media_buffers was called
self.mock_client_instance.clear_media_buffers.assert_called_once()
@pytest.mark.asyncio
@patch("pipecat.transports.vonage.client.VIDEO_QUEUE_MAXSIZE", 1)
async def test_vonage_client_sdk_cb_to_loop_full_queue(self) -> None:
"""Test VonageClient SDK callback to loop filling up the queue."""
params = self.VonageVideoConnectorTransportParams()
client = await self._create_client(params)
# Ensure the loop thread ID is set
assert client._video_queue is not None
assert client._loop_thread_id == threading.get_ident()
# Create a mock coroutine to queue
async def mock_task() -> None:
pass
# Fill queue to max size
for _ in range(client._video_queue.maxsize):
await client._video_queue.put(mock_task())
# Queue should be full
assert client._video_queue.qsize() == client._video_queue.maxsize
# This should log an error and drop the event
async_task = mock_task()
client._sdk_cb_to_loop("test_event", client._video_queue, async_task)
# Queue should still be full (no new item added)
assert client._video_queue.qsize() == client._video_queue.maxsize
# check the coroutine was closed and hence dropped
assert inspect.getcoroutinestate(async_task) == "CORO_CLOSED"
# Clean up the coroutine
task = await client._video_queue.get()
task.close()
client._video_queue.task_done()
@pytest.mark.asyncio
@patch("pipecat.transports.vonage.client.create_stream_resampler")
async def test_vonage_client_get_audio_with_resampling(self, mock_resampler: MagicMock) -> None:
"""Test VonageClient get_audio method."""
# Return resampled stereo data
resampled_data = b"\x07\x06\x05\x04\x03\x02\x01\x00"
mock_resampler_instance = Mock()
mock_resampler_instance.resample = AsyncMock(return_value=resampled_data)
mock_resampler.return_value = mock_resampler_instance
params = self.VonageVideoConnectorTransportParams(
audio_in_channels=1,
audio_in_sample_rate=48000,
audio_in_enabled=True,
)
client = await self._create_client(params)
listener = self.VonageClientListener()
on_audio_in_mock = AsyncMock()
listener.on_audio_in = on_audio_in_mock
client.add_listener(listener)
await client.connect()
mock_audio_data = vonage_video_mock.models.AudioData(
sample_buffer=memoryview(b"\x00\x01\x02\x03\x04\x05\x06\x07"),
number_of_frames=4,
number_of_channels=1,
sample_rate=16000,
)
session = vonage_video_mock.models.Session(id="test_session")
client._on_session_audio_data_cb(session, mock_audio_data)
await self._wait_for_condition(lambda: on_audio_in_mock.call_count > 0)
listener.on_audio_in.assert_called_once_with(session, ANY)
frame = listener.on_audio_in.call_args[0][1]
assert frame.audio == resampled_data
assert frame.num_frames == 4
assert frame.sample_rate == 48000
assert frame.num_channels == 1
@pytest.mark.asyncio
async def test_vonage_client_get_video(self) -> None:
"""Test VonageClient get video."""
pass
@pytest.mark.asyncio
async def test_vonage_client_write_audio(self) -> None:
"""Test VonageClient write_audio method."""
params = self.VonageVideoConnectorTransportParams(
audio_out_channels=2, audio_out_sample_rate=48000
)
client = await self._create_client(params)
# Create mock audio data
audio_data = OutputAudioRawFrame(
audio=b"\x00\x01\x02\x03\x04\x05\x06\x07",
sample_rate=48000,
num_channels=2,
) # 4 frames of 2-channel 16-bit audio
await client.write_audio(audio_data)
self.mock_client_instance.add_audio.assert_called_once()
call_args = self.mock_client_instance.add_audio.call_args[0][0]
assert call_args.sample_buffer.tobytes() == audio_data.audio
assert call_args.number_of_frames == 2 # 8 bytes / (2 channels * 2 bytes)
assert call_args.number_of_channels == 2
assert call_args.sample_rate == 48000
@pytest.mark.asyncio
@patch("pipecat.transports.vonage.client.create_stream_resampler")
async def test_vonage_client_write_audio_with_resampling(
self, mock_resampler: MagicMock
) -> None:
"""Test VonageClient write_audio method."""
# Return resampled stereo data
resampled_data = b"\x07\x06\x05\x04\x03\x02\x01\x00"
mock_resampler_instance = Mock()
mock_resampler_instance.resample = AsyncMock(return_value=resampled_data)
mock_resampler.return_value = mock_resampler_instance
params = self.VonageVideoConnectorTransportParams(
audio_out_channels=1, audio_out_sample_rate=16000
)
client = await self._create_client(params)
# Create mock audio data
audio_data = OutputAudioRawFrame(
audio=b"\x00\x01\x02\x03\x04\x05\x06\x07",
sample_rate=48000,
num_channels=1,
) # 4 frames of 1-channel 16-bit audio
await client.write_audio(audio_data)
self.mock_client_instance.add_audio.assert_called_once()
call_args = self.mock_client_instance.add_audio.call_args[0][0]
assert call_args.sample_buffer.tobytes() == resampled_data
assert call_args.number_of_frames == 4 # 8 bytes / (1 channel * 2 bytes)
assert call_args.number_of_channels == 1
assert call_args.sample_rate == 16000
@pytest.mark.asyncio
async def test_vonage_client_write_video(self) -> None:
"""Test VonageClient write_video method."""
params = self.VonageVideoConnectorTransportParams(
video_out_width=640,
video_out_height=480,
video_out_color_format="RGB",
)
client = await self._create_client(params)
# Create a test RGB image (640x480, 3 channels)
width, height = 640, 480
# Create RGB data: simple gradient pattern
rgb_image = np.zeros((height, width, 3), dtype=np.uint8)
rgb_image[:, :, 0] = 100 # R channel
rgb_image[:, :, 1] = 150 # G channel
rgb_image[:, :, 2] = 200 # B channel
rgb_bytes = rgb_image.tobytes()
# Create OutputImageRawFrame
frame = OutputImageRawFrame(image=rgb_bytes, size=(width, height), format="RGB")
# Mock the add_video method
self.mock_client_instance.add_video = MagicMock(return_value=True)
result = await client.write_video(frame)
# Verify add_video was called
assert result is True
self.mock_client_instance.add_video.assert_called_once()
# Get the VideoFrame argument
call_args = self.mock_client_instance.add_video.call_args[0][0]
# Verify the resolution
assert call_args.resolution.width == width
assert call_args.resolution.height == height
# Verify the format
assert call_args.format == "RGB24"
# Verify BGR conversion happened correctly
# Convert back from the buffer to verify
bgr_buffer = bytes(call_args.frame_buffer)
bgr_image = np.frombuffer(bgr_buffer, dtype=np.uint8).reshape(height, width, 3)
# Check that RGB was converted to BGR (channels swapped)
assert bgr_image[0, 0, 0] == 200 # B channel (was R=200 in RGB)
assert bgr_image[0, 0, 1] == 150 # G channel (unchanged)
assert bgr_image[0, 0, 2] == 100 # R channel (was B=100 in RGB)
@pytest.mark.asyncio
@pytest.mark.parametrize(
"has_audio, has_video",
[
(True, False),
(False, True),
(True, True),
],
)
async def test_vonage_client_subscribe_to_stream(
self, has_audio: bool, has_video: bool
) -> None:
"""Test VonageClient subscribe_to_stream with a stream that exists in session."""
params = self.VonageVideoConnectorTransportParams(audio_in_enabled=True)
client = await self._create_client(params)
listener = self.VonageClientListener()
client.add_listener(listener)
on_subscriber_connected_mock = AsyncMock()
listener.on_subscriber_connected = on_subscriber_connected_mock
on_subscriber_disconnected_mock = AsyncMock()
listener.on_subscriber_disconnected = on_subscriber_disconnected_mock
await client.connect()
# Add a stream to the session
stream = vonage_video_mock.models.Stream(id="test_stream", connection=DUMMY_CONNECTION)
client._session_streams["test_stream"] = stream
# Setup subscriber callbacks
self._setup_subscriber_callbacks(client)
# Subscribe with audio and video
subscribe_params = SubscribeSettings(
subscribe_to_audio=has_audio,
subscribe_to_video=has_video,
preferred_resolution=(640, 480) if has_video else None,
preferred_framerate=30 if has_video else None,
)
subscriber = vonage_video_mock.models.Subscriber(stream=stream)
await self._subscribe_n_handle_callbacks(
client,
"test_stream",
subscribe_params,
lambda callbacks: callbacks.on_connected_cb(subscriber),
)
on_subscriber_connected_mock.assert_called_once_with(subscriber)
# Verify subscribe was called with correct parameters
self.mock_client_instance.subscribe.assert_called_once()
call_args = self.mock_client_instance.subscribe.call_args
expected_settings = MockSubscriberSettings(
subscribe_to_audio=has_audio,
subscribe_to_video=has_video,
video_settings=MockSubscriberVideoSettings(
preferred_resolution=MockVideoResolution(
width=subscribe_params.preferred_resolution[0],
height=subscribe_params.preferred_resolution[1],
)
if subscribe_params.preferred_resolution
else None,
preferred_framerate=subscribe_params.preferred_framerate,
),
)
assert call_args[0][0] == stream
assert call_args[1]["settings"] == expected_settings
# Verify subscription was stored
assert "test_stream" in client._session_subscriptions
assert client._session_subscriptions["test_stream"] == subscribe_params
# check we can get a disconnect event from the subscriber
self._subscriber_callbacks["test_stream"].on_disconnected_cb(subscriber)
await self._wait_for_condition(lambda: on_subscriber_disconnected_mock.call_count > 0)
on_subscriber_disconnected_mock.assert_called_once_with(subscriber)
@pytest.mark.asyncio
async def test_vonage_client_subscribe_to_stream_same_and_different_params(self) -> None:
"""Test VonageClient subscribe_to_stream when subscribing multiple times with same and different parameters."""
params = self.VonageVideoConnectorTransportParams(audio_in_enabled=True)
client = await self._create_client(params)
listener = self.VonageClientListener()
client.add_listener(listener)
on_subscriber_connected_mock = AsyncMock()
listener.on_subscriber_connected = on_subscriber_connected_mock
await client.connect()
# Add a stream to the session
stream = vonage_video_mock.models.Stream(id="test_stream", connection=DUMMY_CONNECTION)
client._session_streams["test_stream"] = stream
# Setup subscriber callbacks
self._setup_subscriber_callbacks(client)
# First subscription with specific parameters
subscribe_params_1 = SubscribeSettings(
subscribe_to_audio=True,
subscribe_to_video=True,
preferred_resolution=(640, 480),
preferred_framerate=30,
)
subscriber = vonage_video_mock.models.Subscriber(stream=stream)
await self._subscribe_n_handle_callbacks(
client,
"test_stream",
subscribe_params_1,
lambda callbacks: callbacks.on_connected_cb(subscriber),
)
on_subscriber_connected_mock.assert_called_once_with(subscriber)
# Verify subscribe was called once
assert self.mock_client_instance.subscribe.call_count == 1
assert "test_stream" in client._session_subscriptions
assert client._session_subscriptions["test_stream"] == subscribe_params_1
# Subscribe again with SAME parameters - should do nothing
await client.subscribe_to_stream("test_stream", subscribe_params_1)
# Verify subscribe was still only called once (no new subscription)
assert self.mock_client_instance.subscribe.call_count == 1
# Verify unsubscribe was NOT called
self.mock_client_instance.unsubscribe.assert_not_called()
# Subscription should remain unchanged
assert client._session_subscriptions["test_stream"] == subscribe_params_1
# Subscribe again with DIFFERENT parameters - should unsubscribe and resubscribe
subscribe_params_2 = SubscribeSettings(
subscribe_to_audio=False,
subscribe_to_video=True,
preferred_resolution=(1280, 720),
preferred_framerate=60,
)
# Track the current subscribe call count before resubscribing
initial_subscribe_count = self.mock_client_instance.subscribe.call_count
# Start the resubscription
resubscribe_task = asyncio.create_task(
client.subscribe_to_stream("test_stream", subscribe_params_2)
)
# Wait for the new subscribe call to be made
await self._wait_for_condition(
lambda: self.mock_client_instance.subscribe.call_count > initial_subscribe_count,
timeout=timedelta(seconds=2),
)
# Now trigger the connected callback with the new subscription callbacks
self._subscriber_callbacks["test_stream"].on_connected_cb(subscriber)
await resubscribe_task
# Verify unsubscribe was called once (to remove old subscription)
self.mock_client_instance.unsubscribe.assert_called_once_with(stream)
# Verify subscribe was called a second time (for new subscription)
assert self.mock_client_instance.subscribe.call_count == 2
# Verify new subscription parameters are stored
assert client._session_subscriptions["test_stream"] == subscribe_params_2
# Verify the second subscribe call had the new parameters
second_call_args = self.mock_client_instance.subscribe.call_args
expected_settings = MockSubscriberSettings(
subscribe_to_audio=False,
subscribe_to_video=True,
video_settings=MockSubscriberVideoSettings(
preferred_resolution=MockVideoResolution(width=1280, height=720),
preferred_framerate=60,
),
)
assert second_call_args[0][0] == stream
assert second_call_args[1]["settings"] == expected_settings
@pytest.mark.asyncio
async def test_vonage_client_subscribe_to_stream_timeout(self) -> None:
"""Test VonageClient subscribe_to_stream when SDK subscribe times out."""
params = self.VonageVideoConnectorTransportParams(audio_in_enabled=True)
client = await self._create_client(params)
await client.connect()
stream = vonage_video_mock.models.Stream(id="fail_stream", connection=DUMMY_CONNECTION)
client._session_streams["fail_stream"] = stream
self._setup_subscriber_callbacks(client)
subscribe_params = SubscribeSettings(subscribe_to_audio=True, subscribe_to_video=False)
# Patch the timeout to be very short for fast test execution
# the call never gets on_connected_cb or any other callback, it will timeout
with patch(
"pipecat.transports.vonage.client.VIDEO_CONNECTOR_TIMEOUT",
timedelta(seconds=0.1),
):
with pytest.raises(asyncio.TimeoutError):
await client.subscribe_to_stream("fail_stream", subscribe_params)
@pytest.mark.asyncio
async def test_vonage_client_subscribe_to_stream_fails(self) -> None:
"""Test VonageClient subscribe_to_stream when SDK subscribe fails."""
params = self.VonageVideoConnectorTransportParams(audio_in_enabled=True)
client = await self._create_client(params)
await client.connect()
stream = vonage_video_mock.models.Stream(id="fail_stream", connection=DUMMY_CONNECTION)
client._session_streams["fail_stream"] = stream
self._setup_subscriber_callbacks(client)
self.mock_client_instance.subscribe.side_effect = lambda *_, **__: False
subscribe_params = SubscribeSettings(subscribe_to_audio=True, subscribe_to_video=False)
with pytest.raises(VonageException) as exc_info:
await client.subscribe_to_stream("fail_stream", subscribe_params)
assert "Could not subscribe to stream" in str(exc_info.value)
@pytest.mark.asyncio
async def test_vonage_client_subscribe_to_stream_subscriber_error(self) -> None:
"""Test VonageClient subscribe_to_stream when subscriber reports an error."""
params = self.VonageVideoConnectorTransportParams(audio_in_enabled=True)
client = await self._create_client(params)
await client.connect()
stream = vonage_video_mock.models.Stream(id="error_stream", connection=DUMMY_CONNECTION)
client._session_streams["error_stream"] = stream
self._setup_subscriber_callbacks(client)
subscribe_params = SubscribeSettings(subscribe_to_audio=True, subscribe_to_video=False)
# Subscription should raise an exception
with pytest.raises(VonageException) as exc_info:
subscriber = vonage_video_mock.models.Subscriber(stream=stream)
await self._subscribe_n_handle_callbacks(
client,
"error_stream",
subscribe_params,
lambda callbacks: callbacks.on_error_cb(subscriber, "Connection failed", 1500),
)
assert "Subscriber error" in str(exc_info.value)
assert "Connection failed" in str(exc_info.value)
assert "(code 1500)" in str(exc_info.value)
# Verify subscription was removed
assert "error_stream" not in client._session_subscriptions
@pytest.mark.asyncio
async def test_vonage_client_subscribe_to_stream_subscriber_disconnected_before_connected(
self,
) -> None:
"""Test VonageClient subscribe_to_stream when subscriber disconnects before connecting."""
params = self.VonageVideoConnectorTransportParams(audio_in_enabled=True)
client = await self._create_client(params)
await client.connect()
stream = vonage_video_mock.models.Stream(id="dc_stream", connection=DUMMY_CONNECTION)
client._session_streams["dc_stream"] = stream
self._setup_subscriber_callbacks(client)
# Add listener to track disconnection
listener = self.VonageClientListener()
on_subscriber_disconnected_mock = AsyncMock()
listener.on_subscriber_disconnected = on_subscriber_disconnected_mock
client.add_listener(listener)
subscribe_params = SubscribeSettings(subscribe_to_audio=True, subscribe_to_video=False)
# Subscription should raise an exception
subscriber = vonage_video_mock.models.Subscriber(stream=stream)
with pytest.raises(VonageException) as exc_info:
await self._subscribe_n_handle_callbacks(
client,
"dc_stream",
subscribe_params,
lambda callbacks: callbacks.on_disconnected_cb(subscriber),
)
assert "disconnected before connecting" in str(exc_info.value)
# Verify subscription was removed
assert "dc_stream" not in client._session_subscriptions
# Verify listener was called
listener.on_subscriber_disconnected.assert_awaited_once_with(subscriber)
@pytest.mark.asyncio
async def test_vonage_client_on_stream_received_triggers_listeners(self) -> None:
"""Test that _on_stream_received_cb triggers on_stream_received listener callbacks."""
params = self.VonageVideoConnectorTransportParams(
audio_in_enabled=True, audio_in_auto_subscribe=False
)
client = await self._create_client(params)
# Add multiple listeners
listener1 = self.VonageClientListener()
on_stream_received_mock1 = AsyncMock()
listener1.on_stream_received = on_stream_received_mock1
client.add_listener(listener1)
listener2 = self.VonageClientListener()
on_stream_received_mock2 = AsyncMock()
listener2.on_stream_received = on_stream_received_mock2
client.add_listener(listener2)
await client.connect()
session = vonage_video_mock.models.Session(id="test_session")
stream = vonage_video_mock.models.Stream(id="test_stream", connection=DUMMY_CONNECTION)
# Trigger the callback
client._on_stream_received_cb(session, stream)
# Wait for async processing
await self._wait_for_condition(
lambda: (
on_stream_received_mock1.await_count > 0
and on_stream_received_mock2.await_count > 0
)
)
# Verify both listeners were called
on_stream_received_mock1.assert_awaited_once_with(session, stream)
on_stream_received_mock2.assert_awaited_once_with(session, stream)
# Verify stream was added to session streams
assert "test_stream" in client._session_streams
assert client._session_streams["test_stream"] == stream
@pytest.mark.asyncio
@pytest.mark.parametrize(
"auto_audio, auto_video",
[
(True, False),
(False, True),
(True, True),
(False, False),
],
)
async def test_vonage_client_on_stream_received_auto_subscribe(
self, auto_audio: bool, auto_video: bool
) -> None:
"""Test that _on_stream_received_cb auto-subscribes when auto_subscribe is enabled."""
params = self.VonageVideoConnectorTransportParams(
audio_in_enabled=True,
video_in_enabled=True,
audio_in_auto_subscribe=auto_audio,
video_in_auto_subscribe=auto_video,
video_in_preferred_resolution=(640, 480) if auto_video else None,
video_in_preferred_framerate=25 if auto_video else None,
)
client = await self._create_client(params)
await client.connect()
self._setup_subscriber_callbacks(client)
session = vonage_video_mock.models.Session(id="test_session")
stream = vonage_video_mock.models.Stream(id="auto_sub_stream", connection=DUMMY_CONNECTION)
# Trigger the callback
client._on_stream_received_cb(session, stream)
await self._wait_for_condition(lambda: stream.id in client._session_streams)
if not auto_audio and not auto_video:
await self._wait_client_async_tasks(client)
# No auto-subscribe should happen
await self._wait_client_async_tasks(client)
self.mock_client_instance.subscribe.assert_not_called()
return
# Wait for auto-subscribe to happen
await self._wait_for_condition(lambda: "auto_sub_stream" in self._subscriber_callbacks)
# Verify subscribe was called
self.mock_client_instance.subscribe.assert_called_once()
call_args = self.mock_client_instance.subscribe.call_args
# Verify subscription settings
expected_settings = MockSubscriberSettings(
subscribe_to_audio=auto_audio,
subscribe_to_video=auto_video,
video_settings=MockSubscriberVideoSettings(
preferred_resolution=MockVideoResolution(width=640, height=480)
if auto_video
else None,
preferred_framerate=25 if auto_video else None,
),
)
assert call_args[0][0] == stream
assert call_args[1]["settings"] == expected_settings
# Verify subscription was stored
assert "auto_sub_stream" in client._session_subscriptions
assert client._session_subscriptions["auto_sub_stream"].subscribe_to_audio == auto_audio
assert client._session_subscriptions["auto_sub_stream"].subscribe_to_video == auto_video
self._subscriber_callbacks["auto_sub_stream"].on_connected_cb(MockSubscriber(stream=stream))
await self._wait_client_async_tasks(client)
@pytest.mark.asyncio
async def test_vonage_client_on_stream_received_skips_existing_subscription(self) -> None:
"""Test that _on_stream_received_cb does not auto-subscribe if stream is already subscribed."""
params = self.VonageVideoConnectorTransportParams(
audio_in_enabled=True, audio_in_auto_subscribe=True
)
client = await self._create_client(params)
listener = self.VonageClientListener()
on_stream_received_mock = AsyncMock()
listener.on_stream_received = on_stream_received_mock
client.add_listener(listener)
await client.connect()
self._setup_subscriber_callbacks(client)
session = vonage_video_mock.models.Session(id="test_session")
stream = vonage_video_mock.models.Stream(id="existing_stream", connection=DUMMY_CONNECTION)
# Manually add an existing subscription
client._session_subscriptions["existing_stream"] = SubscribeSettings(
subscribe_to_audio=True, subscribe_to_video=False
)
# Trigger the callback
client._on_stream_received_cb(session, stream)
# Wait for listener to be called
await self._wait_for_condition(lambda: on_stream_received_mock.await_count > 0)
on_stream_received_mock.assert_awaited_once_with(session, stream)
# Wait to ensure no subscription happens
await self._wait_client_async_tasks(client)
# Verify subscribe was NOT called (because subscription already exists)
self.mock_client_instance.subscribe.assert_not_called()
@pytest.mark.asyncio
async def test_vonage_client_events(self) -> None:
"""Test VonageClient events"""
params = self.VonageVideoConnectorTransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
audio_in_sample_rate=48000,
audio_in_channels=2,
)
client = await self._create_client(params)
# Mock the connect method to return True
self.mock_client_instance.connect.return_value = True
self._setup_audio_ready_callback(client, call_ready_for_audio=True)
self._setup_subscriber_callbacks(client)
# create a listener
listener = self.VonageClientListener()
on_error_mock = AsyncMock()
listener.on_error = on_error_mock
on_audio_in_mock = AsyncMock()
listener.on_audio_in = on_audio_in_mock
on_stream_received_mock = AsyncMock()
listener.on_stream_received = on_stream_received_mock
on_stream_dropped_mock = AsyncMock()
listener.on_stream_dropped = on_stream_dropped_mock
on_subscriber_connected_mock = AsyncMock()
listener.on_subscriber_connected = on_subscriber_connected_mock
on_subscriber_disconnected_mock = AsyncMock()
listener.on_subscriber_disconnected = on_subscriber_disconnected_mock
client.add_listener(listener)
# connect
await client.connect()
assert self._connect_callbacks is not None
# Test _on_session_error_cb triggers on_error
session = vonage_video_mock.models.Session(id="test_session")
error_description = "Test error description"
error_code = 500
self._connect_callbacks.on_error_cb(session, error_description, error_code)
await self._wait_for_condition(lambda: on_error_mock.await_count > 0)
listener.on_error.assert_called_once_with(session, error_description, error_code)
listener.on_error.reset_mock()
# Test _on_session_audio_data_cb triggers on_audio_in
audio_buffer = np.array([100, 200, 300, 400], dtype=np.int16)
mock_audio_data = vonage_video_mock.models.AudioData(
sample_buffer=memoryview(audio_buffer),
number_of_frames=2,
number_of_channels=2,
sample_rate=48000,
)
client._on_session_audio_data_cb(session, mock_audio_data)
await self._wait_for_condition(lambda: on_audio_in_mock.await_count > 0)
listener.on_audio_in.assert_awaited_once_with(session, ANY)
frame = listener.on_audio_in.call_args[0][1]
assert frame.audio == audio_buffer.tobytes()
assert frame.sample_rate == 48000
assert frame.num_channels == 2
listener.on_audio_in.reset_mock()
# Test _on_stream_received_cb triggers on_stream_received
stream = vonage_video_mock.models.Stream(id="test_stream", connection=DUMMY_CONNECTION)
client._on_stream_received_cb(session, stream)
await self._wait_for_condition(lambda: on_stream_received_mock.await_count > 0)
listener.on_stream_received.assert_awaited_once_with(session, stream)
await self._wait_for_condition(lambda: stream.id in self._subscriber_callbacks)
assert stream.id in self._subscriber_callbacks
callbacks = self._subscriber_callbacks[stream.id]
self.mock_client_instance.subscribe.assert_called_once_with(
stream,
settings=ANY,
on_error_cb=callbacks.on_error_cb,
on_connected_cb=callbacks.on_connected_cb,
on_disconnected_cb=callbacks.on_disconnected_cb,
on_render_frame_cb=client._on_subscriber_video_data_cb,
)
listener.on_stream_received.reset_mock()
subscriber = vonage_video_mock.models.Subscriber(stream=stream)
callbacks.on_connected_cb(subscriber)
await self._wait_for_condition(lambda: on_subscriber_connected_mock.await_count > 0)
listener.on_subscriber_connected.assert_awaited_once()
listener.on_subscriber_connected.reset_mock()
# Test _on_subscriber_disconnected_cb triggers on_subscriber_disconnected
callbacks.on_disconnected_cb(subscriber)
await self._wait_for_condition(lambda: on_subscriber_disconnected_mock.await_count > 0)
listener.on_subscriber_disconnected.assert_awaited_once_with(subscriber)
listener.on_subscriber_disconnected.reset_mock()
# Test _on_stream_dropped_cb triggers on_stream_dropped
self.mock_client_instance.unsubscribe = MagicMock()
client._on_stream_dropped_cb(session, stream)
await self._wait_for_condition(lambda: on_stream_dropped_mock.await_count > 0)
listener.on_stream_dropped.assert_awaited_once_with(session, stream)
self.mock_client_instance.unsubscribe.assert_not_called()
listener.on_stream_dropped.reset_mock()
# Test _on_subscriber_connected_cb triggers on_subscriber_connected
subscriber_stream = vonage_video_mock.models.Stream(
id="subscriber_stream", connection=DUMMY_CONNECTION
)
subscriber = vonage_video_mock.models.Subscriber(stream=subscriber_stream)
# Test error callbacks are logged but don't trigger listener events
# (these are internal error callbacks, not session errors)
publisher_stream = vonage_video_mock.models.Stream(
id="publisher_stream", connection=DUMMY_CONNECTION
)
publisher = vonage_video_mock.models.Publisher(stream=publisher_stream)
# These should not raise exceptions
client._on_publisher_error_cb(publisher, "publisher error", 400)
callbacks.on_error_cb(subscriber, "subscriber error", 401)
await self._wait_client_async_tasks(client)
@pytest.mark.asyncio
async def test_vonage_client_on_subscriber_video_data_cb_rgb_format(self) -> None:
"""Test _on_subscriber_video_data_cb with RGB format video frames."""
from pipecat.frames.frames import UserImageRawFrame
params = self.VonageVideoConnectorTransportParams(
video_in_enabled=True,
video_in_auto_subscribe=False,
)
client = await self._create_client(params)
# Add listener to capture video frames
listener = self.VonageClientListener()
on_video_in_mock = AsyncMock()
listener.on_video_in = on_video_in_mock
client.add_listener(listener)
await client.connect()
# Create a test RGB video frame (4x4, 3 channels)
width, height = 4, 4
rgb_image = np.zeros((height, width, 3), dtype=np.uint8)
rgb_image[:, :, 0] = 100 # R channel
rgb_image[:, :, 1] = 150 # G channel
rgb_image[:, :, 2] = 200 # B channel
rgb_bytes = rgb_image.tobytes()
# Create mock video frame with RGB24 format (which Vonage uses for RGB)
mock_video_frame = vonage_video_mock.models.VideoFrame(
frame_buffer=memoryview(rgb_bytes),
resolution=MockVideoResolution(width=width, height=height),
format="RGB24",
)
# Create mock subscriber
stream = vonage_video_mock.models.Stream(id="video_stream", connection=DUMMY_CONNECTION)
subscriber = vonage_video_mock.models.Subscriber(stream=stream)
# Trigger the callback
client._on_subscriber_video_data_cb(subscriber, mock_video_frame)
# Wait for async processing
await self._wait_for_condition(lambda: on_video_in_mock.await_count > 0)
# Verify listener was called
on_video_in_mock.assert_awaited_once()
call_args = on_video_in_mock.call_args[0]
assert call_args[0] == subscriber
# Get the processed frame
processed_frame: UserImageRawFrame = call_args[1]
assert processed_frame.user_id == "video_stream"
assert processed_frame.size == (width, height)
assert processed_frame.format == "RGB"
# Verify BGR to RGB conversion happened
processed_image = np.frombuffer(processed_frame.image, dtype=np.uint8).reshape(
height, width, 3
)
assert processed_image[0, 0, 0] == 200 # R channel (was B in BGR)
assert processed_image[0, 0, 1] == 150 # G channel (unchanged)
assert processed_image[0, 0, 2] == 100 # B channel (was R in BGR)
@pytest.mark.asyncio
async def test_vonage_input_transport_initialization(self) -> None:
"""Test VonageVideoConnectorInputTransport initialization."""
params = self.VonageVideoConnectorTransportParams()
client = self.VonageClient(self.application_id, self.session_id, self.token, params)
transport_params = self.VonageVideoConnectorTransportParams(audio_in_enabled=True)
transport = self.VonageVideoConnectorInputTransport(client, transport_params)
assert transport._client == client
assert transport._initialized is False
@pytest.mark.asyncio
async def test_vonage_input_transport_start(self) -> None:
"""Test VonageVideoConnectorInputTransport start method."""
params = self.VonageVideoConnectorTransportParams(audio_in_enabled=True)
client = self.VonageClient(self.application_id, self.session_id, self.token, params)
transport = self.VonageVideoConnectorInputTransport(client, params)
# Mock the client connect method
with (
patch.object(client, "connect", AsyncMock(return_value=1)) as client_connect_mock,
patch.object(transport, "set_transport_ready", AsyncMock()) as set_transport_ready_mock,
):
start_frame = StartFrame()
await transport.start(start_frame)
assert transport._initialized is True
assert transport._connected is True
client_connect_mock.assert_called_once()
set_transport_ready_mock.assert_called_once_with(start_frame)
@pytest.mark.asyncio
async def test_vonage_input_transport_stop(self) -> None:
"""Test VonageVideoConnectorInputTransport stop method."""
params = self.VonageVideoConnectorTransportParams(audio_in_enabled=True)
client = self.VonageClient(self.application_id, self.session_id, self.token, params)
transport = self.VonageVideoConnectorInputTransport(client, params)
transport._listener_id = 1
transport._connected = True
with (
patch.object(client, "disconnect", AsyncMock()) as client_disconnect_mock,
patch.object(client, "remove_listener", MagicMock()) as remove_listener_mock,
):
end_frame = EndFrame()
await transport.stop(end_frame)
client_disconnect_mock.assert_called_once()
remove_listener_mock.assert_called_once_with(1)
assert not transport._connected
@pytest.mark.asyncio
async def test_vonage_input_transport_cancel(self) -> None:
"""Test VonageVideoConnectorInputTransport cancel method."""
params = self.VonageVideoConnectorTransportParams(audio_in_enabled=True)
client = self.VonageClient(self.application_id, self.session_id, self.token, params)
transport = self.VonageVideoConnectorInputTransport(client, params)
transport._listener_id = 1
transport._connected = True
# Mock the client disconnect method
with (
patch.object(client, "disconnect", AsyncMock()) as client_disconnect_mock,
patch.object(client, "remove_listener", MagicMock()) as remove_listener_mock,
):
cancel_frame = CancelFrame()
await transport.cancel(cancel_frame)
client_disconnect_mock.assert_called_once()
remove_listener_mock.assert_called_once_with(1)
assert not transport._connected
@pytest.mark.asyncio
async def test_vonage_output_transport_initialization(self) -> None:
"""Test VonageVideoConnectorOutputTransport initialization."""
params = self.VonageVideoConnectorTransportParams()
client = self.VonageClient(self.application_id, self.session_id, self.token, params)
transport_params = self.VonageVideoConnectorTransportParams(audio_out_enabled=True)
transport = self.VonageVideoConnectorOutputTransport(client, transport_params)
assert transport._client == client
assert transport._initialized is False
@pytest.mark.asyncio
async def test_vonage_output_transport_start(self) -> None:
"""Test VonageVideoConnectorOutputTransport start method."""
params = self.VonageVideoConnectorTransportParams()
client = self.VonageClient(self.application_id, self.session_id, self.token, params)
transport_params = self.VonageVideoConnectorTransportParams(audio_out_enabled=True)
transport = self.VonageVideoConnectorOutputTransport(client, transport_params)
with (
patch.object(client, "connect", AsyncMock(return_value=1)) as client_connect_mock,
patch.object(transport, "set_transport_ready", AsyncMock()) as set_transport_ready_mock,
):
start_frame = StartFrame()
await transport.start(start_frame)
assert transport._initialized is True
client_connect_mock.assert_called_once()
set_transport_ready_mock.assert_called_once_with(start_frame)
@pytest.mark.asyncio
async def test_vonage_output_transport_write_audio_frame(self) -> None:
"""Test VonageVideoConnectorOutputTransport write_audio_frame method."""
params = self.VonageVideoConnectorTransportParams(
audio_out_sample_rate=48000, audio_out_channels=2, audio_out_enabled=True
)
client = self.VonageClient(self.application_id, self.session_id, self.token, params)
with patch.object(client, "write_audio", AsyncMock()) as client_write_audio_mock:
transport_params = self.VonageVideoConnectorTransportParams(audio_out_enabled=True)
transport = self.VonageVideoConnectorOutputTransport(client, transport_params)
transport._connected = True
# Create a mock audio frame
audio_frame = OutputAudioRawFrame(
audio=b"\x00\x01\x02\x03", sample_rate=16000, num_channels=1
)
await transport.write_audio_frame(audio_frame)
# Verify audio was written to client
client_write_audio_mock.assert_called_once_with(audio_frame)
@pytest.mark.asyncio
async def test_vonage_output_transport_write_video_frame_not_connected(self) -> None:
"""Test VonageVideoConnectorOutputTransport write_video_frame method."""
transport = await self._create_output_transport(
params=self.VonageVideoConnectorTransportParams(video_out_enabled=True)
)
client = transport._client
# Create a test video frame
width, height = 640, 480
rgb_image = np.zeros((height, width, 3), dtype=np.uint8)
rgb_image[:, :, 0] = 100
rgb_image[:, :, 1] = 150
rgb_image[:, :, 2] = 200
video_frame = OutputImageRawFrame(
image=rgb_image.tobytes(), size=(width, height), format="RGB"
)
with patch.object(client, "write_video", AsyncMock(return_value=True)) as write_video_mock:
await transport.stop(EndFrame())
result = await transport.write_video_frame(video_frame)
# Should return False when not connected
assert result is False
write_video_mock.assert_not_called()
@pytest.mark.asyncio
async def test_vonage_output_transport_write_video_frame_connected(self) -> None:
"""Test VonageVideoConnectorOutputTransport write_video_frame method when connected."""
transport = await self._create_output_transport(
params=self.VonageVideoConnectorTransportParams(
video_out_enabled=True,
video_out_width=640,
video_out_height=480,
video_out_color_format="RGB",
)
)
client = transport._client
# Create a test video frame
width, height = 640, 480
rgb_image = np.zeros((height, width, 3), dtype=np.uint8)
rgb_image[:, :, 0] = 100
rgb_image[:, :, 1] = 150
rgb_image[:, :, 2] = 200
video_frame = OutputImageRawFrame(
image=rgb_image.tobytes(), size=(width, height), format="RGB"
)
with patch.object(client, "write_video", AsyncMock(return_value=True)) as write_video_mock:
transport._connected = True
result = await transport.write_video_frame(video_frame)
# Should return True and call write_video when connected
assert result is True
write_video_mock.assert_called_once_with(video_frame)
@pytest.mark.asyncio
async def test_vonage_output_transport_write_video_frame_invalid_size(self) -> None:
"""Test VonageVideoConnectorOutputTransport write_video_frame with invalid frame size."""
transport = await self._create_output_transport(
params=self.VonageVideoConnectorTransportParams(
video_out_enabled=True,
video_out_width=640,
video_out_height=480,
video_out_color_format="RGB",
)
)
# Create a video frame with incorrect size
width, height = 320, 240 # Different from expected 640x480
rgb_image = np.zeros((height, width, 3), dtype=np.uint8)
video_frame = OutputImageRawFrame(
image=rgb_image.tobytes(), size=(width, height), format="RGB"
)
transport._connected = True
result = await transport.write_video_frame(video_frame)
# Should return False for invalid size
assert result is False
@pytest.mark.asyncio
async def test_vonage_output_transport_write_video_frame_invalid_format(self) -> None:
"""Test VonageVideoConnectorOutputTransport write_video_frame with invalid color format."""
transport = await self._create_output_transport(
params=self.VonageVideoConnectorTransportParams(
video_out_enabled=True,
video_out_width=640,
video_out_height=480,
video_out_color_format="YCbCr",
)
)
# Create a video frame with incorrect size
width, height = 320, 240 # Different from expected 640x480
rgb_image = np.zeros((height, width, 3), dtype=np.uint8)
video_frame = OutputImageRawFrame(
image=rgb_image.tobytes(), size=(width, height), format="RGB"
)
transport._connected = True
result = await transport.write_video_frame(video_frame)
# Should return False for invalid size
assert result is False
@pytest.mark.asyncio
async def test_vonage_output_transport_process_frame_with_interruption(self) -> None:
"""Test VonageVideoConnectorOutputTransport process_frame method with InterruptionFrame."""
transport = await self._create_output_transport(
params=self.VonageVideoConnectorTransportParams(audio_out_enabled=True)
)
client = transport._client
with (
patch.object(client, "clear_media_buffers") as clear_buffers_mock,
patch.object(client, "connect", AsyncMock()),
):
await transport.start(StartFrame())
interruption_frame = InterruptionFrame()
await transport.process_frame(interruption_frame, FrameDirection.DOWNSTREAM)
# Verify clear_media_buffers was called
clear_buffers_mock.assert_called_once()
@pytest.mark.asyncio
async def test_vonage_output_transport_process_frame_without_interruption(self) -> None:
"""Test VonageVideoConnectorOutputTransport process_frame method with non-interruption frame."""
transport = await self._create_output_transport(
params=self.VonageVideoConnectorTransportParams(audio_out_enabled=True)
)
client = transport._client
with patch.object(client, "clear_media_buffers") as clear_buffers_mock:
audio_frame = OutputAudioRawFrame(
audio=b"\x00\x01\x02\x03", sample_rate=16000, num_channels=1
)
await transport.process_frame(audio_frame, FrameDirection.DOWNSTREAM)
# Verify clear_media_buffers was NOT called for non-interruption frames
clear_buffers_mock.assert_not_called()
@pytest.mark.asyncio
async def test_vonage_output_transport_process_frame_when_not_connected(self) -> None:
"""Test VonageVideoConnectorOutputTransport process_frame method when not connected."""
transport = await self._create_output_transport(
params=self.VonageVideoConnectorTransportParams(audio_out_enabled=True)
)
await transport.stop(EndFrame()) # Ensure transport is not connected
client = transport._client
with patch.object(client, "clear_media_buffers") as clear_buffers_mock:
interruption_frame = InterruptionFrame()
await transport.process_frame(interruption_frame, FrameDirection.DOWNSTREAM)
# Verify clear_media_buffers was NOT called when not connected
clear_buffers_mock.assert_not_called()
@pytest.mark.asyncio
async def test_vonage_output_transport_interruption_with_clear_buffers_disabled(self) -> None:
"""Test VonageVideoConnectorOutputTransport with clear_buffers_on_interruption=False."""
transport = await self._create_output_transport(
params=self.VonageVideoConnectorTransportParams(
audio_out_enabled=True, clear_buffers_on_interruption=False
)
)
client = transport._client
with (
patch.object(client, "clear_media_buffers") as clear_buffers_mock,
patch.object(client, "connect", AsyncMock()),
):
await transport.start(StartFrame())
interruption_frame = InterruptionFrame()
await transport.process_frame(interruption_frame, FrameDirection.DOWNSTREAM)
# Verify clear_media_buffers was NOT called when clear_buffers_on_interruption is False
clear_buffers_mock.assert_not_called()
@pytest.mark.asyncio
async def test_vonage_output_transport_interruption_with_clear_buffers_enabled(self) -> None:
"""Test VonageVideoConnectorOutputTransport with clear_buffers_on_interruption=True (default)."""
transport = await self._create_output_transport(
params=self.VonageVideoConnectorTransportParams(
audio_out_enabled=True, clear_buffers_on_interruption=True
)
)
client = transport._client
with (
patch.object(client, "clear_media_buffers") as clear_buffers_mock,
patch.object(client, "connect", AsyncMock()),
):
await transport.start(StartFrame())
interruption_frame = InterruptionFrame()
await transport.process_frame(interruption_frame, FrameDirection.DOWNSTREAM)
# Verify clear_media_buffers was called when clear_buffers_on_interruption is True
clear_buffers_mock.assert_called_once()
@pytest.mark.asyncio
@pytest.mark.parametrize("transport_type", ["input", "output"])
async def test_vonage_transport_sets_audio_sample_rates_from_start_frame(
self, transport_type: str
) -> None:
"""Test transport sets audio sample rates from StartFrame when params are None."""
# Create params with None sample rates
params = self.VonageVideoConnectorTransportParams(
audio_in_enabled=(transport_type == "input"),
audio_out_enabled=(transport_type == "output"),
audio_in_sample_rate=None,
audio_out_sample_rate=None,
)
transport: VonageVideoConnectorInputTransport | VonageVideoConnectorOutputTransport
if transport_type == "input":
transport = await self._create_input_transport(params=params)
else:
transport = await self._create_output_transport(params=params)
client = transport._client
# Create a StartFrame with specific sample rates
start_frame = StartFrame(audio_in_sample_rate=22050, audio_out_sample_rate=44100)
with patch.object(client, "_sdk_connect", AsyncMock()):
await transport.start(start_frame)
# Verify both sample rates were set from the StartFrame
assert client._audio_in_sample_rate == 22050
assert client._audio_out_sample_rate == 44100
@pytest.mark.asyncio
@pytest.mark.parametrize("transport_type", ["input", "output"])
async def test_vonage_transport_doesnt_override_audio_sample_rates(
self, transport_type: str
) -> None:
"""Test transport doesn't override audio sample rates when already set in params."""
# Create params with specific sample rates
params = self.VonageVideoConnectorTransportParams(
audio_in_enabled=(transport_type == "input"),
audio_out_enabled=(transport_type == "output"),
audio_in_sample_rate=48000,
audio_out_sample_rate=16000,
)
transport: VonageVideoConnectorInputTransport | VonageVideoConnectorOutputTransport
if transport_type == "input":
transport = await self._create_input_transport(params=params)
else:
transport = await self._create_output_transport(params=params)
client = transport._client
# Create a StartFrame with different sample rates
start_frame = StartFrame(audio_in_sample_rate=22050, audio_out_sample_rate=44100)
with patch.object(client, "_sdk_connect", AsyncMock()):
await transport.start(start_frame)
# Verify sample rates remain as originally set in params
assert client._audio_in_sample_rate == 48000
assert client._audio_out_sample_rate == 16000
@pytest.mark.asyncio
async def test_vonage_transport_initialization(self) -> None:
"""Test VonageVideoConnectorTransport initialization."""
params = self.VonageVideoConnectorTransportParams(
audio_out_sample_rate=48000,
audio_out_channels=2,
audio_out_enabled=True,
session_enable_migration=True,
publisher_name="test-publisher",
publisher_enable_opus_dtx=True,
)
transport = await self._create_transport(params=params)
assert transport._client is not None
assert transport._one_stream_received is False
# Verify vonage client was initialized with correct parameters
client_params = transport._client._params
assert client_params.audio_out_sample_rate == 48000
assert client_params.audio_out_channels == 2
assert client_params.session_enable_migration is True
@pytest.mark.asyncio
async def test_vonage_transport_input_output_methods(self) -> None:
"""Test VonageVideoConnectorTransport input and output methods."""
params = self.VonageVideoConnectorTransportParams()
transport = self.VonageVideoConnectorTransport(
self.application_id, self.session_id, self.token, params
)
# Test input method
input_transport = transport.input()
assert isinstance(input_transport, self.VonageVideoConnectorInputTransport)
# Test output method
output_transport = transport.output()
assert isinstance(output_transport, self.VonageVideoConnectorOutputTransport)
# Verify they return the same instances on subsequent calls
assert transport.input() is input_transport
assert transport.output() is output_transport
@pytest.mark.asyncio
async def test_vonage_input_audio_callback(self) -> None:
"""Test audio input callback processing."""
params = self.VonageVideoConnectorTransportParams(
audio_in_enabled=True,
)
transport = await self._create_input_transport(params)
client = transport._client
with (
patch.object(transport, "push_audio_frame", AsyncMock()) as mock_push_audio_frame,
patch.object(client, "connect", AsyncMock(return_value=1)),
):
start_frame = StartFrame()
await transport.start(start_frame)
# Create mock audio data
audio_buffer = np.array([100, 200, 300, 400], dtype=np.int16)
audio_frame = InputAudioRawFrame(
audio=audio_buffer.tobytes(), sample_rate=48000, num_channels=2
)
# Call the audio callback
await transport._audio_in_cb(
vonage_video_mock.models.Session(id="session"), audio_frame
)
mock_push_audio_frame.assert_called_once_with(audio_frame)
@pytest.mark.asyncio
async def test_vonage_input_video_callback(self) -> None:
"""Test video input callback processing."""
params = self.VonageVideoConnectorTransportParams(
video_in_enabled=True,
)
transport = await self._create_input_transport(params)
client = transport._client
with (
patch.object(transport, "push_video_frame", AsyncMock()) as mock_push_video_frame,
patch.object(client, "connect", AsyncMock(return_value=1)),
):
start_frame = StartFrame()
await transport.start(start_frame)
# Create mock video frame
width, height = 640, 480
rgb_image = np.zeros((height, width, 3), dtype=np.uint8)
rgb_image[:, :, 0] = 100 # R channel
rgb_image[:, :, 1] = 150 # G channel
rgb_image[:, :, 2] = 200 # B channel
video_frame = UserImageRawFrame(
user_id="test-user", image=rgb_image.tobytes(), size=(width, height), format="RGB"
)
# Create mock subscriber
stream = vonage_video_mock.models.Stream(id="video_stream", connection=DUMMY_CONNECTION)
subscriber = vonage_video_mock.models.Subscriber(stream=stream)
# Call the video callback
await transport._video_in_cb(subscriber, video_frame)
mock_push_video_frame.assert_called_once_with(video_frame)
@pytest.mark.asyncio
async def test_vonage_transport_event_handlers(self) -> None:
"""Test VonageVideoConnectorTransport event handlers."""
params = self.VonageVideoConnectorTransportParams()
transport = await self._create_transport(params)
with patch.object(
transport, "_call_event_handler", new_callable=AsyncMock
) as mock_call_event_handler:
# Test session events
mock_session = Mock()
mock_session.id = "session-123"
await transport._on_connected(mock_session)
mock_call_event_handler.assert_called_with("on_joined", {"sessionId": "session-123"})
await transport._on_disconnected(mock_session)
mock_call_event_handler.assert_called_with("on_left", {"sessionId": "session-123"})
await transport._on_error(mock_session, "test error", 500)
mock_call_event_handler.assert_called_with("on_error", "test error")
# Test stream events
mock_connection = Mock()
mock_connection.data = "connection-data-123"
mock_stream = Mock()
mock_stream.id = "stream-456"
mock_stream.connection = mock_connection
await transport._on_stream_received(mock_session, mock_stream)
# Should call both first participant and participant joined events
expected_calls = [
call(
"on_first_participant_joined",
{
"sessionId": "session-123",
"streamId": "stream-456",
"connectionData": "connection-data-123",
},
),
call(
"on_participant_joined",
{
"sessionId": "session-123",
"streamId": "stream-456",
"connectionData": "connection-data-123",
},
),
]
mock_call_event_handler.assert_has_calls(expected_calls)
await transport._on_stream_dropped(mock_session, mock_stream)
mock_call_event_handler.assert_called_with(
"on_participant_left",
{
"sessionId": "session-123",
"streamId": "stream-456",
"connectionData": "connection-data-123",
},
)
# Test subscriber events
mock_subscriber = Mock()
mock_subscriber.stream = Mock()
mock_subscriber.stream.id = "subscriber-789"
mock_subscriber.stream.connection = Mock()
mock_subscriber.stream.connection.data = "subscriber-conn-data"
await transport._on_subscriber_connected(mock_subscriber)
mock_call_event_handler.assert_called_with(
"on_client_connected",
{
"subscriberId": "subscriber-789",
"streamId": "subscriber-789",
"connectionData": "subscriber-conn-data",
},
)
await transport._on_subscriber_disconnected(mock_subscriber)
mock_call_event_handler.assert_called_with(
"on_client_disconnected",
{
"subscriberId": "subscriber-789",
"streamId": "subscriber-789",
"connectionData": "subscriber-conn-data",
},
)
@pytest.mark.asyncio
async def test_vonage_transport_first_participant_flag(self) -> None:
"""Test that first participant event is only called once."""
params = self.VonageVideoConnectorTransportParams()
transport = await self._create_transport(params)
with patch.object(
transport, "_call_event_handler", new_callable=AsyncMock
) as mock_call_event_handler:
mock_session = Mock()
mock_session.id = "session-123"
mock_connection1 = Mock()
mock_connection1.data = "conn-data-1"
mock_stream1 = Mock()
mock_stream1.id = "stream-456"
mock_stream1.connection = mock_connection1
mock_connection2 = Mock()
mock_connection2.data = "conn-data-2"
mock_stream2 = Mock()
mock_stream2.id = "stream-789"
mock_stream2.connection = mock_connection2
# First stream should trigger first participant event
await transport._on_stream_received(mock_session, mock_stream1)
assert transport._one_stream_received is True
# Reset mock to check second stream
mock_call_event_handler.reset_mock()
# Second stream should not trigger first participant event
await transport._on_stream_received(mock_session, mock_stream2)
mock_call_event_handler.assert_called_once_with(
"on_participant_joined",
{
"sessionId": "session-123",
"streamId": "stream-789",
"connectionData": "conn-data-2",
},
)
class TestAudioNormalization:
"""Test cases for audio normalization functions."""
def setup_method(self) -> None:
"""Set up test fixtures."""
self.AudioProps = AudioProps
self.process_audio_channels = process_audio_channels
self.process_audio = process_audio
self.check_audio_data = check_audio_data
def test_audio_props_creation(self) -> None:
"""Test AudioProps dataclass creation."""
props = self.AudioProps(sample_rate=48000, is_stereo=True)
assert props.sample_rate == 48000
assert props.is_stereo is True
props_mono = self.AudioProps(sample_rate=16000, is_stereo=False)
assert props_mono.sample_rate == 16000
assert props_mono.is_stereo is False
def test_process_audio_channels_mono_to_stereo(self) -> None:
"""Test converting mono audio to stereo."""
# Create mono audio (4 samples)
mono_audio = np.array([100, 200, 300, 400], dtype=np.int16)
current = self.AudioProps(sample_rate=48000, is_stereo=False)
target = self.AudioProps(sample_rate=48000, is_stereo=True)
result = self.process_audio_channels(mono_audio, current, target)
# Should duplicate each sample
expected = np.array([100, 100, 200, 200, 300, 300, 400, 400], dtype=np.int16)
np.testing.assert_array_equal(result, expected)
def test_process_audio_channels_stereo_to_mono(self) -> None:
"""Test converting stereo audio to mono."""
# Create stereo audio (2 frames, 4 samples total)
stereo_audio = np.array([100, 200, 300, 400], dtype=np.int16)
current = self.AudioProps(sample_rate=48000, is_stereo=True)
target = self.AudioProps(sample_rate=48000, is_stereo=False)
result = self.process_audio_channels(stereo_audio, current, target)
# Should average each stereo pair: (100+200)/2=150, (300+400)/2=350
expected = np.array([150, 350], dtype=np.int16)
np.testing.assert_array_equal(result, expected)
def test_process_audio_channels_same_format(self) -> None:
"""Test when source and target have the same channel format."""
audio = np.array([100, 200, 300, 400], dtype=np.int16)
# Test mono to mono
current = self.AudioProps(sample_rate=48000, is_stereo=False)
target = self.AudioProps(sample_rate=48000, is_stereo=False)
result = self.process_audio_channels(audio, current, target)
np.testing.assert_array_equal(result, audio)
# Test stereo to stereo
current = self.AudioProps(sample_rate=48000, is_stereo=True)
target = self.AudioProps(sample_rate=48000, is_stereo=True)
result = self.process_audio_channels(audio, current, target)
np.testing.assert_array_equal(result, audio)
@pytest.mark.asyncio
@patch("pipecat.transports.vonage.client.create_stream_resampler")
async def test_process_audio_same_sample_rate(self, mock_resampler: MagicMock) -> None:
"""Test process_audio when sample rates are the same."""
mock_resampler_instance = Mock()
mock_resampler.return_value = mock_resampler_instance
audio = np.array([100, 200, 300, 400], dtype=np.int16)
current = self.AudioProps(sample_rate=48000, is_stereo=False)
target = self.AudioProps(sample_rate=48000, is_stereo=True)
result = await self.process_audio(mock_resampler_instance, audio, current, target)
# Should only do channel conversion, no resampling
expected = np.array([100, 100, 200, 200, 300, 300, 400, 400], dtype=np.int16)
np.testing.assert_array_equal(result, expected)
# Resampler should not be called
mock_resampler_instance.resample.assert_not_called()
@pytest.mark.asyncio
@patch("pipecat.transports.vonage.client.create_stream_resampler")
async def test_process_audio_different_sample_rate_mono(
self, mock_resampler: MagicMock
) -> None:
"""Test process_audio with different sample rates (mono)."""
mock_resampler_instance = Mock()
mock_resampler_instance.resample = AsyncMock(
return_value=b"\x64\x00\xc8\x00"
) # 100, 200 in bytes
mock_resampler.return_value = mock_resampler_instance
audio = np.array([150, 250, 350, 450], dtype=np.int16)
current = self.AudioProps(sample_rate=48000, is_stereo=False)
target = self.AudioProps(sample_rate=16000, is_stereo=False)
result = await self.process_audio(mock_resampler_instance, audio, current, target)
# Should resample the audio
expected = np.array([100, 200], dtype=np.int16)
np.testing.assert_array_equal(result, expected)
# Resampler should be called with correct parameters
mock_resampler_instance.resample.assert_called_once_with(audio.tobytes(), 48000, 16000)
@pytest.mark.asyncio
@patch("pipecat.transports.vonage.client.create_stream_resampler")
async def test_process_audio_different_sample_rate_stereo_to_mono(
self, mock_resampler: MagicMock
) -> None:
"""Test process_audio with different sample rates and channel conversion."""
mock_resampler_instance = Mock()
# Return resampled mono data
mock_resampler_instance.resample = AsyncMock(
return_value=b"\x64\x00\xc8\x00"
) # 100, 200 in bytes
mock_resampler.return_value = mock_resampler_instance
# Stereo audio: 2 frames with left/right channels
audio = np.array([100, 200, 300, 400], dtype=np.int16) # L1=100, R1=200, L2=300, R2=400
current = self.AudioProps(sample_rate=48000, is_stereo=True)
target = self.AudioProps(sample_rate=16000, is_stereo=False)
result = await self.process_audio(mock_resampler_instance, audio, current, target)
# Should convert to mono first, then resample
expected = np.array([100, 200], dtype=np.int16)
np.testing.assert_array_equal(result, expected)
# Resampler should be called with mono audio
expected_mono = np.array([150, 350], dtype=np.int16) # (100+200)/2, (300+400)/2
mock_resampler_instance.resample.assert_called_once_with(
expected_mono.tobytes(), 48000, 16000
)
@pytest.mark.asyncio
@patch("pipecat.transports.vonage.client.create_stream_resampler")
async def test_process_audio_different_sample_rate_mono_to_stereo(
self, mock_resampler: MagicMock
) -> None:
"""Test process_audio with different sample rates converting mono to stereo."""
mock_resampler_instance = Mock()
# Return resampled mono data
mock_resampler_instance.resample = AsyncMock(
return_value=b"\x64\x00\xc8\x00"
) # 100, 200 in bytes
mock_resampler.return_value = mock_resampler_instance
audio = np.array([150, 250], dtype=np.int16)
current = self.AudioProps(sample_rate=48000, is_stereo=False)
target = self.AudioProps(sample_rate=16000, is_stereo=True)
result = await self.process_audio(mock_resampler_instance, audio, current, target)
# Should resample first (mono), then convert to stereo
expected = np.array([100, 100, 200, 200], dtype=np.int16)
np.testing.assert_array_equal(result, expected)
# Resampler should be called with mono audio
mock_resampler_instance.resample.assert_called_once_with(audio.tobytes(), 48000, 16000)
def test_check_audio_data_valid_mono_bytes(self) -> None:
"""Test check_audio_data with valid mono audio as bytes."""
# 4 frames of mono 16-bit audio (8 bytes total)
buffer = b"\x00\x01\x02\x03\x04\x05\x06\x07"
# Should not raise any exception
self.check_audio_data(buffer, 4, 1)
def test_check_audio_data_valid_stereo_bytes(self) -> None:
"""Test check_audio_data with valid stereo audio as bytes."""
# 2 frames of stereo 16-bit audio (8 bytes total)
buffer = b"\x00\x01\x02\x03\x04\x05\x06\x07"
# Should not raise any exception
self.check_audio_data(buffer, 2, 2)
def test_check_audio_data_valid_memoryview(self) -> None:
"""Test check_audio_data with valid audio as memoryview."""
# Create int16 memoryview (2 bytes per sample)
array = np.array([100, 200, 300, 400], dtype=np.int16)
buffer = memoryview(array)
# Should not raise any exception
self.check_audio_data(buffer, 4, 1) # 4 mono frames
self.check_audio_data(buffer, 2, 2) # 2 stereo frames
def test_check_audio_data_invalid_channels(self) -> None:
"""Test check_audio_data with invalid number of channels."""
buffer = b"\x00\x01\x02\x03"
# Should raise ValueError for invalid channel counts
with pytest.raises(ValueError) as exc_info:
self.check_audio_data(buffer, 2, 3) # 3 channels not supported
assert "mono or stereo" in str(exc_info.value)
with pytest.raises(ValueError) as exc_info:
self.check_audio_data(buffer, 2, 0) # 0 channels not supported
assert "mono or stereo" in str(exc_info.value)
def test_check_audio_data_invalid_bit_depth_bytes(self) -> None:
"""Test check_audio_data with invalid bit depth using bytes."""
# 2 frames of mono audio with 1 byte per sample (8-bit)
buffer = b"\x00\x01"
with pytest.raises(ValueError) as exc_info:
self.check_audio_data(buffer, 2, 1)
assert "16 bit PCM" in str(exc_info.value)
assert "got 8 bit" in str(exc_info.value)
def test_check_audio_data_invalid_bit_depth_memoryview(self) -> None:
"""Test check_audio_data with invalid bit depth using memoryview."""
# Create uint8 memoryview (1 byte per sample)
array = np.array([100, 200], dtype=np.uint8)
buffer = memoryview(array)
with pytest.raises(ValueError) as exc_info:
self.check_audio_data(buffer, 2, 1)
assert "16 bit PCM" in str(exc_info.value)
assert "got 8 bit" in str(exc_info.value)
def test_check_audio_data_buffer_size_mismatch(self) -> None:
"""Test check_audio_data with buffer size that doesn't match expected size."""
# 3 bytes total, but expecting 2 frames of mono 16-bit (should be 4 bytes)
buffer = b"\x00\x01\x02"
with pytest.raises(ValueError) as exc_info:
self.check_audio_data(buffer, 2, 1)
# Should detect that 3 bytes / (2 frames * 1 channel) = 1.5 bytes per sample
# which gets truncated to 1 byte per sample = 8 bit
assert "16 bit PCM" in str(exc_info.value)
class TestColorspaceConversion:
"""Test cases for image colorspace conversion functions."""
def test_same_format_no_conversion(self) -> None:
"""Test that conversion with same source and target format returns original image."""
width, height = 4, 4
image_data = np.random.randint(0, 256, width * height * 3, dtype=np.uint8).tobytes()
# Test all formats with themselves
for fmt in ImageFormat:
result = image_colorspace_conversion(image_data, (width, height), fmt, fmt)
assert result == image_data
def test_rgb_to_bgr_conversion(self) -> None:
"""Test RGB to BGR conversion."""
width, height = 2, 2
# Create a simple RGB image with distinct colors
rgb_image = np.array(
[
[[255, 0, 0], [0, 255, 0]], # Red, Green
[[0, 0, 255], [255, 255, 0]], # Blue, Yellow
],
dtype=np.uint8,
)
result = image_colorspace_conversion(
rgb_image.tobytes(),
(width, height),
ImageFormat.RGB,
ImageFormat.BGR,
)
# Expected BGR: R and B channels swapped
expected = np.array(
[
[[0, 0, 255], [0, 255, 0]], # Blue, Green (unchanged)
[[255, 0, 0], [0, 255, 255]], # Red, Cyan
],
dtype=np.uint8,
)
assert result is not None
result_array = np.frombuffer(result, dtype=np.uint8).reshape(height, width, 3)
np.testing.assert_array_equal(result_array, expected)
def test_bgr_to_rgb_conversion(self) -> None:
"""Test BGR to RGB conversion (should be same as RGB to BGR)."""
width, height = 2, 2
bgr_image = np.array(
[
[[255, 0, 0], [0, 255, 0]],
[[0, 0, 255], [255, 255, 0]],
],
dtype=np.uint8,
)
result = image_colorspace_conversion(
bgr_image.tobytes(),
(width, height),
ImageFormat.BGR,
ImageFormat.RGB,
)
# R and B channels should be swapped
expected = np.array(
[
[[0, 0, 255], [0, 255, 0]],
[[255, 0, 0], [0, 255, 255]],
],
dtype=np.uint8,
)
assert result is not None
result_array = np.frombuffer(result, dtype=np.uint8).reshape(height, width, 3)
np.testing.assert_array_equal(result_array, expected)
def test_rgba_to_bgra_conversion(self) -> None:
"""Test RGBA to BGRA conversion."""
width, height = 2, 2
rgba_image = np.array(
[
[[255, 0, 0, 255], [0, 255, 0, 200]], # Red opaque, Green semi-transparent
[[0, 0, 255, 150], [255, 255, 0, 100]], # Blue, Yellow
],
dtype=np.uint8,
)
result = image_colorspace_conversion(
rgba_image.tobytes(),
(width, height),
ImageFormat.RGBA,
ImageFormat.BGRA,
)
# Expected: R and B swapped, alpha unchanged
expected = np.array(
[
[[0, 0, 255, 255], [0, 255, 0, 200]],
[[255, 0, 0, 150], [0, 255, 255, 100]],
],
dtype=np.uint8,
)
assert result is not None
result_array = np.frombuffer(result, dtype=np.uint8).reshape(height, width, 4)
np.testing.assert_array_equal(result_array, expected)
def test_bgra_to_rgba_conversion(self) -> None:
"""Test BGRA to RGBA conversion."""
width, height = 2, 2
bgra_image = np.array(
[
[[255, 0, 0, 255], [0, 255, 0, 200]],
[[0, 0, 255, 150], [255, 255, 0, 100]],
],
dtype=np.uint8,
)
result = image_colorspace_conversion(
bgra_image.tobytes(),
(width, height),
ImageFormat.BGRA,
ImageFormat.RGBA,
)
expected = np.array(
[
[[0, 0, 255, 255], [0, 255, 0, 200]],
[[255, 0, 0, 150], [0, 255, 255, 100]],
],
dtype=np.uint8,
)
assert result is not None
result_array = np.frombuffer(result, dtype=np.uint8).reshape(height, width, 4)
np.testing.assert_array_equal(result_array, expected)
def test_planar_yuv420_to_packed_yuv444_conversion(self) -> None:
"""Test planar YUV420 to packed YUV444 conversion."""
width, height = 4, 4
# Create YUV420 planar data
# Y plane: 4x4 = 16 bytes
y_plane = np.array(
[100, 110, 120, 130, 140, 150, 160, 170, 180, 190, 200, 210, 220, 230, 240, 250],
dtype=np.uint8,
)
# U plane: 2x2 = 4 bytes (subsampled)
u_plane = np.array([50, 60, 70, 80], dtype=np.uint8)
# V plane: 2x2 = 4 bytes (subsampled)
v_plane = np.array([90, 100, 110, 120], dtype=np.uint8)
yuv420_data = np.concatenate([y_plane, u_plane, v_plane])
result = image_colorspace_conversion(
yuv420_data.tobytes(),
(width, height),
ImageFormat.PLANAR_YUV420,
ImageFormat.PACKED_YUV444,
)
assert result is not None
result_array = np.frombuffer(result, dtype=np.uint8).reshape(height, width, 3)
# Check that Y plane values are preserved
assert result_array[0, 0, 0] == 100
assert result_array[0, 1, 0] == 110
assert result_array[3, 3, 0] == 250
# Check that U and V planes are upsampled (each 2x2 block should have same U/V values)
# Top-left 2x2 block should have U=50, V=90
assert result_array[0, 0, 1] == 50
assert result_array[0, 0, 2] == 90
assert result_array[0, 1, 1] == 50
assert result_array[0, 1, 2] == 90
assert result_array[1, 0, 1] == 50
assert result_array[1, 0, 2] == 90
assert result_array[1, 1, 1] == 50
assert result_array[1, 1, 2] == 90
# Top-right 2x2 block should have U=60, V=100
assert result_array[0, 2, 1] == 60
assert result_array[0, 2, 2] == 100
def test_packed_yuv444_to_planar_yuv420_conversion(self) -> None:
"""Test packed YUV444 to planar YUV420 conversion."""
width, height = 4, 4
# Create packed YUV444 data (interleaved YUVYUVYUV...)
# Each pixel has Y, U, V values
packed_yuv444 = np.zeros((height, width, 3), dtype=np.uint8)
# Set Y values
packed_yuv444[:, :, 0] = np.arange(100, 100 + width * height, dtype=np.uint8).reshape(
height, width
)
# Set U values (will be downsampled)
packed_yuv444[0:2, 0:2, 1] = 50 # Top-left block
packed_yuv444[0:2, 2:4, 1] = 60 # Top-right block
packed_yuv444[2:4, 0:2, 1] = 70 # Bottom-left block
packed_yuv444[2:4, 2:4, 1] = 80 # Bottom-right block
# Set V values (will be downsampled)
packed_yuv444[0:2, 0:2, 2] = 90
packed_yuv444[0:2, 2:4, 2] = 100
packed_yuv444[2:4, 0:2, 2] = 110
packed_yuv444[2:4, 2:4, 2] = 120
result = image_colorspace_conversion(
packed_yuv444.tobytes(),
(width, height),
ImageFormat.PACKED_YUV444,
ImageFormat.PLANAR_YUV420,
)
assert result is not None
# Parse the planar YUV420 result
y_plane_size = width * height
uv_plane_size = (width // 2) * (height // 2)
result_array = np.frombuffer(result, dtype=np.uint8)
y_result = result_array[:y_plane_size]
u_result = result_array[y_plane_size : y_plane_size + uv_plane_size]
v_result = result_array[y_plane_size + uv_plane_size :]
# Check Y plane is preserved
expected_y = np.arange(100, 100 + width * height, dtype=np.uint8)
np.testing.assert_array_equal(y_result, expected_y)
# Check U plane is downsampled (should be 2x2)
expected_u = np.array([50, 60, 70, 80], dtype=np.uint8)
np.testing.assert_array_equal(u_result, expected_u)
# Check V plane is downsampled
expected_v = np.array([90, 100, 110, 120], dtype=np.uint8)
np.testing.assert_array_equal(v_result, expected_v)
def test_unsupported_conversion_returns_none(self) -> None:
"""Test that unsupported conversions return None."""
width, height = 4, 4
image_data = np.random.randint(0, 256, width * height * 3, dtype=np.uint8).tobytes()
# Test some unsupported conversions
result = image_colorspace_conversion(
image_data,
(width, height),
ImageFormat.RGB,
ImageFormat.PLANAR_YUV420,
)
assert result is None
result = image_colorspace_conversion(
image_data,
(width, height),
ImageFormat.RGBA,
ImageFormat.RGB,
)
assert result is None
result = image_colorspace_conversion(
image_data,
(width, height),
ImageFormat.PLANAR_YUV420,
ImageFormat.BGR,
)
assert result is None
def test_conversion_with_different_sizes(self) -> None:
"""Test conversions work with different image sizes."""
test_sizes = [(2, 2), (4, 4), (8, 8), (16, 16)]
for width, height in test_sizes:
# Test RGB to BGR
rgb_image = np.random.randint(0, 256, (height, width, 3), dtype=np.uint8)
result = image_colorspace_conversion(
rgb_image.tobytes(),
(width, height),
ImageFormat.RGB,
ImageFormat.BGR,
)
assert result is not None
assert len(result) == width * height * 3
def test_yuv420_to_yuv444_roundtrip_preserves_y_plane(self) -> None:
"""Test that Y plane is preserved in YUV420 -> YUV444 -> YUV420 conversion."""
width, height = 4, 4
# Create original YUV420 data
y_plane_orig = np.arange(0, width * height, dtype=np.uint8)
u_plane_orig = np.array([50, 60, 70, 80], dtype=np.uint8)
v_plane_orig = np.array([90, 100, 110, 120], dtype=np.uint8)
yuv420_orig = np.concatenate([y_plane_orig, u_plane_orig, v_plane_orig])
# Convert to YUV444
yuv444 = image_colorspace_conversion(
yuv420_orig.tobytes(),
(width, height),
ImageFormat.PLANAR_YUV420,
ImageFormat.PACKED_YUV444,
)
assert yuv444 is not None
# Convert back to YUV420
yuv420_result = image_colorspace_conversion(
yuv444,
(width, height),
ImageFormat.PACKED_YUV444,
ImageFormat.PLANAR_YUV420,
)
assert yuv420_result is not None
# Extract Y plane from result
result_array = np.frombuffer(yuv420_result, dtype=np.uint8)
y_plane_result = result_array[: width * height]
# Y plane should be identical after roundtrip
np.testing.assert_array_equal(y_plane_result, y_plane_orig)
def test_rgb_bgr_roundtrip(self) -> None:
"""Test that RGB -> BGR -> RGB conversion preserves data."""
width, height = 4, 4
rgb_orig = np.random.randint(0, 256, (height, width, 3), dtype=np.uint8)
# Convert to BGR
bgr = image_colorspace_conversion(
rgb_orig.tobytes(),
(width, height),
ImageFormat.RGB,
ImageFormat.BGR,
)
assert bgr is not None
# Convert back to RGB
rgb_result = image_colorspace_conversion(
bgr,
(width, height),
ImageFormat.BGR,
ImageFormat.RGB,
)
assert rgb_result is not None
result_array = np.frombuffer(rgb_result, dtype=np.uint8).reshape(height, width, 3)
np.testing.assert_array_equal(result_array, rgb_orig)
def test_rgba_bgra_roundtrip(self) -> None:
"""Test that RGBA -> BGRA -> RGBA conversion preserves data."""
width, height = 4, 4
rgba_orig = np.random.randint(0, 256, (height, width, 4), dtype=np.uint8)
# Convert to BGRA
bgra = image_colorspace_conversion(
rgba_orig.tobytes(),
(width, height),
ImageFormat.RGBA,
ImageFormat.BGRA,
)
assert bgra is not None
# Convert back to RGBA
rgba_result = image_colorspace_conversion(
bgra,
(width, height),
ImageFormat.BGRA,
ImageFormat.RGBA,
)
assert rgba_result is not None
result_array = np.frombuffer(rgba_result, dtype=np.uint8).reshape(height, width, 4)
np.testing.assert_array_equal(result_array, rgba_orig)