replace background task with flush-on-join
This commit is contained in:
@@ -501,8 +501,7 @@ class DailyTransportClient(EventHandler):
|
||||
self._event_task = None
|
||||
self._audio_task = None
|
||||
self._video_task = None
|
||||
self._message_queue: Optional[asyncio.Queue] = None
|
||||
self._message_task = None
|
||||
self._join_message_queue: list = []
|
||||
|
||||
# Input and ouput sample rates. They will be initialize on setup().
|
||||
self._in_sample_rate = 0
|
||||
@@ -569,8 +568,7 @@ class DailyTransportClient(EventHandler):
|
||||
error: An error description or None.
|
||||
"""
|
||||
if not self._joined:
|
||||
if self._message_queue:
|
||||
await self._message_queue.put((frame,))
|
||||
self._join_message_queue.append(frame)
|
||||
return None
|
||||
|
||||
participant_id = None
|
||||
@@ -677,12 +675,6 @@ class DailyTransportClient(EventHandler):
|
||||
f"{self}::event_callback_task",
|
||||
)
|
||||
|
||||
self._message_queue = asyncio.Queue()
|
||||
self._message_task = self._task_manager.create_task(
|
||||
self._message_task_handler(self._message_queue),
|
||||
f"{self}::message_task",
|
||||
)
|
||||
|
||||
async def cleanup(self):
|
||||
"""Cleanup client resources and cancel tasks."""
|
||||
if self._event_task and self._task_manager:
|
||||
@@ -694,9 +686,6 @@ class DailyTransportClient(EventHandler):
|
||||
if self._video_task and self._task_manager:
|
||||
await self._task_manager.cancel_task(self._video_task)
|
||||
self._video_task = None
|
||||
if self._message_task and self._task_manager:
|
||||
await self._task_manager.cancel_task(self._message_task)
|
||||
self._message_task = None
|
||||
# Make sure we don't block the event loop in case `client.release()`
|
||||
# takes extra time.
|
||||
await self._get_event_loop().run_in_executor(self._executor, self._cleanup)
|
||||
@@ -781,6 +770,8 @@ class DailyTransportClient(EventHandler):
|
||||
await self._callbacks.on_joined(data)
|
||||
|
||||
self._joined_event.set()
|
||||
|
||||
await self._flush_join_messages()
|
||||
else:
|
||||
error_msg = f"Error joining {self._room_url}: {error}"
|
||||
logger.error(error_msg)
|
||||
@@ -1554,13 +1545,11 @@ class DailyTransportClient(EventHandler):
|
||||
await callback(*args)
|
||||
queue.task_done()
|
||||
|
||||
async def _message_task_handler(self, queue: asyncio.Queue):
|
||||
"""Handle queued messages after transport is joined."""
|
||||
while True:
|
||||
await self._joined_event.wait()
|
||||
(frame,) = await queue.get()
|
||||
async def _flush_join_messages(self):
|
||||
"""Send any messages that were queued before join completed."""
|
||||
for frame in self._join_message_queue:
|
||||
await self.send_message(frame)
|
||||
queue.task_done()
|
||||
self._join_message_queue.clear()
|
||||
|
||||
def _get_event_loop(self) -> asyncio.AbstractEventLoop:
|
||||
"""Get the event loop from the task manager."""
|
||||
|
||||
Reference in New Issue
Block a user