tests: add a new SleepFrame

The new SleepFrame allow us to control when system frames are pushed to the
pipeline.
This commit is contained in:
Aleix Conchillo Flaqué
2025-02-11 22:13:07 -08:00
parent d1ee851a65
commit 1b7dfe8126

View File

@@ -5,6 +5,7 @@
#
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, Sequence, Tuple
from pipecat.frames.frames import (
@@ -12,6 +13,7 @@ from pipecat.frames.frames import (
Frame,
HeartbeatFrame,
StartFrame,
SystemFrame,
)
from pipecat.observers.base_observer import BaseObserver
from pipecat.pipeline.pipeline import Pipeline
@@ -20,6 +22,15 @@ from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
@dataclass
class SleepFrame(SystemFrame):
"""This frame is used by test framework to introduce some sleep time before
the next frame is pushed. This is useful to control system frames vs data or
control frames."""
sleep: float = 0.1
class HeartbeatsObserver(BaseObserver):
def __init__(
self,
@@ -44,7 +55,11 @@ class HeartbeatsObserver(BaseObserver):
class QueuedFrameProcessor(FrameProcessor):
def __init__(
self, queue: asyncio.Queue, queue_direction: FrameDirection, ignore_start: bool = True
self,
*,
queue: asyncio.Queue,
queue_direction: FrameDirection,
ignore_start: bool = True,
):
super().__init__()
self._queue = queue
@@ -72,21 +87,35 @@ async def run_test(
) -> Tuple[Sequence[Frame], Sequence[Frame]]:
received_up = asyncio.Queue()
received_down = asyncio.Queue()
source = QueuedFrameProcessor(received_up, FrameDirection.UPSTREAM, ignore_start)
sink = QueuedFrameProcessor(received_down, FrameDirection.DOWNSTREAM, ignore_start)
source = QueuedFrameProcessor(
queue=received_up,
queue_direction=FrameDirection.UPSTREAM,
ignore_start=ignore_start,
)
sink = QueuedFrameProcessor(
queue=received_down,
queue_direction=FrameDirection.DOWNSTREAM,
ignore_start=ignore_start,
)
pipeline = Pipeline([source, processor, sink])
task = PipelineTask(pipeline, params=PipelineParams(start_metadata=start_metadata))
for frame in frames_to_send:
await task.queue_frame(frame)
async def push_frames():
# Just give a little head start to the runner.
await asyncio.sleep(0.01)
for frame in frames_to_send:
if isinstance(frame, SleepFrame):
await asyncio.sleep(frame.sleep)
else:
await task.queue_frame(frame)
if send_end_frame:
await task.queue_frame(EndFrame())
if send_end_frame:
await task.queue_frame(EndFrame())
runner = PipelineRunner()
await runner.run(task)
await asyncio.gather(runner.run(task), push_frames())
#
# Down frames