Compare commits
1 Commits
mb/fix-pip
...
cb/test-se
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
33d813ed8f |
@@ -0,0 +1,12 @@
|
||||
#
|
||||
# Copyright (c) 2024-2025 Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Public testing API for Pipecat frame processors."""
|
||||
|
||||
from .serialization import dict_to_frame, frame_to_dict, load_frames_from_json
|
||||
from .test_runner import run_test_from_file
|
||||
|
||||
__all__ = ["dict_to_frame", "frame_to_dict", "load_frames_from_json", "run_test_from_file"]
|
||||
|
||||
150
src/pipecat/tests/serialization.py
Normal file
150
src/pipecat/tests/serialization.py
Normal file
@@ -0,0 +1,150 @@
|
||||
#
|
||||
# Copyright (c) 2024-2025 Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Frame serialization and deserialization for testing."""
|
||||
|
||||
import base64
|
||||
import inspect
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List
|
||||
|
||||
from pipecat.frames import frames
|
||||
|
||||
|
||||
def _get_frame_class(frame_type: str):
|
||||
"""Get a frame class by name from the frames module.
|
||||
|
||||
Args:
|
||||
frame_type: The name of the frame class (e.g., "TextFrame")
|
||||
|
||||
Returns:
|
||||
The frame class object
|
||||
|
||||
Raises:
|
||||
ValueError: If the frame type is not found
|
||||
"""
|
||||
if not hasattr(frames, frame_type):
|
||||
raise ValueError(f"Unknown frame type: {frame_type}")
|
||||
|
||||
cls = getattr(frames, frame_type)
|
||||
if not inspect.isclass(cls) or not issubclass(cls, frames.Frame):
|
||||
raise ValueError(f"{frame_type} is not a valid Frame class")
|
||||
|
||||
return cls
|
||||
|
||||
|
||||
def dict_to_frame(data: Dict[str, Any]) -> frames.Frame:
|
||||
"""Convert a dictionary to a Frame object.
|
||||
|
||||
Args:
|
||||
data: Dictionary containing frame data with a "type" key
|
||||
|
||||
Returns:
|
||||
A Frame instance
|
||||
|
||||
Raises:
|
||||
ValueError: If frame type is missing or invalid
|
||||
|
||||
Example:
|
||||
>>> dict_to_frame({"type": "TextFrame", "text": "hello"})
|
||||
TextFrame(text="hello")
|
||||
"""
|
||||
if "type" not in data:
|
||||
raise ValueError("Frame dictionary must contain a 'type' field")
|
||||
|
||||
frame_type = data["type"]
|
||||
frame_cls = _get_frame_class(frame_type)
|
||||
|
||||
# Build kwargs from data, excluding 'type'
|
||||
kwargs = {k: v for k, v in data.items() if k != "type"}
|
||||
|
||||
# Special handling for audio frames with base64 encoded audio
|
||||
if "audio" in kwargs and isinstance(kwargs["audio"], str):
|
||||
kwargs["audio"] = base64.b64decode(kwargs["audio"])
|
||||
|
||||
# Special handling for image frames with base64 encoded images
|
||||
if "image" in kwargs and isinstance(kwargs["image"], str):
|
||||
kwargs["image"] = base64.b64decode(kwargs["image"])
|
||||
|
||||
try:
|
||||
return frame_cls(**kwargs)
|
||||
except TypeError as e:
|
||||
raise ValueError(f"Failed to create {frame_type}: {e}")
|
||||
|
||||
|
||||
def load_frames_from_json(filepath: str) -> List[frames.Frame]:
|
||||
"""Load frames from a JSON file.
|
||||
|
||||
Args:
|
||||
filepath: Path to JSON file containing frame data
|
||||
|
||||
Returns:
|
||||
List of Frame objects
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the file doesn't exist
|
||||
ValueError: If JSON is invalid or frames cannot be deserialized
|
||||
|
||||
Example JSON format:
|
||||
{
|
||||
"input_frames": [
|
||||
{"type": "TextFrame", "text": "hello"},
|
||||
{"type": "EndFrame"}
|
||||
]
|
||||
}
|
||||
"""
|
||||
path = Path(filepath)
|
||||
if not path.exists():
|
||||
raise FileNotFoundError(f"Frame file not found: {filepath}")
|
||||
|
||||
with open(path, "r") as f:
|
||||
data = json.load(f)
|
||||
|
||||
if not isinstance(data, dict):
|
||||
raise ValueError("JSON must contain a dictionary")
|
||||
|
||||
if "input_frames" not in data:
|
||||
raise ValueError("JSON must contain an 'input_frames' key")
|
||||
|
||||
frame_dicts = data["input_frames"]
|
||||
if not isinstance(frame_dicts, list):
|
||||
raise ValueError("'input_frames' must be a list")
|
||||
|
||||
return [dict_to_frame(frame_dict) for frame_dict in frame_dicts]
|
||||
|
||||
|
||||
def frame_to_dict(frame: frames.Frame) -> Dict[str, Any]:
|
||||
"""Convert a Frame object to a dictionary.
|
||||
|
||||
Args:
|
||||
frame: Frame object to serialize
|
||||
|
||||
Returns:
|
||||
Dictionary representation of the frame
|
||||
|
||||
Example:
|
||||
>>> frame_to_dict(TextFrame(text="hello"))
|
||||
{"type": "TextFrame", "text": "hello"}
|
||||
"""
|
||||
result = {"type": frame.__class__.__name__}
|
||||
|
||||
# Get all fields from the dataclass
|
||||
if hasattr(frame, "__dataclass_fields__"):
|
||||
for field_name in frame.__dataclass_fields__:
|
||||
# Skip internal fields from base Frame class
|
||||
if field_name in ("id", "name", "pts", "metadata", "transport_source", "transport_destination"):
|
||||
continue
|
||||
|
||||
value = getattr(frame, field_name, None)
|
||||
if value is not None:
|
||||
# Special handling for bytes (audio/image data)
|
||||
if isinstance(value, bytes):
|
||||
result[field_name] = base64.b64encode(value).decode("utf-8")
|
||||
else:
|
||||
result[field_name] = value
|
||||
|
||||
return result
|
||||
169
src/pipecat/tests/test_runner.py
Normal file
169
src/pipecat/tests/test_runner.py
Normal file
@@ -0,0 +1,169 @@
|
||||
#
|
||||
# Copyright (c) 2024-2025 Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Test runner for frame processors from JSON test files."""
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
from pipecat.frames.frames import Frame
|
||||
from pipecat.processors.frame_processor import FrameProcessor
|
||||
|
||||
from .serialization import dict_to_frame, frame_to_dict, load_frames_from_json
|
||||
|
||||
|
||||
async def run_test_from_file(
|
||||
processor: FrameProcessor,
|
||||
test_file: str,
|
||||
) -> Tuple[List[Frame], Optional[List[Dict[str, Any]]], bool]:
|
||||
"""Run a processor test from a JSON test file.
|
||||
|
||||
Args:
|
||||
processor: The frame processor to test
|
||||
test_file: Path to JSON test file
|
||||
|
||||
Returns:
|
||||
Tuple of (output_frames, expected_output, passed)
|
||||
- output_frames: List of Frame objects that were output
|
||||
- expected_output: List of expected frame dicts (None if not specified)
|
||||
- passed: True if test passed, False if failed, None if no validation
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If test file doesn't exist
|
||||
ValueError: If test file is invalid
|
||||
|
||||
Example test file format:
|
||||
{
|
||||
"input_frames": [
|
||||
{"type": "TextFrame", "text": "hello"}
|
||||
],
|
||||
"expected_output": [
|
||||
{"type": "TextFrame"},
|
||||
{"type": "EndFrame"}
|
||||
]
|
||||
}
|
||||
"""
|
||||
path = Path(test_file)
|
||||
if not path.exists():
|
||||
raise FileNotFoundError(f"Test file not found: {test_file}")
|
||||
|
||||
with open(path, "r") as f:
|
||||
test_data = json.load(f)
|
||||
|
||||
# Load input frames
|
||||
if "input_frames" not in test_data:
|
||||
raise ValueError("Test file must contain 'input_frames'")
|
||||
|
||||
input_frames = [dict_to_frame(frame_dict) for frame_dict in test_data["input_frames"]]
|
||||
|
||||
# Load expected output (optional)
|
||||
expected_output = test_data.get("expected_output", None)
|
||||
|
||||
# Run the test
|
||||
# Note: run_test() only collects frames if expected_down_frames is provided,
|
||||
# so we need to manually collect from the pipeline ourselves
|
||||
import asyncio
|
||||
from pipecat.frames.frames import EndFrame
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.tests.utils import QueuedFrameProcessor
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.task import PipelineTask, PipelineParams
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
|
||||
# Set up the test pipeline manually
|
||||
received_down = asyncio.Queue()
|
||||
received_up = asyncio.Queue()
|
||||
source = QueuedFrameProcessor(
|
||||
queue=received_up,
|
||||
queue_direction=FrameDirection.UPSTREAM,
|
||||
ignore_start=True,
|
||||
)
|
||||
sink = QueuedFrameProcessor(
|
||||
queue=received_down,
|
||||
queue_direction=FrameDirection.DOWNSTREAM,
|
||||
ignore_start=True,
|
||||
)
|
||||
|
||||
pipeline = Pipeline([source, processor, sink])
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(),
|
||||
observers=[],
|
||||
cancel_on_idle_timeout=False,
|
||||
)
|
||||
|
||||
async def push_frames():
|
||||
await asyncio.sleep(0.01)
|
||||
for frame in input_frames:
|
||||
await task.queue_frame(frame)
|
||||
await task.queue_frame(EndFrame())
|
||||
|
||||
runner = PipelineRunner()
|
||||
await asyncio.gather(runner.run(task), push_frames())
|
||||
|
||||
# Collect all frames from the downstream queue
|
||||
downstream_frames = []
|
||||
while not received_down.empty():
|
||||
frame = await received_down.get()
|
||||
if not isinstance(frame, EndFrame):
|
||||
downstream_frames.append(frame)
|
||||
|
||||
# Validate if expected_output is provided
|
||||
passed = None
|
||||
if expected_output is not None:
|
||||
passed = _validate_output(downstream_frames, expected_output)
|
||||
|
||||
return downstream_frames, expected_output, passed
|
||||
|
||||
|
||||
def _validate_output(actual_frames: List[Frame], expected_output: List[Dict[str, Any]]) -> bool:
|
||||
"""Validate actual output frames against expected output.
|
||||
|
||||
Args:
|
||||
actual_frames: List of frames that were actually output
|
||||
expected_output: List of expected frame specifications
|
||||
|
||||
Returns:
|
||||
True if validation passed, False otherwise
|
||||
"""
|
||||
if len(actual_frames) != len(expected_output):
|
||||
return False
|
||||
|
||||
for actual, expected in zip(actual_frames, expected_output):
|
||||
# Check frame type
|
||||
if "type" not in expected:
|
||||
return False
|
||||
|
||||
expected_type = expected["type"]
|
||||
if actual.__class__.__name__ != expected_type:
|
||||
return False
|
||||
|
||||
# Check specific fields if provided
|
||||
for field_name, expected_value in expected.items():
|
||||
if field_name == "type":
|
||||
continue
|
||||
|
||||
if not hasattr(actual, field_name):
|
||||
return False
|
||||
|
||||
actual_value = getattr(actual, field_name)
|
||||
|
||||
# Special handling for different types
|
||||
if isinstance(expected_value, str) and isinstance(actual_value, str):
|
||||
# For string fields, support partial matching with "contains"
|
||||
if field_name.endswith("_contains"):
|
||||
base_field = field_name.replace("_contains", "")
|
||||
if hasattr(actual, base_field):
|
||||
actual_text = getattr(actual, base_field)
|
||||
if expected_value not in actual_text:
|
||||
return False
|
||||
elif actual_value != expected_value:
|
||||
return False
|
||||
elif actual_value != expected_value:
|
||||
return False
|
||||
|
||||
return True
|
||||
Reference in New Issue
Block a user