diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index d8fff4020..86f6cd8b5 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -8,11 +8,12 @@ import asyncio import time from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass -from typing import Any, Awaitable, Callable, Mapping, Optional +from typing import Any, Awaitable, Callable, Dict, Mapping, Optional import aiohttp from daily import ( AudioData, + CustomAudioSource, VideoFrame, VirtualCameraDevice, VirtualMicrophoneDevice, @@ -314,6 +315,7 @@ class DailyTransportClient(EventHandler): self._camera: Optional[VirtualCameraDevice] = None self._mic: Optional[VirtualMicrophoneDevice] = None self._speaker: Optional[VirtualSpeakerDevice] = None + self._audio_sources: Dict[str, CustomAudioSource] = {} def _camera_name(self): return f"camera-{self}" @@ -369,21 +371,26 @@ class DailyTransportClient(EventHandler): await asyncio.sleep(0.01) return None - async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None): - if not self._mic: - return None + async def register_audio_destination(self, destination: str): + self._audio_sources[destination] = await self.add_custom_audio_track(destination) + self._client.update_publishing({"customAudio": {destination: True}}) + async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None): future = self._get_event_loop().create_future() - self._mic.write_frames(frames, completion=completion_callback(future)) + if not destination and self._mic: + self._mic.write_frames(frames, completion=completion_callback(future)) + elif destination: + source = self._audio_sources[destination] + source.write_frames(frames, completion=completion_callback(future)) + else: + future.set_result(None) await future async def write_raw_video_frame( self, frame: OutputImageRawFrame, destination: Optional[str] = None ): - if not self._camera: - return None - - self._camera.write_frame(frame.image) + if not destination and self._camera: + self._camera.write_frame(frame.image) async def setup(self, frame: StartFrame): self._in_sample_rate = self._params.audio_in_sample_rate or frame.audio_in_sample_rate @@ -690,6 +697,20 @@ class DailyTransportClient(EventHandler): color_format=color_format, ) + async def add_custom_audio_track(self, track_name: str) -> CustomAudioSource: + future = self._get_event_loop().create_future() + + audio_source = CustomAudioSource(self._out_sample_rate, 1) + self._client.add_custom_audio_track( + track_name=track_name, + audio_source=audio_source, + completion=completion_callback(future), + ) + + await future + + return audio_source + async def update_transcription(self, participants=None, instance_id=None): future = self._get_event_loop().create_future() self._client.update_transcription( @@ -706,6 +727,14 @@ class DailyTransportClient(EventHandler): ) await future + async def update_publishing(self, publishing_settings: Mapping[str, Any]): + future = self._get_event_loop().create_future() + self._client.update_publishing( + publishing_settings=publishing_settings, + completion=completion_callback(future), + ) + await future + async def update_remote_participants(self, remote_participants: Mapping[str, Any] = None): future = self._get_event_loop().create_future() self._client.update_remote_participants( @@ -871,6 +900,9 @@ class DailyInputTransport(BaseInputTransport): self._audio_in_task = self.create_task(self._audio_in_task_handler()) async def start(self, frame: StartFrame): + # Setup client. + await self._client.setup(frame) + # Parent start. await super().start(frame) @@ -879,8 +911,6 @@ class DailyInputTransport(BaseInputTransport): self._initialized = True - # Setup client. - await self._client.setup(frame) # Join the room. await self._client.join() if self._params.audio_in_stream_on_start: @@ -1045,6 +1075,9 @@ class DailyOutputTransport(BaseOutputTransport): self._initialized = False async def start(self, frame: StartFrame): + # Setup client. + await self._client.setup(frame) + # Parent start. await super().start(frame) @@ -1053,8 +1086,6 @@ class DailyOutputTransport(BaseOutputTransport): self._initialized = True - # Setup client. - await self._client.setup(frame) # Join the room. await self._client.join() @@ -1078,13 +1109,19 @@ class DailyOutputTransport(BaseOutputTransport): async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame): await self._client.send_message(frame) + async def register_video_destination(self, destination: str): + logger.warning(f"{self} registering video destinations is not supported yet") + + async def register_audio_destination(self, destination: str): + await self._client.register_audio_destination(destination) + async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None): - await self._client.write_raw_audio_frames(frames) + await self._client.write_raw_audio_frames(frames, destination) async def write_raw_video_frame( self, frame: OutputImageRawFrame, destination: Optional[str] = None ): - await self._client.write_raw_video_frame(frame) + await self._client.write_raw_video_frame(frame, destination) class DailyTransport(BaseTransport): @@ -1272,6 +1309,9 @@ class DailyTransport(BaseTransport): participant_id, framerate, video_source, color_format ) + async def update_publishing(self, publishing_settings: Mapping[str, Any]): + await self._client.update_publishing(publishing_settings=publishing_settings) + async def update_subscriptions(self, participant_settings=None, profile_settings=None): await self._client.update_subscriptions( participant_settings=participant_settings, profile_settings=profile_settings