From 1b7dfe81260aa220749a0cef94fe1f7fe3be71ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Tue, 11 Feb 2025 22:13:07 -0800 Subject: [PATCH] tests: add a new SleepFrame The new SleepFrame allow us to control when system frames are pushed to the pipeline. --- src/pipecat/tests/utils.py | 45 +++++++++++++++++++++++++++++++------- 1 file changed, 37 insertions(+), 8 deletions(-) diff --git a/src/pipecat/tests/utils.py b/src/pipecat/tests/utils.py index ff92164a4..2b78f2bef 100644 --- a/src/pipecat/tests/utils.py +++ b/src/pipecat/tests/utils.py @@ -5,6 +5,7 @@ # import asyncio +from dataclasses import dataclass from typing import Any, Awaitable, Callable, Dict, Sequence, Tuple from pipecat.frames.frames import ( @@ -12,6 +13,7 @@ from pipecat.frames.frames import ( Frame, HeartbeatFrame, StartFrame, + SystemFrame, ) from pipecat.observers.base_observer import BaseObserver from pipecat.pipeline.pipeline import Pipeline @@ -20,6 +22,15 @@ from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +@dataclass +class SleepFrame(SystemFrame): + """This frame is used by test framework to introduce some sleep time before + the next frame is pushed. This is useful to control system frames vs data or + control frames.""" + + sleep: float = 0.1 + + class HeartbeatsObserver(BaseObserver): def __init__( self, @@ -44,7 +55,11 @@ class HeartbeatsObserver(BaseObserver): class QueuedFrameProcessor(FrameProcessor): def __init__( - self, queue: asyncio.Queue, queue_direction: FrameDirection, ignore_start: bool = True + self, + *, + queue: asyncio.Queue, + queue_direction: FrameDirection, + ignore_start: bool = True, ): super().__init__() self._queue = queue @@ -72,21 +87,35 @@ async def run_test( ) -> Tuple[Sequence[Frame], Sequence[Frame]]: received_up = asyncio.Queue() received_down = asyncio.Queue() - source = QueuedFrameProcessor(received_up, FrameDirection.UPSTREAM, ignore_start) - sink = QueuedFrameProcessor(received_down, FrameDirection.DOWNSTREAM, ignore_start) + source = QueuedFrameProcessor( + queue=received_up, + queue_direction=FrameDirection.UPSTREAM, + ignore_start=ignore_start, + ) + sink = QueuedFrameProcessor( + queue=received_down, + queue_direction=FrameDirection.DOWNSTREAM, + ignore_start=ignore_start, + ) pipeline = Pipeline([source, processor, sink]) task = PipelineTask(pipeline, params=PipelineParams(start_metadata=start_metadata)) - for frame in frames_to_send: - await task.queue_frame(frame) + async def push_frames(): + # Just give a little head start to the runner. + await asyncio.sleep(0.01) + for frame in frames_to_send: + if isinstance(frame, SleepFrame): + await asyncio.sleep(frame.sleep) + else: + await task.queue_frame(frame) - if send_end_frame: - await task.queue_frame(EndFrame()) + if send_end_frame: + await task.queue_frame(EndFrame()) runner = PipelineRunner() - await runner.run(task) + await asyncio.gather(runner.run(task), push_frames()) # # Down frames