From 62484a4fc366a13f8aae180e78be464a05dd688a Mon Sep 17 00:00:00 2001 From: ajmeraharsh Date: Thu, 26 Mar 2026 09:47:00 +0530 Subject: [PATCH] fix(livekit): clear AudioSource buffer on interruption When an InterruptionFrame arrives, the Python-side audio task is cancelled but frames already submitted to rtc.AudioSource continue playing from its internal buffer. This causes the bot to keep speaking for several seconds after being interrupted. Fix by overriding process_frame in LiveKitOutputTransport to call audio_source.clear_queue() on InterruptionFrame, immediately flushing the buffered audio. --- changelog/4151.fixed.md | 1 + src/pipecat/transports/livekit/transport.py | 17 +++++++++++++++++ 2 files changed, 18 insertions(+) create mode 100644 changelog/4151.fixed.md diff --git a/changelog/4151.fixed.md b/changelog/4151.fixed.md new file mode 100644 index 000000000..0bcd55194 --- /dev/null +++ b/changelog/4151.fixed.md @@ -0,0 +1 @@ +- Fixed `LiveKitOutputTransport` not clearing the `rtc.AudioSource` internal buffer on interruption, causing the bot to continue speaking for several seconds after being interrupted. diff --git a/src/pipecat/transports/livekit/transport.py b/src/pipecat/transports/livekit/transport.py index 18163cf8e..07a7e3f39 100644 --- a/src/pipecat/transports/livekit/transport.py +++ b/src/pipecat/transports/livekit/transport.py @@ -27,7 +27,9 @@ from pipecat.frames.frames import ( CancelFrame, ClientConnectedFrame, EndFrame, + Frame, ImageRawFrame, + InterruptionFrame, OutputAudioRawFrame, OutputDTMFFrame, OutputDTMFUrgentFrame, @@ -880,6 +882,21 @@ class LiveKitOutputTransport(BaseOutputTransport): await super().cancel(frame) await self._client.disconnect() + async def process_frame(self, frame: Frame, direction: FrameDirection): + """Process frames, clearing the LiveKit AudioSource buffer on interruption. + + When an InterruptionFrame arrives, any audio already submitted to the + LiveKit AudioSource (but not yet played out) is cleared immediately so + the bot stops speaking without delay. + + Args: + frame: The frame to process. + direction: The direction of frame flow in the pipeline. + """ + await super().process_frame(frame, direction) + if isinstance(frame, InterruptionFrame) and self._client._audio_source is not None: + self._client._audio_source.clear_queue() + async def setup(self, setup: FrameProcessorSetup): """Setup the output transport with shared client setup.