diff --git a/src/pipecat/transports/daily/transport.py b/src/pipecat/transports/daily/transport.py index 37ae8a167..a955b1c12 100644 --- a/src/pipecat/transports/daily/transport.py +++ b/src/pipecat/transports/daily/transport.py @@ -501,8 +501,7 @@ class DailyTransportClient(EventHandler): self._event_task = None self._audio_task = None self._video_task = None - self._message_queue: Optional[asyncio.Queue] = None - self._message_task = None + self._join_message_queue: list = [] # Input and ouput sample rates. They will be initialize on setup(). self._in_sample_rate = 0 @@ -569,8 +568,7 @@ class DailyTransportClient(EventHandler): error: An error description or None. """ if not self._joined: - if self._message_queue: - await self._message_queue.put((frame,)) + self._join_message_queue.append(frame) return None participant_id = None @@ -677,12 +675,6 @@ class DailyTransportClient(EventHandler): f"{self}::event_callback_task", ) - self._message_queue = asyncio.Queue() - self._message_task = self._task_manager.create_task( - self._message_task_handler(self._message_queue), - f"{self}::message_task", - ) - async def cleanup(self): """Cleanup client resources and cancel tasks.""" if self._event_task and self._task_manager: @@ -694,9 +686,6 @@ class DailyTransportClient(EventHandler): if self._video_task and self._task_manager: await self._task_manager.cancel_task(self._video_task) self._video_task = None - if self._message_task and self._task_manager: - await self._task_manager.cancel_task(self._message_task) - self._message_task = None # Make sure we don't block the event loop in case `client.release()` # takes extra time. await self._get_event_loop().run_in_executor(self._executor, self._cleanup) @@ -781,6 +770,8 @@ class DailyTransportClient(EventHandler): await self._callbacks.on_joined(data) self._joined_event.set() + + await self._flush_join_messages() else: error_msg = f"Error joining {self._room_url}: {error}" logger.error(error_msg) @@ -1554,13 +1545,11 @@ class DailyTransportClient(EventHandler): await callback(*args) queue.task_done() - async def _message_task_handler(self, queue: asyncio.Queue): - """Handle queued messages after transport is joined.""" - while True: - await self._joined_event.wait() - (frame,) = await queue.get() + async def _flush_join_messages(self): + """Send any messages that were queued before join completed.""" + for frame in self._join_message_queue: await self.send_message(frame) - queue.task_done() + self._join_message_queue.clear() def _get_event_loop(self) -> asyncio.AbstractEventLoop: """Get the event loop from the task manager."""