DailyTransport: process audio, video and event callbacks in separate tasks

This commit is contained in:
Aleix Conchillo Flaqué
2025-05-23 00:38:40 -07:00
parent 69ac70eed8
commit c3cfd1f0ce
2 changed files with 80 additions and 56 deletions

View File

@@ -136,6 +136,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Performance
- `DailyTransport`: process audio, video and events in separate tasks.
- Don't create event handler tasks if no user event handlers have been
registered.

View File

@@ -309,16 +309,21 @@ class DailyTransportClient(EventHandler):
self._client: CallClient = CallClient(event_handler=self)
# We use a separate task to execute the callbacks, otherwise if we call
# a `CallClient` function and wait for its completion this will
# currently result in a deadlock. This is because `_call_async_callback`
# can be used inside `CallClient` event handlers which are holding the
# GIL in `daily-python`. So if the `callback` passed here makes a
# `CallClient` call and waits for it to finish using completions (and a
# future) we will deadlock because completions use event handlers (which
# are holding the GIL).
self._callback_queue = asyncio.Queue()
self._callback_task = None
# We use separate tasks to execute callbacks (events, audio or
# video). In the case of events, if we call a `CallClient` function
# inside the callback and wait for its completion this will result in a
# deadlock (because we haven't exited the event callback). The deadlocks
# occur because `daily-python` is holding the GIL when calling the
# callbacks. So, if our callback handler makes a `CallClient` call and
# waits for it to finish using completions (and a future) we will
# deadlock because completions use event handlers (which are holding the
# GIL).
self._event_queue = asyncio.Queue()
self._audio_queue = asyncio.Queue()
self._video_queue = asyncio.Queue()
self._event_task = None
self._audio_task = None
self._video_task = None
# Input and output sample rates. They will be initialized on setup().
self._in_sample_rate = 0
@@ -394,15 +399,21 @@ class DailyTransportClient(EventHandler):
return
self._task_manager = setup.task_manager
self._callback_task = self._task_manager.create_task(
self._callback_task_handler(),
f"{self}::callback_task",
self._event_task = self._task_manager.create_task(
self._callback_task_handler(self._event_queue),
f"{self}::event_callback_task",
)
async def cleanup(self):
if self._callback_task and self._task_manager:
await self._task_manager.cancel_task(self._callback_task)
self._callback_task = None
if self._event_task and self._task_manager:
await self._task_manager.cancel_task(self._event_task)
self._event_task = None
if self._audio_task and self._task_manager:
await self._task_manager.cancel_task(self._audio_task)
self._audio_task = None
if self._video_task and self._task_manager:
await self._task_manager.cancel_task(self._video_task)
self._video_task = None
# Make sure we don't block the event loop in case `client.release()`
# takes extra time.
await self._get_event_loop().run_in_executor(self._executor, self._cleanup)
@@ -411,18 +422,26 @@ class DailyTransportClient(EventHandler):
self._in_sample_rate = self._params.audio_in_sample_rate or frame.audio_in_sample_rate
self._out_sample_rate = self._params.audio_out_sample_rate or frame.audio_out_sample_rate
if self._params.video_out_enabled and not self._camera:
if self._params.video_out_enabled and not self._camera and self._task_manager:
self._camera = Daily.create_camera_device(
self._camera_name(),
width=self._params.video_out_width,
height=self._params.video_out_height,
color_format=self._params.video_out_color_format,
)
# Keep the video callback task in its own attribute: cleanup() cancels
# `self._video_task`, and the audio branch below assigns `self._audio_task`,
# which would otherwise overwrite (and leak) this task.
self._video_task = self._task_manager.create_task(
    self._callback_task_handler(self._video_queue),
    f"{self}::video_callback_task",
)
if self._params.audio_out_enabled and not self._microphone_track:
if self._params.audio_out_enabled and not self._microphone_track and self._task_manager:
audio_source = CustomAudioSource(self._out_sample_rate, self._params.audio_out_channels)
audio_track = CustomAudioTrack(audio_source)
self._microphone_track = DailyAudioTrack(source=audio_source, track=audio_track)
self._audio_task = self._task_manager.create_task(
self._callback_task_handler(self._audio_queue),
f"{self}::audio_callback_task",
)
async def join(self):
# Transport already joined or joining, ignore.
@@ -772,57 +791,57 @@ class DailyTransportClient(EventHandler):
#
def on_active_speaker_changed(self, participant):
self._call_async_callback(self._callbacks.on_active_speaker_changed, participant)
self._call_event_callback(self._callbacks.on_active_speaker_changed, participant)
def on_app_message(self, message: Any, sender: str):
self._call_async_callback(self._callbacks.on_app_message, message, sender)
self._call_event_callback(self._callbacks.on_app_message, message, sender)
def on_call_state_updated(self, state: str):
self._call_async_callback(self._callbacks.on_call_state_updated, state)
self._call_event_callback(self._callbacks.on_call_state_updated, state)
def on_dialin_connected(self, data: Any):
self._call_async_callback(self._callbacks.on_dialin_connected, data)
self._call_event_callback(self._callbacks.on_dialin_connected, data)
def on_dialin_ready(self, sip_endpoint: str):
self._call_async_callback(self._callbacks.on_dialin_ready, sip_endpoint)
self._call_event_callback(self._callbacks.on_dialin_ready, sip_endpoint)
def on_dialin_stopped(self, data: Any):
self._call_async_callback(self._callbacks.on_dialin_stopped, data)
self._call_event_callback(self._callbacks.on_dialin_stopped, data)
def on_dialin_error(self, data: Any):
self._call_async_callback(self._callbacks.on_dialin_error, data)
self._call_event_callback(self._callbacks.on_dialin_error, data)
def on_dialin_warning(self, data: Any):
self._call_async_callback(self._callbacks.on_dialin_warning, data)
self._call_event_callback(self._callbacks.on_dialin_warning, data)
def on_dialout_answered(self, data: Any):
self._call_async_callback(self._callbacks.on_dialout_answered, data)
self._call_event_callback(self._callbacks.on_dialout_answered, data)
def on_dialout_connected(self, data: Any):
self._call_async_callback(self._callbacks.on_dialout_connected, data)
self._call_event_callback(self._callbacks.on_dialout_connected, data)
def on_dialout_stopped(self, data: Any):
self._call_async_callback(self._callbacks.on_dialout_stopped, data)
self._call_event_callback(self._callbacks.on_dialout_stopped, data)
def on_dialout_error(self, data: Any):
self._call_async_callback(self._callbacks.on_dialout_error, data)
self._call_event_callback(self._callbacks.on_dialout_error, data)
def on_dialout_warning(self, data: Any):
self._call_async_callback(self._callbacks.on_dialout_warning, data)
self._call_event_callback(self._callbacks.on_dialout_warning, data)
def on_participant_joined(self, participant):
self._call_async_callback(self._callbacks.on_participant_joined, participant)
self._call_event_callback(self._callbacks.on_participant_joined, participant)
def on_participant_left(self, participant, reason):
self._call_async_callback(self._callbacks.on_participant_left, participant, reason)
self._call_event_callback(self._callbacks.on_participant_left, participant, reason)
def on_participant_updated(self, participant):
self._call_async_callback(self._callbacks.on_participant_updated, participant)
self._call_event_callback(self._callbacks.on_participant_updated, participant)
def on_transcription_started(self, status):
logger.debug(f"Transcription started: {status}")
self._transcription_status = status
self._call_async_callback(self.update_transcription, self._transcription_ids)
self._call_event_callback(self.update_transcription, self._transcription_ids)
def on_transcription_stopped(self, stopped_by, stopped_by_error):
logger.debug("Transcription stopped")
@@ -831,55 +850,58 @@ class DailyTransportClient(EventHandler):
logger.error(f"Transcription error: {message}")
def on_transcription_message(self, message):
self._call_async_callback(self._callbacks.on_transcription_message, message)
self._call_event_callback(self._callbacks.on_transcription_message, message)
def on_recording_started(self, status):
logger.debug(f"Recording started: {status}")
self._call_async_callback(self._callbacks.on_recording_started, status)
self._call_event_callback(self._callbacks.on_recording_started, status)
def on_recording_stopped(self, stream_id):
logger.debug(f"Recording stopped: {stream_id}")
self._call_async_callback(self._callbacks.on_recording_stopped, stream_id)
self._call_event_callback(self._callbacks.on_recording_stopped, stream_id)
def on_recording_error(self, stream_id, message):
logger.error(f"Recording error for {stream_id}: {message}")
self._call_async_callback(self._callbacks.on_recording_error, stream_id, message)
self._call_event_callback(self._callbacks.on_recording_error, stream_id, message)
#
# Daily (CallClient callbacks)
#
def _audio_data_received(self, participant_id: str, audio_data: AudioData, audio_source: str):
# We don't need to use _call_async_callback() here because we don't care
# about ordering with other events.
callback = self._audio_renderers[participant_id][audio_source]
future = asyncio.run_coroutine_threadsafe(
callback(participant_id, audio_data, audio_source), self._get_event_loop()
)
future.result()
self._call_audio_callback(callback, participant_id, audio_data, audio_source)
def _video_frame_received(
self, participant_id: str, video_frame: VideoFrame, video_source: str
):
# We don't need to use _call_async_callback() here because we don't care
# about ordering with other events.
callback = self._video_renderers[participant_id][video_source]
self._call_video_callback(callback, participant_id, video_frame, video_source)
#
# Queue callbacks handling
#
def _call_audio_callback(self, callback, *args):
    """Schedule `callback` with `args` through the audio callback queue.

    Thin wrapper that forwards to `_call_async_callback()` using
    `self._audio_queue` as the target queue.
    """
    audio_queue = self._audio_queue
    self._call_async_callback(audio_queue, callback, *args)
def _call_video_callback(self, callback, *args):
    """Schedule `callback` with `args` through the video callback queue.

    Thin wrapper that forwards to `_call_async_callback()` using
    `self._video_queue` as the target queue.
    """
    video_queue = self._video_queue
    self._call_async_callback(video_queue, callback, *args)
def _call_event_callback(self, callback, *args):
    """Schedule `callback` with `args` through the event callback queue.

    Thin wrapper that forwards to `_call_async_callback()` using
    `self._event_queue` as the target queue.
    """
    event_queue = self._event_queue
    self._call_async_callback(event_queue, callback, *args)
def _call_async_callback(self, queue: asyncio.Queue, callback, *args):
future = asyncio.run_coroutine_threadsafe(
callback(participant_id, video_frame, video_source), self._get_event_loop()
queue.put((callback, *args)), self._get_event_loop()
)
future.result()
def _call_async_callback(self, callback, *args):
future = asyncio.run_coroutine_threadsafe(
self._callback_queue.put((callback, *args)), self._get_event_loop()
)
future.result()
async def _callback_task_handler(self):
async def _callback_task_handler(self, queue: asyncio.Queue):
while True:
# Wait to process any callback until we are joined.
await self._joined_event.wait()
(callback, *args) = await self._callback_queue.get()
(callback, *args) = await queue.get()
await callback(*args)
def _get_event_loop(self) -> asyncio.AbstractEventLoop: