Merge pull request #771 from pipecat-ai/aleix/daily-execute-callbacks-from-task

transports(daily): use a task to execute callbacks
This commit is contained in:
Aleix Conchillo Flaqué
2024-12-03 19:55:38 -08:00
committed by GitHub
2 changed files with 28 additions and 9 deletions

View File

@@ -43,6 +43,9 @@ async def on_audio_data(processor, audio, sample_rate, num_channels):
### Fixed
- Fixed an issue in `DailyTransport` that would cause some internal callbacks to
not be executed.
- Fixed an issue where other frames were being processed while a `CancelFrame`
was being pushed down the pipeline.

View File

@@ -207,6 +207,17 @@ class DailyTransportClient(EventHandler):
self._client: CallClient = CallClient(event_handler=self)
# We use a separate task to execute the callbacks, otherwise if we call
# a `CallClient` function and wait for its completion this will
# currently result in a deadlock. This is because `_call_async_callback`
# can be used inside `CallClient` event handlers which are holding the
# GIL in `daily-python`. So if the `callback` passed here makes a
# `CallClient` call and waits for it to finish using completions (and a
# future) we will deadlock because completions use event handlers (which
# are holding the GIL).
self._callback_queue = asyncio.Queue()
self._callback_task = self._loop.create_task(self._callback_task_handler())
self._camera: VirtualCameraDevice | None = None
if self._params.camera_out_enabled:
self._camera = Daily.create_camera_device(
@@ -455,6 +466,8 @@ class DailyTransportClient(EventHandler):
async def cleanup(self):
await self._loop.run_in_executor(self._executor, self._cleanup)
self._callback_task.cancel()
await self._callback_task
def _cleanup(self):
if self._client:
@@ -636,15 +649,18 @@ class DailyTransportClient(EventHandler):
)
def _call_async_callback(self, callback, *args):
# Don't wait on the coroutine, otherwise if we call a `CallClient`
# function and wait for its completion this will currently result in a
# deadlock. This is because `_call_async_callback` is used inside
# `CallClient` event handlers which are holding the GIL in
# `daily-python`. So if the `callback` passed here makes a `CallClient`
# call and waits for it to finish using completions (and a future) we
# will deadlock because completions use event handlers (which are
# holding the GIL).
asyncio.run_coroutine_threadsafe(callback(*args), self._loop)
future = asyncio.run_coroutine_threadsafe(
self._callback_queue.put((callback, *args)), self._loop
)
future.result()
async def _callback_task_handler(self):
while True:
try:
(callback, *args) = await self._callback_queue.get()
await callback(*args)
except asyncio.CancelledError:
break
class DailyInputTransport(BaseInputTransport):