diff --git a/changelog/3400.fixed.md b/changelog/3400.fixed.md new file mode 100644 index 000000000..aaf881bb7 --- /dev/null +++ b/changelog/3400.fixed.md @@ -0,0 +1 @@ +- Fixed a timing issue in `BaseOutputTransport` where the bot-speaking flag was updated only after awaiting the frame pushes, allowing the event loop to re-enter the started/stopped-speaking handlers before the guard was updated. diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index 1d90cbff5..a19d65b75 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -603,6 +603,8 @@ class BaseOutputTransport(FrameProcessor): if self._bot_speaking: return + self._bot_speaking = True + logger.debug( f"Bot{f' [{self._destination}]' if self._destination else ''} started speaking" ) @@ -614,13 +616,17 @@ class BaseOutputTransport(FrameProcessor): await self._transport.push_frame(downstream_frame) await self._transport.push_frame(upstream_frame, FrameDirection.UPSTREAM) - self._bot_speaking = True - async def _bot_stopped_speaking(self): """Handle bot stopped speaking event.""" if not self._bot_speaking: return + self._bot_speaking = False + + # Clean audio buffer (there could be tiny left overs if not multiple + # to our output chunk size). + self._audio_buffer = bytearray() + logger.debug( f"Bot{f' [{self._destination}]' if self._destination else ''} stopped speaking" ) @@ -632,12 +638,6 @@ class BaseOutputTransport(FrameProcessor): await self._transport.push_frame(downstream_frame) await self._transport.push_frame(upstream_frame, FrameDirection.UPSTREAM) - self._bot_speaking = False - - # Clean audio buffer (there could be tiny left overs if not multiple - # to our output chunk size). - self._audio_buffer = bytearray() - async def _bot_currently_speaking(self): """Handle bot speaking event.""" await self._bot_started_speaking()