diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index 3f1427627..dbb25fd28 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -448,6 +448,9 @@ class BaseOutputTransport(FrameProcessor): self._video_task: asyncio.Task | None = None self._clock_task: asyncio.Task | None = None + # If timestamps are equal, use this count to preserve the insertion order + self._clock_queue_counter = itertools.count() + @property def sample_rate(self) -> int: """Get the audio sample rate. @@ -498,7 +501,7 @@ class BaseOutputTransport(FrameProcessor): frame: The end frame signaling sender shutdown. """ # Let the sink tasks process the queue until they reach this EndFrame. - await self._clock_queue.put((float("inf"), frame.id, frame)) + await self._clock_queue.put((float("inf"), next(self._clock_queue_counter), frame)) await self._audio_queue.put(frame) # At this point we have enqueued an EndFrame and we need to wait for @@ -610,7 +613,7 @@ class BaseOutputTransport(FrameProcessor): Args: frame: The frame with timing information to handle. """ - await self._clock_queue.put((frame.pts, frame.id, frame)) + await self._clock_queue.put((frame.pts, next(self._clock_queue_counter), frame)) async def handle_sync_frame(self, frame: Frame): """Handle frames that need synchronized processing.