Compare commits
1 Commits
hush/TurnT
...
cb/test-se
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
33d813ed8f |
@@ -0,0 +1,12 @@
|
|||||||
|
#
|
||||||
|
# Copyright (c) 2024-2025 Daily
|
||||||
|
#
|
||||||
|
# SPDX-License-Identifier: BSD 2-Clause License
|
||||||
|
#
|
||||||
|
|
||||||
|
"""Public testing API for Pipecat frame processors."""
|
||||||
|
|
||||||
|
from .serialization import dict_to_frame, frame_to_dict, load_frames_from_json
|
||||||
|
from .test_runner import run_test_from_file
|
||||||
|
|
||||||
|
__all__ = ["dict_to_frame", "frame_to_dict", "load_frames_from_json", "run_test_from_file"]
|
||||||
|
|||||||
150
src/pipecat/tests/serialization.py
Normal file
150
src/pipecat/tests/serialization.py
Normal file
@@ -0,0 +1,150 @@
|
|||||||
|
#
|
||||||
|
# Copyright (c) 2024-2025 Daily
|
||||||
|
#
|
||||||
|
# SPDX-License-Identifier: BSD 2-Clause License
|
||||||
|
#
|
||||||
|
|
||||||
|
"""Frame serialization and deserialization for testing."""
|
||||||
|
|
||||||
|
import base64
|
||||||
|
import inspect
|
||||||
|
import json
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any, Dict, List
|
||||||
|
|
||||||
|
from pipecat.frames import frames
|
||||||
|
|
||||||
|
|
||||||
|
def _get_frame_class(frame_type: str):
|
||||||
|
"""Get a frame class by name from the frames module.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
frame_type: The name of the frame class (e.g., "TextFrame")
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The frame class object
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValueError: If the frame type is not found
|
||||||
|
"""
|
||||||
|
if not hasattr(frames, frame_type):
|
||||||
|
raise ValueError(f"Unknown frame type: {frame_type}")
|
||||||
|
|
||||||
|
cls = getattr(frames, frame_type)
|
||||||
|
if not inspect.isclass(cls) or not issubclass(cls, frames.Frame):
|
||||||
|
raise ValueError(f"{frame_type} is not a valid Frame class")
|
||||||
|
|
||||||
|
return cls
|
||||||
|
|
||||||
|
|
||||||
|
def dict_to_frame(data: Dict[str, Any]) -> frames.Frame:
|
||||||
|
"""Convert a dictionary to a Frame object.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data: Dictionary containing frame data with a "type" key
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A Frame instance
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValueError: If frame type is missing or invalid
|
||||||
|
|
||||||
|
Example:
|
||||||
|
>>> dict_to_frame({"type": "TextFrame", "text": "hello"})
|
||||||
|
TextFrame(text="hello")
|
||||||
|
"""
|
||||||
|
if "type" not in data:
|
||||||
|
raise ValueError("Frame dictionary must contain a 'type' field")
|
||||||
|
|
||||||
|
frame_type = data["type"]
|
||||||
|
frame_cls = _get_frame_class(frame_type)
|
||||||
|
|
||||||
|
# Build kwargs from data, excluding 'type'
|
||||||
|
kwargs = {k: v for k, v in data.items() if k != "type"}
|
||||||
|
|
||||||
|
# Special handling for audio frames with base64 encoded audio
|
||||||
|
if "audio" in kwargs and isinstance(kwargs["audio"], str):
|
||||||
|
kwargs["audio"] = base64.b64decode(kwargs["audio"])
|
||||||
|
|
||||||
|
# Special handling for image frames with base64 encoded images
|
||||||
|
if "image" in kwargs and isinstance(kwargs["image"], str):
|
||||||
|
kwargs["image"] = base64.b64decode(kwargs["image"])
|
||||||
|
|
||||||
|
try:
|
||||||
|
return frame_cls(**kwargs)
|
||||||
|
except TypeError as e:
|
||||||
|
raise ValueError(f"Failed to create {frame_type}: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
def load_frames_from_json(filepath: str) -> List[frames.Frame]:
|
||||||
|
"""Load frames from a JSON file.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
filepath: Path to JSON file containing frame data
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of Frame objects
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
FileNotFoundError: If the file doesn't exist
|
||||||
|
ValueError: If JSON is invalid or frames cannot be deserialized
|
||||||
|
|
||||||
|
Example JSON format:
|
||||||
|
{
|
||||||
|
"input_frames": [
|
||||||
|
{"type": "TextFrame", "text": "hello"},
|
||||||
|
{"type": "EndFrame"}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
path = Path(filepath)
|
||||||
|
if not path.exists():
|
||||||
|
raise FileNotFoundError(f"Frame file not found: {filepath}")
|
||||||
|
|
||||||
|
with open(path, "r") as f:
|
||||||
|
data = json.load(f)
|
||||||
|
|
||||||
|
if not isinstance(data, dict):
|
||||||
|
raise ValueError("JSON must contain a dictionary")
|
||||||
|
|
||||||
|
if "input_frames" not in data:
|
||||||
|
raise ValueError("JSON must contain an 'input_frames' key")
|
||||||
|
|
||||||
|
frame_dicts = data["input_frames"]
|
||||||
|
if not isinstance(frame_dicts, list):
|
||||||
|
raise ValueError("'input_frames' must be a list")
|
||||||
|
|
||||||
|
return [dict_to_frame(frame_dict) for frame_dict in frame_dicts]
|
||||||
|
|
||||||
|
|
||||||
|
def frame_to_dict(frame: frames.Frame) -> Dict[str, Any]:
|
||||||
|
"""Convert a Frame object to a dictionary.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
frame: Frame object to serialize
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary representation of the frame
|
||||||
|
|
||||||
|
Example:
|
||||||
|
>>> frame_to_dict(TextFrame(text="hello"))
|
||||||
|
{"type": "TextFrame", "text": "hello"}
|
||||||
|
"""
|
||||||
|
result = {"type": frame.__class__.__name__}
|
||||||
|
|
||||||
|
# Get all fields from the dataclass
|
||||||
|
if hasattr(frame, "__dataclass_fields__"):
|
||||||
|
for field_name in frame.__dataclass_fields__:
|
||||||
|
# Skip internal fields from base Frame class
|
||||||
|
if field_name in ("id", "name", "pts", "metadata", "transport_source", "transport_destination"):
|
||||||
|
continue
|
||||||
|
|
||||||
|
value = getattr(frame, field_name, None)
|
||||||
|
if value is not None:
|
||||||
|
# Special handling for bytes (audio/image data)
|
||||||
|
if isinstance(value, bytes):
|
||||||
|
result[field_name] = base64.b64encode(value).decode("utf-8")
|
||||||
|
else:
|
||||||
|
result[field_name] = value
|
||||||
|
|
||||||
|
return result
|
||||||
169
src/pipecat/tests/test_runner.py
Normal file
169
src/pipecat/tests/test_runner.py
Normal file
@@ -0,0 +1,169 @@
|
|||||||
|
#
|
||||||
|
# Copyright (c) 2024-2025 Daily
|
||||||
|
#
|
||||||
|
# SPDX-License-Identifier: BSD 2-Clause License
|
||||||
|
#
|
||||||
|
|
||||||
|
"""Test runner for frame processors from JSON test files."""
|
||||||
|
|
||||||
|
import json
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any, Dict, List, Optional, Tuple
|
||||||
|
|
||||||
|
from pipecat.frames.frames import Frame
|
||||||
|
from pipecat.processors.frame_processor import FrameProcessor
|
||||||
|
|
||||||
|
from .serialization import dict_to_frame, frame_to_dict, load_frames_from_json
|
||||||
|
|
||||||
|
|
||||||
|
async def run_test_from_file(
|
||||||
|
processor: FrameProcessor,
|
||||||
|
test_file: str,
|
||||||
|
) -> Tuple[List[Frame], Optional[List[Dict[str, Any]]], bool]:
|
||||||
|
"""Run a processor test from a JSON test file.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
processor: The frame processor to test
|
||||||
|
test_file: Path to JSON test file
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple of (output_frames, expected_output, passed)
|
||||||
|
- output_frames: List of Frame objects that were output
|
||||||
|
- expected_output: List of expected frame dicts (None if not specified)
|
||||||
|
- passed: True if test passed, False if failed, None if no validation
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
FileNotFoundError: If test file doesn't exist
|
||||||
|
ValueError: If test file is invalid
|
||||||
|
|
||||||
|
Example test file format:
|
||||||
|
{
|
||||||
|
"input_frames": [
|
||||||
|
{"type": "TextFrame", "text": "hello"}
|
||||||
|
],
|
||||||
|
"expected_output": [
|
||||||
|
{"type": "TextFrame"},
|
||||||
|
{"type": "EndFrame"}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
path = Path(test_file)
|
||||||
|
if not path.exists():
|
||||||
|
raise FileNotFoundError(f"Test file not found: {test_file}")
|
||||||
|
|
||||||
|
with open(path, "r") as f:
|
||||||
|
test_data = json.load(f)
|
||||||
|
|
||||||
|
# Load input frames
|
||||||
|
if "input_frames" not in test_data:
|
||||||
|
raise ValueError("Test file must contain 'input_frames'")
|
||||||
|
|
||||||
|
input_frames = [dict_to_frame(frame_dict) for frame_dict in test_data["input_frames"]]
|
||||||
|
|
||||||
|
# Load expected output (optional)
|
||||||
|
expected_output = test_data.get("expected_output", None)
|
||||||
|
|
||||||
|
# Run the test
|
||||||
|
# Note: run_test() only collects frames if expected_down_frames is provided,
|
||||||
|
# so we need to manually collect from the pipeline ourselves
|
||||||
|
import asyncio
|
||||||
|
from pipecat.frames.frames import EndFrame
|
||||||
|
from pipecat.processors.frame_processor import FrameDirection
|
||||||
|
from pipecat.tests.utils import QueuedFrameProcessor
|
||||||
|
from pipecat.pipeline.pipeline import Pipeline
|
||||||
|
from pipecat.pipeline.task import PipelineTask, PipelineParams
|
||||||
|
from pipecat.pipeline.runner import PipelineRunner
|
||||||
|
|
||||||
|
# Set up the test pipeline manually
|
||||||
|
received_down = asyncio.Queue()
|
||||||
|
received_up = asyncio.Queue()
|
||||||
|
source = QueuedFrameProcessor(
|
||||||
|
queue=received_up,
|
||||||
|
queue_direction=FrameDirection.UPSTREAM,
|
||||||
|
ignore_start=True,
|
||||||
|
)
|
||||||
|
sink = QueuedFrameProcessor(
|
||||||
|
queue=received_down,
|
||||||
|
queue_direction=FrameDirection.DOWNSTREAM,
|
||||||
|
ignore_start=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
pipeline = Pipeline([source, processor, sink])
|
||||||
|
task = PipelineTask(
|
||||||
|
pipeline,
|
||||||
|
params=PipelineParams(),
|
||||||
|
observers=[],
|
||||||
|
cancel_on_idle_timeout=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def push_frames():
|
||||||
|
await asyncio.sleep(0.01)
|
||||||
|
for frame in input_frames:
|
||||||
|
await task.queue_frame(frame)
|
||||||
|
await task.queue_frame(EndFrame())
|
||||||
|
|
||||||
|
runner = PipelineRunner()
|
||||||
|
await asyncio.gather(runner.run(task), push_frames())
|
||||||
|
|
||||||
|
# Collect all frames from the downstream queue
|
||||||
|
downstream_frames = []
|
||||||
|
while not received_down.empty():
|
||||||
|
frame = await received_down.get()
|
||||||
|
if not isinstance(frame, EndFrame):
|
||||||
|
downstream_frames.append(frame)
|
||||||
|
|
||||||
|
# Validate if expected_output is provided
|
||||||
|
passed = None
|
||||||
|
if expected_output is not None:
|
||||||
|
passed = _validate_output(downstream_frames, expected_output)
|
||||||
|
|
||||||
|
return downstream_frames, expected_output, passed
|
||||||
|
|
||||||
|
|
||||||
|
def _validate_output(actual_frames: List[Frame], expected_output: List[Dict[str, Any]]) -> bool:
|
||||||
|
"""Validate actual output frames against expected output.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
actual_frames: List of frames that were actually output
|
||||||
|
expected_output: List of expected frame specifications
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if validation passed, False otherwise
|
||||||
|
"""
|
||||||
|
if len(actual_frames) != len(expected_output):
|
||||||
|
return False
|
||||||
|
|
||||||
|
for actual, expected in zip(actual_frames, expected_output):
|
||||||
|
# Check frame type
|
||||||
|
if "type" not in expected:
|
||||||
|
return False
|
||||||
|
|
||||||
|
expected_type = expected["type"]
|
||||||
|
if actual.__class__.__name__ != expected_type:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Check specific fields if provided
|
||||||
|
for field_name, expected_value in expected.items():
|
||||||
|
if field_name == "type":
|
||||||
|
continue
|
||||||
|
|
||||||
|
if not hasattr(actual, field_name):
|
||||||
|
return False
|
||||||
|
|
||||||
|
actual_value = getattr(actual, field_name)
|
||||||
|
|
||||||
|
# Special handling for different types
|
||||||
|
if isinstance(expected_value, str) and isinstance(actual_value, str):
|
||||||
|
# For string fields, support partial matching with "contains"
|
||||||
|
if field_name.endswith("_contains"):
|
||||||
|
base_field = field_name.replace("_contains", "")
|
||||||
|
if hasattr(actual, base_field):
|
||||||
|
actual_text = getattr(actual, base_field)
|
||||||
|
if expected_value not in actual_text:
|
||||||
|
return False
|
||||||
|
elif actual_value != expected_value:
|
||||||
|
return False
|
||||||
|
elif actual_value != expected_value:
|
||||||
|
return False
|
||||||
|
|
||||||
|
return True
|
||||||
Reference in New Issue
Block a user