From fcf49e79ccbb0c76937e53abf8e7a0ce39ab6ebc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Fri, 16 May 2025 19:03:53 -0700 Subject: [PATCH] DailyTransport: use participant audio renderers instead of virtual speaker --- CHANGELOG.md | 3 + src/pipecat/processors/frameworks/rtvi.py | 2 +- src/pipecat/transports/base_input.py | 2 +- src/pipecat/transports/services/daily.py | 131 +++++++++------------- 4 files changed, 56 insertions(+), 82 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 564650905..91605bc65 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -76,6 +76,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- `DailyTransport` now captures audio from individual participants instead of + the whole room. This allows identifying audio frames per participant. + - Updated the default model for `AnthropicLLMService` to `claude-sonnet-4-20250514`. diff --git a/src/pipecat/processors/frameworks/rtvi.py b/src/pipecat/processors/frameworks/rtvi.py index 449bf3e33..7bd54237d 100644 --- a/src/pipecat/processors/frameworks/rtvi.py +++ b/src/pipecat/processors/frameworks/rtvi.py @@ -843,7 +843,7 @@ class RTVIProcessor(FrameProcessor): async def _handle_client_ready(self, request_id: str): logger.debug("Received client-ready") if self._input_transport: - self._input_transport.start_audio_in_streaming() + await self._input_transport.start_audio_in_streaming() self._client_ready_id = request_id await self.set_client_ready() diff --git a/src/pipecat/transports/base_input.py b/src/pipecat/transports/base_input.py index f9a27a6d3..0dea8efdc 100644 --- a/src/pipecat/transports/base_input.py +++ b/src/pipecat/transports/base_input.py @@ -101,7 +101,7 @@ class BaseInputTransport(FrameProcessor): logger.debug(f"Enabling audio on start. {enabled}") self._params.audio_in_stream_on_start = enabled - def start_audio_in_streaming(self): + async def start_audio_in_streaming(self): pass @property diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index 095084037..53619a0c0 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -14,14 +14,12 @@ import aiohttp from loguru import logger from pydantic import BaseModel -from pipecat.audio.utils import create_default_resampler from pipecat.audio.vad.vad_analyzer import VADAnalyzer, VADParams from pipecat.frames.frames import ( CancelFrame, EndFrame, ErrorFrame, Frame, - InputAudioRawFrame, InterimTranscriptionFrame, OutputAudioRawFrame, OutputImageRawFrame, @@ -51,7 +49,6 @@ try: VideoFrame, VirtualCameraDevice, VirtualMicrophoneDevice, - VirtualSpeakerDevice, ) except ModuleNotFoundError as e: logger.error(f"Exception: {e}") @@ -323,7 +320,6 @@ class DailyTransportClient(EventHandler): self._camera: Optional[VirtualCameraDevice] = None self._mic: Optional[VirtualMicrophoneDevice] = None - self._speaker: Optional[VirtualSpeakerDevice] = None self._audio_sources: Dict[str, CustomAudioSource] = {} def _camera_name(self): @@ -332,9 +328,6 @@ class DailyTransportClient(EventHandler): def _mic_name(self): return f"mic-{self}" - def _speaker_name(self): - return f"speaker-{self}" - @property def room_url(self) -> str: return self._room_url @@ -365,29 +358,6 @@ class DailyTransportClient(EventHandler): ) await future - async def read_next_audio_frame(self) -> Optional[InputAudioRawFrame]: - if not self._speaker: - return None - - sample_rate = self._in_sample_rate - num_channels = self._params.audio_in_channels - num_frames = int(sample_rate / 100) * 2 # 20ms of audio - - future = self._get_event_loop().create_future() - self._speaker.read_frames(num_frames, completion=completion_callback(future)) - audio = await future - - if len(audio) > 0: - return InputAudioRawFrame( - audio=audio, sample_rate=sample_rate, num_channels=num_channels - ) - else: - # If we don't read any audio it could be there's no participant - # connected. daily-python will return immediately if that's the - # case, so let's sleep for a little bit (i.e. busy wait). - await asyncio.sleep(0.01) - return None - async def register_audio_destination(self, destination: str): self._audio_sources[destination] = await self.add_custom_audio_track(destination) self._client.update_publishing({"customAudio": {destination: True}}) @@ -448,15 +418,6 @@ class DailyTransportClient(EventHandler): non_blocking=True, ) - if self._params.audio_in_enabled and not self._speaker: - self._speaker = Daily.create_speaker_device( - self._speaker_name(), - sample_rate=self._in_sample_rate, - channels=self._params.audio_in_channels, - non_blocking=True, - ) - Daily.select_speaker_device(self._speaker_name()) - async def join(self): # Transport already joined or joining, ignore. if self._joined or self._joining: @@ -694,6 +655,8 @@ class DailyTransportClient(EventHandler): participant_id: str, callback: Callable, audio_source: str = "microphone", + sample_rate: int = 16000, + callback_interval_ms: int = 20, ): # Only enable the desired audio source subscription on this participant. if audio_source in ("microphone", "screenAudio"): @@ -705,14 +668,14 @@ class DailyTransportClient(EventHandler): self._audio_renderers.setdefault(participant_id, {})[audio_source] = callback - logger.info( - f"Starting to capture audio from participant {participant_id} to {audio_source}" - ) + logger.info(f"Starting to capture [{audio_source}] audio from participant {participant_id}") self._client.set_audio_renderer( participant_id, self._audio_data_received, audio_source=audio_source, + sample_rate=sample_rate, + callback_interval_ms=callback_interval_ms, ) async def capture_participant_video( @@ -877,14 +840,24 @@ class DailyTransportClient(EventHandler): # def _audio_data_received(self, participant_id: str, audio_data: AudioData, audio_source: str): + # We don't need to use _call_async_callback() here because we don't care + # about ordering with other events. callback = self._audio_renderers[participant_id][audio_source] - self._call_async_callback(callback, participant_id, audio_data, audio_source) + future = asyncio.run_coroutine_threadsafe( + callback(participant_id, audio_data, audio_source), self._get_event_loop() + ) + future.result() def _video_frame_received( self, participant_id: str, video_frame: VideoFrame, video_source: str ): + # We don't need to use _call_async_callback() here because we don't care + # about ordering with other events. callback = self._video_renderers[participant_id][video_source] - self._call_async_callback(callback, participant_id, video_frame, video_source) + future = asyncio.run_coroutine_threadsafe( + callback(participant_id, video_frame, video_source), self._get_event_loop() + ) + future.result() def _call_async_callback(self, callback, *args): future = asyncio.run_coroutine_threadsafe( @@ -936,11 +909,12 @@ class DailyInputTransport(BaseInputTransport): # Whether we have seen a StartFrame already. self._initialized = False - # Task that gets audio data from a device or the network and queues it - # internally to be processed. - self._audio_in_task = None + # Whether we have started audio streaming. + self._streaming_started = False - self._resampler = create_default_resampler() + # Store the list of participants we should stream. This is necessary in + # case we don't start streaming right away. + self._capture_participant_audio = [] self._vad_analyzer: Optional[VADAnalyzer] = params.vad_analyzer @@ -948,12 +922,17 @@ class DailyInputTransport(BaseInputTransport): def vad_analyzer(self) -> Optional[VADAnalyzer]: return self._vad_analyzer - def start_audio_in_streaming(self): - # Create audio task. It reads audio frames from Daily and push them - # internally for VAD processing. - if not self._audio_in_task and self._params.audio_in_enabled: - logger.debug(f"Start receiving audio") - self._audio_in_task = self.create_task(self._audio_in_task_handler()) + async def start_audio_in_streaming(self): + if not self._params.audio_in_enabled: + return + + logger.debug(f"Start receiving audio") + for participant_id, audio_source, sample_rate in self._capture_participant_audio: + await self._client.capture_participant_audio( + participant_id, self._on_participant_audio_data, audio_source, sample_rate + ) + + self._streaming_started = True async def setup(self, setup: FrameProcessorSetup): await super().setup(setup) @@ -983,27 +962,19 @@ class DailyInputTransport(BaseInputTransport): await self.set_transport_ready(frame) if self._params.audio_in_stream_on_start: - self.start_audio_in_streaming() + await self.start_audio_in_streaming() async def stop(self, frame: EndFrame): # Parent stop. await super().stop(frame) # Leave the room. await self._client.leave() - # Stop audio thread. - if self._audio_in_task and self._params.audio_in_enabled: - await self.cancel_task(self._audio_in_task) - self._audio_in_task = None async def cancel(self, frame: CancelFrame): # Parent stop. await super().cancel(frame) # Leave the room. await self._client.leave() - # Stop audio thread. - if self._audio_in_task and self._params.audio_in_enabled: - await self.cancel_task(self._audio_in_task) - self._audio_in_task = None # # FrameProcessor @@ -1034,32 +1005,26 @@ class DailyInputTransport(BaseInputTransport): self, participant_id: str, audio_source: str = "microphone", + sample_rate: int = 16000, ): - await self._client.capture_participant_audio( - participant_id, self._on_participant_audio_data, audio_source - ) + if self._streaming_started: + await self._client.capture_participant_audio( + participant_id, self._on_participant_audio_data, audio_source, sample_rate + ) + else: + self._capture_participant_audio.append((participant_id, audio_source, sample_rate)) async def _on_participant_audio_data( self, participant_id: str, audio: AudioData, audio_source: str ): - resampled = await self._resampler.resample( - audio.audio_frames, audio.sample_rate, self._client.out_sample_rate - ) - frame = UserAudioRawFrame( user_id=participant_id, - audio=resampled, - sample_rate=self._client.out_sample_rate, + audio=audio.audio_frames, + sample_rate=audio.sample_rate, num_channels=audio.num_channels, ) frame.transport_source = audio_source - await self.push_frame(frame) - - async def _audio_in_task_handler(self): - while True: - frame = await self._client.read_next_audio_frame() - if frame: - await self.push_audio_frame(frame) + await self.push_audio_frame(frame) # # Camera in @@ -1376,9 +1341,10 @@ class DailyTransport(BaseTransport): self, participant_id: str, audio_source: str = "microphone", + sample_rate: int = 16000, ): if self._input: - await self._input.capture_participant_audio(participant_id, audio_source) + await self._input.capture_participant_audio(participant_id, audio_source, sample_rate) async def capture_participant_video( self, @@ -1509,6 +1475,11 @@ class DailyTransport(BaseTransport): id = participant["id"] logger.info(f"Participant joined {id}") + if self._input and self._params.audio_in_enabled: + await self._input.capture_participant_audio( + id, "microphone", self._client.in_sample_rate + ) + if not self._other_participant_has_joined: self._other_participant_has_joined = True await self._call_event_handler("on_first_participant_joined", participant)