From 6b9deefbe3ff531097d79adca8e34a30dba43ec2 Mon Sep 17 00:00:00 2001 From: filipi87 Date: Wed, 20 May 2026 10:03:08 -0300 Subject: [PATCH] fix: preserve frame insertion order in BaseOutputTransport for equal PTS values Frames sharing the same presentation timestamp were being reordered by the priority queue. Adds a monotonic counter as a tiebreaker so frames with equal PTS are always emitted in insertion order, preventing subtle audio/text sequencing bugs. --- src/pipecat/transports/base_output.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index 3f1427627..dbb25fd28 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -448,6 +448,9 @@ class BaseOutputTransport(FrameProcessor): self._video_task: asyncio.Task | None = None self._clock_task: asyncio.Task | None = None + # If timestamps are equal, use this count to preserve the insertion order + self._clock_queue_counter = itertools.count() + @property def sample_rate(self) -> int: """Get the audio sample rate. @@ -498,7 +501,7 @@ class BaseOutputTransport(FrameProcessor): frame: The end frame signaling sender shutdown. """ # Let the sink tasks process the queue until they reach this EndFrame. - await self._clock_queue.put((float("inf"), frame.id, frame)) + await self._clock_queue.put((float("inf"), next(self._clock_queue_counter), frame)) await self._audio_queue.put(frame) # At this point we have enqueued an EndFrame and we need to wait for @@ -610,7 +613,7 @@ class BaseOutputTransport(FrameProcessor): Args: frame: The frame with timing information to handle. """ - await self._clock_queue.put((frame.pts, frame.id, frame)) + await self._clock_queue.put((frame.pts, next(self._clock_queue_counter), frame)) async def handle_sync_frame(self, frame: Frame): """Handle frames that need synchronized processing.