diff --git a/CHANGELOG.md b/CHANGELOG.md index 19dd9aab5..c449d450c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,9 @@ async def on_audio_data(processor, audio, sample_rate, num_channels): ### Fixed +- Fixed an issue in `DailyTransport` that would cause some internal callbacks to + not be executed. + - Fixed an issue where other frames were being processed while a `CancelFrame` was being pushed down the pipeline. diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index a5936e80a..07c023233 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -207,6 +207,17 @@ class DailyTransportClient(EventHandler): self._client: CallClient = CallClient(event_handler=self) + # We use a separate task to execute the callbacks, otherwise if we call + # a `CallClient` function and wait for its completion this will + # currently result in a deadlock. This is because `_call_async_callback` + # can be used inside `CallClient` event handlers which are holding the + # GIL in `daily-python`. So if the `callback` passed here makes a + # `CallClient` call and waits for it to finish using completions (and a + # future) we will deadlock because completions use event handlers (which + # are holding the GIL). + self._callback_queue = asyncio.Queue() + self._callback_task = self._loop.create_task(self._callback_task_handler()) + self._camera: VirtualCameraDevice | None = None if self._params.camera_out_enabled: self._camera = Daily.create_camera_device( @@ -455,6 +466,8 @@ class DailyTransportClient(EventHandler): async def cleanup(self): await self._loop.run_in_executor(self._executor, self._cleanup) + self._callback_task.cancel() + await self._callback_task def _cleanup(self): if self._client: @@ -636,15 +649,18 @@ class DailyTransportClient(EventHandler): ) def _call_async_callback(self, callback, *args): - # Don't wait on the coroutine, otherwise if we call a `CallClient` - # function and wait for its completion this will currently result in a - # deadlock. This is because `_call_async_callback` is used inside - # `CallClient` event handlers which are holding the GIL in - # `daily-python`. So if the `callback` passed here makes a `CallClient` - # call and waits for it to finish using completions (and a future) we - # will deadlock because completions use event handlers (which are - # holding the GIL). - asyncio.run_coroutine_threadsafe(callback(*args), self._loop) + future = asyncio.run_coroutine_threadsafe( + self._callback_queue.put((callback, *args)), self._loop + ) + future.result() + + async def _callback_task_handler(self): + while True: + try: + (callback, *args) = await self._callback_queue.get() + await callback(*args) + except asyncio.CancelledError: + break class DailyInputTransport(BaseInputTransport):