From dccd98ec8afac337dd76bbe5e28251ace7de4859 Mon Sep 17 00:00:00 2001 From: OmercohenAviv Date: Sat, 28 Mar 2026 11:53:51 +0300 Subject: [PATCH] test --- tests/test_pipeline.py | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 3c7f50453..579b77b03 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -5,9 +5,12 @@ # import asyncio +import io import time import unittest +from loguru import logger + from pipecat.frames.frames import ( CancelFrame, EndFrame, @@ -298,6 +301,45 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase): pass assert heartbeats_counter == expected_heartbeats + async def test_heartbeat_monitor_respects_custom_timeout(self): + """Verify the heartbeat monitor uses heartbeats_monitor_secs from params.""" + + class HeartbeatBlocker(FrameProcessor): + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if not isinstance(frame, HeartbeatFrame): + await self.push_frame(frame, direction) + + log_output = io.StringIO() + handler_id = logger.add(log_output, level="WARNING", format="{message}") + + custom_monitor_secs = 0.3 + + try: + pipeline = Pipeline([HeartbeatBlocker()]) + task = PipelineTask( + pipeline, + params=PipelineParams( + enable_heartbeats=True, + heartbeats_period_secs=0.1, + heartbeats_monitor_secs=custom_monitor_secs, + ), + cancel_on_idle_timeout=False, + ) + + try: + await asyncio.wait_for( + task.run(PipelineTaskParams(loop=asyncio.get_event_loop())), + timeout=0.6, + ) + except asyncio.TimeoutError: + pass + + log_text = log_output.getvalue() + assert f"more than {custom_monitor_secs} seconds" in log_text + finally: + logger.remove(handler_id) + async def test_idle_task(self): identity = IdentityFilter() pipeline = Pipeline([identity])