From c3cfd1f0ceb967409972df0814c49056ea3ed5dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Fri, 23 May 2025 00:38:40 -0700 Subject: [PATCH] DailyTransport: process audio, video and event callbacks in separate tasks --- CHANGELOG.md | 2 + src/pipecat/transports/services/daily.py | 134 +++++++++++++---------- 2 files changed, 80 insertions(+), 56 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1f79e8bc8..70ca7bd0e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -136,6 +136,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Performance +- `DailyTransport`: process audio, video and events in separate tasks. + - Don't create event handler tasks if no user event handlers have been registered. diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index f48f4cd61..d58737131 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -309,16 +309,21 @@ class DailyTransportClient(EventHandler): self._client: CallClient = CallClient(event_handler=self) - # We use a separate task to execute the callbacks, otherwise if we call - # a `CallClient` function and wait for its completion this will - # currently result in a deadlock. This is because `_call_async_callback` - # can be used inside `CallClient` event handlers which are holding the - # GIL in `daily-python`. So if the `callback` passed here makes a - # `CallClient` call and waits for it to finish using completions (and a - # future) we will deadlock because completions use event handlers (which - # are holding the GIL). - self._callback_queue = asyncio.Queue() - self._callback_task = None + # We use separate tasks to execute callbacks (events, audio or + # video). 
In the case of events, if we call a `CallClient` function + # inside the callback and wait for its completion this will result in a + # deadlock (because we haven't exited the event callback). The deadlocks + # occur because `daily-python` is holding the GIL when calling the + # callbacks. So, if our callback handler makes a `CallClient` call and + # waits for it to finish using completions (and a future) we will + # deadlock because completions use event handlers (which are holding the + # GIL). + self._event_queue = asyncio.Queue() + self._audio_queue = asyncio.Queue() + self._video_queue = asyncio.Queue() + self._event_task = None + self._audio_task = None + self._video_task = None # Input and ouput sample rates. They will be initialize on setup(). self._in_sample_rate = 0 @@ -394,15 +399,21 @@ class DailyTransportClient(EventHandler): return self._task_manager = setup.task_manager - self._callback_task = self._task_manager.create_task( - self._callback_task_handler(), - f"{self}::callback_task", + self._event_task = self._task_manager.create_task( + self._callback_task_handler(self._event_queue), + f"{self}::event_callback_task", ) async def cleanup(self): - if self._callback_task and self._task_manager: - await self._task_manager.cancel_task(self._callback_task) - self._callback_task = None + if self._event_task and self._task_manager: + await self._task_manager.cancel_task(self._event_task) + self._event_task = None + if self._audio_task and self._task_manager: + await self._task_manager.cancel_task(self._audio_task) + self._audio_task = None + if self._video_task and self._task_manager: + await self._task_manager.cancel_task(self._video_task) + self._video_task = None # Make sure we don't block the event loop in case `client.release()` # takes extra time. 
await self._get_event_loop().run_in_executor(self._executor, self._cleanup) @@ -411,18 +422,26 @@ class DailyTransportClient(EventHandler): self._in_sample_rate = self._params.audio_in_sample_rate or frame.audio_in_sample_rate self._out_sample_rate = self._params.audio_out_sample_rate or frame.audio_out_sample_rate - if self._params.video_out_enabled and not self._camera: + if self._params.video_out_enabled and not self._camera and self._task_manager: self._camera = Daily.create_camera_device( self._camera_name(), width=self._params.video_out_width, height=self._params.video_out_height, color_format=self._params.video_out_color_format, ) + self._video_task = self._task_manager.create_task( + self._callback_task_handler(self._video_queue), + f"{self}::video_callback_task", + ) - if self._params.audio_out_enabled and not self._microphone_track: + if self._params.audio_out_enabled and not self._microphone_track and self._task_manager: audio_source = CustomAudioSource(self._out_sample_rate, self._params.audio_out_channels) audio_track = CustomAudioTrack(audio_source) self._microphone_track = DailyAudioTrack(source=audio_source, track=audio_track) + self._audio_task = self._task_manager.create_task( + self._callback_task_handler(self._audio_queue), + f"{self}::audio_callback_task", + ) async def join(self): # Transport already joined or joining, ignore. 
@@ -772,57 +791,57 @@ class DailyTransportClient(EventHandler): # def on_active_speaker_changed(self, participant): - self._call_async_callback(self._callbacks.on_active_speaker_changed, participant) + self._call_event_callback(self._callbacks.on_active_speaker_changed, participant) def on_app_message(self, message: Any, sender: str): - self._call_async_callback(self._callbacks.on_app_message, message, sender) + self._call_event_callback(self._callbacks.on_app_message, message, sender) def on_call_state_updated(self, state: str): - self._call_async_callback(self._callbacks.on_call_state_updated, state) + self._call_event_callback(self._callbacks.on_call_state_updated, state) def on_dialin_connected(self, data: Any): - self._call_async_callback(self._callbacks.on_dialin_connected, data) + self._call_event_callback(self._callbacks.on_dialin_connected, data) def on_dialin_ready(self, sip_endpoint: str): - self._call_async_callback(self._callbacks.on_dialin_ready, sip_endpoint) + self._call_event_callback(self._callbacks.on_dialin_ready, sip_endpoint) def on_dialin_stopped(self, data: Any): - self._call_async_callback(self._callbacks.on_dialin_stopped, data) + self._call_event_callback(self._callbacks.on_dialin_stopped, data) def on_dialin_error(self, data: Any): - self._call_async_callback(self._callbacks.on_dialin_error, data) + self._call_event_callback(self._callbacks.on_dialin_error, data) def on_dialin_warning(self, data: Any): - self._call_async_callback(self._callbacks.on_dialin_warning, data) + self._call_event_callback(self._callbacks.on_dialin_warning, data) def on_dialout_answered(self, data: Any): - self._call_async_callback(self._callbacks.on_dialout_answered, data) + self._call_event_callback(self._callbacks.on_dialout_answered, data) def on_dialout_connected(self, data: Any): - self._call_async_callback(self._callbacks.on_dialout_connected, data) + self._call_event_callback(self._callbacks.on_dialout_connected, data) def on_dialout_stopped(self, data: 
Any): - self._call_async_callback(self._callbacks.on_dialout_stopped, data) + self._call_event_callback(self._callbacks.on_dialout_stopped, data) def on_dialout_error(self, data: Any): - self._call_async_callback(self._callbacks.on_dialout_error, data) + self._call_event_callback(self._callbacks.on_dialout_error, data) def on_dialout_warning(self, data: Any): - self._call_async_callback(self._callbacks.on_dialout_warning, data) + self._call_event_callback(self._callbacks.on_dialout_warning, data) def on_participant_joined(self, participant): - self._call_async_callback(self._callbacks.on_participant_joined, participant) + self._call_event_callback(self._callbacks.on_participant_joined, participant) def on_participant_left(self, participant, reason): - self._call_async_callback(self._callbacks.on_participant_left, participant, reason) + self._call_event_callback(self._callbacks.on_participant_left, participant, reason) def on_participant_updated(self, participant): - self._call_async_callback(self._callbacks.on_participant_updated, participant) + self._call_event_callback(self._callbacks.on_participant_updated, participant) def on_transcription_started(self, status): logger.debug(f"Transcription started: {status}") self._transcription_status = status - self._call_async_callback(self.update_transcription, self._transcription_ids) + self._call_event_callback(self.update_transcription, self._transcription_ids) def on_transcription_stopped(self, stopped_by, stopped_by_error): logger.debug("Transcription stopped") @@ -831,55 +850,58 @@ class DailyTransportClient(EventHandler): logger.error(f"Transcription error: {message}") def on_transcription_message(self, message): - self._call_async_callback(self._callbacks.on_transcription_message, message) + self._call_event_callback(self._callbacks.on_transcription_message, message) def on_recording_started(self, status): logger.debug(f"Recording started: {status}") - self._call_async_callback(self._callbacks.on_recording_started, 
status) + self._call_event_callback(self._callbacks.on_recording_started, status) def on_recording_stopped(self, stream_id): logger.debug(f"Recording stopped: {stream_id}") - self._call_async_callback(self._callbacks.on_recording_stopped, stream_id) + self._call_event_callback(self._callbacks.on_recording_stopped, stream_id) def on_recording_error(self, stream_id, message): logger.error(f"Recording error for {stream_id}: {message}") - self._call_async_callback(self._callbacks.on_recording_error, stream_id, message) + self._call_event_callback(self._callbacks.on_recording_error, stream_id, message) # # Daily (CallClient callbacks) # def _audio_data_received(self, participant_id: str, audio_data: AudioData, audio_source: str): - # We don't need to use _call_async_callback() here because we don't care - # about ordering with other events. callback = self._audio_renderers[participant_id][audio_source] - future = asyncio.run_coroutine_threadsafe( - callback(participant_id, audio_data, audio_source), self._get_event_loop() - ) - future.result() + self._call_audio_callback(callback, participant_id, audio_data, audio_source) def _video_frame_received( self, participant_id: str, video_frame: VideoFrame, video_source: str ): - # We don't need to use _call_async_callback() here because we don't care - # about ordering with other events. 
callback = self._video_renderers[participant_id][video_source] + self._call_video_callback(callback, participant_id, video_frame, video_source) + + # + # Queue callbacks handling + # + + def _call_audio_callback(self, callback, *args): + self._call_async_callback(self._audio_queue, callback, *args) + + def _call_video_callback(self, callback, *args): + self._call_async_callback(self._video_queue, callback, *args) + + def _call_event_callback(self, callback, *args): + self._call_async_callback(self._event_queue, callback, *args) + + def _call_async_callback(self, queue: asyncio.Queue, callback, *args): future = asyncio.run_coroutine_threadsafe( - callback(participant_id, video_frame, video_source), self._get_event_loop() + queue.put((callback, *args)), self._get_event_loop() ) future.result() - def _call_async_callback(self, callback, *args): - future = asyncio.run_coroutine_threadsafe( - self._callback_queue.put((callback, *args)), self._get_event_loop() - ) - future.result() - - async def _callback_task_handler(self): + async def _callback_task_handler(self, queue: asyncio.Queue): while True: # Wait to process any callback until we are joined. await self._joined_event.wait() - (callback, *args) = await self._callback_queue.get() + (callback, *args) = await queue.get() await callback(*args) def _get_event_loop(self) -> asyncio.AbstractEventLoop: