Compare commits

...

1 Commits

Author SHA1 Message Date
Chad Bailey
33d813ed8f added JSON serializtion and additional testing utilities 2025-10-27 15:57:24 +00:00
3 changed files with 331 additions and 0 deletions

View File

@@ -0,0 +1,12 @@
#
# Copyright (c) 2024-2025 Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Public testing API for Pipecat frame processors."""
from .serialization import dict_to_frame, frame_to_dict, load_frames_from_json
from .test_runner import run_test_from_file
__all__ = ["dict_to_frame", "frame_to_dict", "load_frames_from_json", "run_test_from_file"]

View File

@@ -0,0 +1,150 @@
#
# Copyright (c) 2024-2025 Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Frame serialization and deserialization for testing."""
import base64
import inspect
import json
from pathlib import Path
from typing import Any, Dict, List
from pipecat.frames import frames
def _get_frame_class(frame_type: str):
"""Get a frame class by name from the frames module.
Args:
frame_type: The name of the frame class (e.g., "TextFrame")
Returns:
The frame class object
Raises:
ValueError: If the frame type is not found
"""
if not hasattr(frames, frame_type):
raise ValueError(f"Unknown frame type: {frame_type}")
cls = getattr(frames, frame_type)
if not inspect.isclass(cls) or not issubclass(cls, frames.Frame):
raise ValueError(f"{frame_type} is not a valid Frame class")
return cls
def dict_to_frame(data: Dict[str, Any]) -> frames.Frame:
"""Convert a dictionary to a Frame object.
Args:
data: Dictionary containing frame data with a "type" key
Returns:
A Frame instance
Raises:
ValueError: If frame type is missing or invalid
Example:
>>> dict_to_frame({"type": "TextFrame", "text": "hello"})
TextFrame(text="hello")
"""
if "type" not in data:
raise ValueError("Frame dictionary must contain a 'type' field")
frame_type = data["type"]
frame_cls = _get_frame_class(frame_type)
# Build kwargs from data, excluding 'type'
kwargs = {k: v for k, v in data.items() if k != "type"}
# Special handling for audio frames with base64 encoded audio
if "audio" in kwargs and isinstance(kwargs["audio"], str):
kwargs["audio"] = base64.b64decode(kwargs["audio"])
# Special handling for image frames with base64 encoded images
if "image" in kwargs and isinstance(kwargs["image"], str):
kwargs["image"] = base64.b64decode(kwargs["image"])
try:
return frame_cls(**kwargs)
except TypeError as e:
raise ValueError(f"Failed to create {frame_type}: {e}")
def load_frames_from_json(filepath: str) -> List[frames.Frame]:
"""Load frames from a JSON file.
Args:
filepath: Path to JSON file containing frame data
Returns:
List of Frame objects
Raises:
FileNotFoundError: If the file doesn't exist
ValueError: If JSON is invalid or frames cannot be deserialized
Example JSON format:
{
"input_frames": [
{"type": "TextFrame", "text": "hello"},
{"type": "EndFrame"}
]
}
"""
path = Path(filepath)
if not path.exists():
raise FileNotFoundError(f"Frame file not found: {filepath}")
with open(path, "r") as f:
data = json.load(f)
if not isinstance(data, dict):
raise ValueError("JSON must contain a dictionary")
if "input_frames" not in data:
raise ValueError("JSON must contain an 'input_frames' key")
frame_dicts = data["input_frames"]
if not isinstance(frame_dicts, list):
raise ValueError("'input_frames' must be a list")
return [dict_to_frame(frame_dict) for frame_dict in frame_dicts]
def frame_to_dict(frame: frames.Frame) -> Dict[str, Any]:
"""Convert a Frame object to a dictionary.
Args:
frame: Frame object to serialize
Returns:
Dictionary representation of the frame
Example:
>>> frame_to_dict(TextFrame(text="hello"))
{"type": "TextFrame", "text": "hello"}
"""
result = {"type": frame.__class__.__name__}
# Get all fields from the dataclass
if hasattr(frame, "__dataclass_fields__"):
for field_name in frame.__dataclass_fields__:
# Skip internal fields from base Frame class
if field_name in ("id", "name", "pts", "metadata", "transport_source", "transport_destination"):
continue
value = getattr(frame, field_name, None)
if value is not None:
# Special handling for bytes (audio/image data)
if isinstance(value, bytes):
result[field_name] = base64.b64encode(value).decode("utf-8")
else:
result[field_name] = value
return result

View File

@@ -0,0 +1,169 @@
#
# Copyright (c) 2024-2025 Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Test runner for frame processors from JSON test files."""
import json
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from pipecat.frames.frames import Frame
from pipecat.processors.frame_processor import FrameProcessor
from .serialization import dict_to_frame, frame_to_dict, load_frames_from_json
async def run_test_from_file(
processor: FrameProcessor,
test_file: str,
) -> Tuple[List[Frame], Optional[List[Dict[str, Any]]], bool]:
"""Run a processor test from a JSON test file.
Args:
processor: The frame processor to test
test_file: Path to JSON test file
Returns:
Tuple of (output_frames, expected_output, passed)
- output_frames: List of Frame objects that were output
- expected_output: List of expected frame dicts (None if not specified)
- passed: True if test passed, False if failed, None if no validation
Raises:
FileNotFoundError: If test file doesn't exist
ValueError: If test file is invalid
Example test file format:
{
"input_frames": [
{"type": "TextFrame", "text": "hello"}
],
"expected_output": [
{"type": "TextFrame"},
{"type": "EndFrame"}
]
}
"""
path = Path(test_file)
if not path.exists():
raise FileNotFoundError(f"Test file not found: {test_file}")
with open(path, "r") as f:
test_data = json.load(f)
# Load input frames
if "input_frames" not in test_data:
raise ValueError("Test file must contain 'input_frames'")
input_frames = [dict_to_frame(frame_dict) for frame_dict in test_data["input_frames"]]
# Load expected output (optional)
expected_output = test_data.get("expected_output", None)
# Run the test
# Note: run_test() only collects frames if expected_down_frames is provided,
# so we need to manually collect from the pipeline ourselves
import asyncio
from pipecat.frames.frames import EndFrame
from pipecat.processors.frame_processor import FrameDirection
from pipecat.tests.utils import QueuedFrameProcessor
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask, PipelineParams
from pipecat.pipeline.runner import PipelineRunner
# Set up the test pipeline manually
received_down = asyncio.Queue()
received_up = asyncio.Queue()
source = QueuedFrameProcessor(
queue=received_up,
queue_direction=FrameDirection.UPSTREAM,
ignore_start=True,
)
sink = QueuedFrameProcessor(
queue=received_down,
queue_direction=FrameDirection.DOWNSTREAM,
ignore_start=True,
)
pipeline = Pipeline([source, processor, sink])
task = PipelineTask(
pipeline,
params=PipelineParams(),
observers=[],
cancel_on_idle_timeout=False,
)
async def push_frames():
await asyncio.sleep(0.01)
for frame in input_frames:
await task.queue_frame(frame)
await task.queue_frame(EndFrame())
runner = PipelineRunner()
await asyncio.gather(runner.run(task), push_frames())
# Collect all frames from the downstream queue
downstream_frames = []
while not received_down.empty():
frame = await received_down.get()
if not isinstance(frame, EndFrame):
downstream_frames.append(frame)
# Validate if expected_output is provided
passed = None
if expected_output is not None:
passed = _validate_output(downstream_frames, expected_output)
return downstream_frames, expected_output, passed
def _validate_output(actual_frames: List[Frame], expected_output: List[Dict[str, Any]]) -> bool:
"""Validate actual output frames against expected output.
Args:
actual_frames: List of frames that were actually output
expected_output: List of expected frame specifications
Returns:
True if validation passed, False otherwise
"""
if len(actual_frames) != len(expected_output):
return False
for actual, expected in zip(actual_frames, expected_output):
# Check frame type
if "type" not in expected:
return False
expected_type = expected["type"]
if actual.__class__.__name__ != expected_type:
return False
# Check specific fields if provided
for field_name, expected_value in expected.items():
if field_name == "type":
continue
if not hasattr(actual, field_name):
return False
actual_value = getattr(actual, field_name)
# Special handling for different types
if isinstance(expected_value, str) and isinstance(actual_value, str):
# For string fields, support partial matching with "contains"
if field_name.endswith("_contains"):
base_field = field_name.replace("_contains", "")
if hasattr(actual, base_field):
actual_text = getattr(actual, base_field)
if expected_value not in actual_text:
return False
elif actual_value != expected_value:
return False
elif actual_value != expected_value:
return False
return True