DailyTransport: use participant audio renderers instead of virtual speaker

This commit is contained in:
Aleix Conchillo Flaqué
2025-05-16 19:03:53 -07:00
parent 8d4894846d
commit fcf49e79cc
4 changed files with 56 additions and 82 deletions

View File

@@ -76,6 +76,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- `DailyTransport` now captures audio from individual participants instead of
the whole room. This allows identifying audio frames per participant.
- Updated the default model for `AnthropicLLMService` to
`claude-sonnet-4-20250514`.

View File

@@ -843,7 +843,7 @@ class RTVIProcessor(FrameProcessor):
async def _handle_client_ready(self, request_id: str):
logger.debug("Received client-ready")
if self._input_transport:
self._input_transport.start_audio_in_streaming()
await self._input_transport.start_audio_in_streaming()
self._client_ready_id = request_id
await self.set_client_ready()

View File

@@ -101,7 +101,7 @@ class BaseInputTransport(FrameProcessor):
logger.debug(f"Enabling audio on start. {enabled}")
self._params.audio_in_stream_on_start = enabled
def start_audio_in_streaming(self):
async def start_audio_in_streaming(self):
pass
@property

View File

@@ -14,14 +14,12 @@ import aiohttp
from loguru import logger
from pydantic import BaseModel
from pipecat.audio.utils import create_default_resampler
from pipecat.audio.vad.vad_analyzer import VADAnalyzer, VADParams
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
InputAudioRawFrame,
InterimTranscriptionFrame,
OutputAudioRawFrame,
OutputImageRawFrame,
@@ -51,7 +49,6 @@ try:
VideoFrame,
VirtualCameraDevice,
VirtualMicrophoneDevice,
VirtualSpeakerDevice,
)
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
@@ -323,7 +320,6 @@ class DailyTransportClient(EventHandler):
self._camera: Optional[VirtualCameraDevice] = None
self._mic: Optional[VirtualMicrophoneDevice] = None
self._speaker: Optional[VirtualSpeakerDevice] = None
self._audio_sources: Dict[str, CustomAudioSource] = {}
def _camera_name(self):
@@ -332,9 +328,6 @@ class DailyTransportClient(EventHandler):
def _mic_name(self):
return f"mic-{self}"
def _speaker_name(self):
return f"speaker-{self}"
@property
def room_url(self) -> str:
return self._room_url
@@ -365,29 +358,6 @@ class DailyTransportClient(EventHandler):
)
await future
async def read_next_audio_frame(self) -> Optional[InputAudioRawFrame]:
if not self._speaker:
return None
sample_rate = self._in_sample_rate
num_channels = self._params.audio_in_channels
num_frames = int(sample_rate / 100) * 2 # 20ms of audio
future = self._get_event_loop().create_future()
self._speaker.read_frames(num_frames, completion=completion_callback(future))
audio = await future
if len(audio) > 0:
return InputAudioRawFrame(
audio=audio, sample_rate=sample_rate, num_channels=num_channels
)
else:
# If we don't read any audio it could be there's no participant
# connected. daily-python will return immediately if that's the
# case, so let's sleep for a little bit (i.e. busy wait).
await asyncio.sleep(0.01)
return None
async def register_audio_destination(self, destination: str):
self._audio_sources[destination] = await self.add_custom_audio_track(destination)
self._client.update_publishing({"customAudio": {destination: True}})
@@ -448,15 +418,6 @@ class DailyTransportClient(EventHandler):
non_blocking=True,
)
if self._params.audio_in_enabled and not self._speaker:
self._speaker = Daily.create_speaker_device(
self._speaker_name(),
sample_rate=self._in_sample_rate,
channels=self._params.audio_in_channels,
non_blocking=True,
)
Daily.select_speaker_device(self._speaker_name())
async def join(self):
# Transport already joined or joining, ignore.
if self._joined or self._joining:
@@ -694,6 +655,8 @@ class DailyTransportClient(EventHandler):
participant_id: str,
callback: Callable,
audio_source: str = "microphone",
sample_rate: int = 16000,
callback_interval_ms: int = 20,
):
# Only enable the desired audio source subscription on this participant.
if audio_source in ("microphone", "screenAudio"):
@@ -705,14 +668,14 @@ class DailyTransportClient(EventHandler):
self._audio_renderers.setdefault(participant_id, {})[audio_source] = callback
logger.info(
f"Starting to capture audio from participant {participant_id} to {audio_source}"
)
logger.info(f"Starting to capture [{audio_source}] audio from participant {participant_id}")
self._client.set_audio_renderer(
participant_id,
self._audio_data_received,
audio_source=audio_source,
sample_rate=sample_rate,
callback_interval_ms=callback_interval_ms,
)
async def capture_participant_video(
@@ -877,14 +840,24 @@ class DailyTransportClient(EventHandler):
#
def _audio_data_received(self, participant_id: str, audio_data: AudioData, audio_source: str):
# We don't need to use _call_async_callback() here because we don't care
# about ordering with other events.
callback = self._audio_renderers[participant_id][audio_source]
self._call_async_callback(callback, participant_id, audio_data, audio_source)
future = asyncio.run_coroutine_threadsafe(
callback(participant_id, audio_data, audio_source), self._get_event_loop()
)
future.result()
def _video_frame_received(
self, participant_id: str, video_frame: VideoFrame, video_source: str
):
# We don't need to use _call_async_callback() here because we don't care
# about ordering with other events.
callback = self._video_renderers[participant_id][video_source]
self._call_async_callback(callback, participant_id, video_frame, video_source)
future = asyncio.run_coroutine_threadsafe(
callback(participant_id, video_frame, video_source), self._get_event_loop()
)
future.result()
def _call_async_callback(self, callback, *args):
future = asyncio.run_coroutine_threadsafe(
@@ -936,11 +909,12 @@ class DailyInputTransport(BaseInputTransport):
# Whether we have seen a StartFrame already.
self._initialized = False
# Task that gets audio data from a device or the network and queues it
# internally to be processed.
self._audio_in_task = None
# Whether we have started audio streaming.
self._streaming_started = False
self._resampler = create_default_resampler()
# Store the list of participants we should stream. This is necessary in
# case we don't start streaming right away.
self._capture_participant_audio = []
self._vad_analyzer: Optional[VADAnalyzer] = params.vad_analyzer
@@ -948,12 +922,17 @@ class DailyInputTransport(BaseInputTransport):
def vad_analyzer(self) -> Optional[VADAnalyzer]:
return self._vad_analyzer
def start_audio_in_streaming(self):
# Create audio task. It reads audio frames from Daily and push them
# internally for VAD processing.
if not self._audio_in_task and self._params.audio_in_enabled:
logger.debug(f"Start receiving audio")
self._audio_in_task = self.create_task(self._audio_in_task_handler())
async def start_audio_in_streaming(self):
if not self._params.audio_in_enabled:
return
logger.debug(f"Start receiving audio")
for participant_id, audio_source, sample_rate in self._capture_participant_audio:
await self._client.capture_participant_audio(
participant_id, self._on_participant_audio_data, audio_source, sample_rate
)
self._streaming_started = True
async def setup(self, setup: FrameProcessorSetup):
await super().setup(setup)
@@ -983,27 +962,19 @@ class DailyInputTransport(BaseInputTransport):
await self.set_transport_ready(frame)
if self._params.audio_in_stream_on_start:
self.start_audio_in_streaming()
await self.start_audio_in_streaming()
async def stop(self, frame: EndFrame):
# Parent stop.
await super().stop(frame)
# Leave the room.
await self._client.leave()
# Stop audio thread.
if self._audio_in_task and self._params.audio_in_enabled:
await self.cancel_task(self._audio_in_task)
self._audio_in_task = None
async def cancel(self, frame: CancelFrame):
# Parent stop.
await super().cancel(frame)
# Leave the room.
await self._client.leave()
# Stop audio thread.
if self._audio_in_task and self._params.audio_in_enabled:
await self.cancel_task(self._audio_in_task)
self._audio_in_task = None
#
# FrameProcessor
@@ -1034,32 +1005,26 @@ class DailyInputTransport(BaseInputTransport):
self,
participant_id: str,
audio_source: str = "microphone",
sample_rate: int = 16000,
):
await self._client.capture_participant_audio(
participant_id, self._on_participant_audio_data, audio_source
)
if self._streaming_started:
await self._client.capture_participant_audio(
participant_id, self._on_participant_audio_data, audio_source, sample_rate
)
else:
self._capture_participant_audio.append((participant_id, audio_source, sample_rate))
async def _on_participant_audio_data(
self, participant_id: str, audio: AudioData, audio_source: str
):
resampled = await self._resampler.resample(
audio.audio_frames, audio.sample_rate, self._client.out_sample_rate
)
frame = UserAudioRawFrame(
user_id=participant_id,
audio=resampled,
sample_rate=self._client.out_sample_rate,
audio=audio.audio_frames,
sample_rate=audio.sample_rate,
num_channels=audio.num_channels,
)
frame.transport_source = audio_source
await self.push_frame(frame)
async def _audio_in_task_handler(self):
while True:
frame = await self._client.read_next_audio_frame()
if frame:
await self.push_audio_frame(frame)
await self.push_audio_frame(frame)
#
# Camera in
@@ -1376,9 +1341,10 @@ class DailyTransport(BaseTransport):
self,
participant_id: str,
audio_source: str = "microphone",
sample_rate: int = 16000,
):
if self._input:
await self._input.capture_participant_audio(participant_id, audio_source)
await self._input.capture_participant_audio(participant_id, audio_source, sample_rate)
async def capture_participant_video(
self,
@@ -1509,6 +1475,11 @@ class DailyTransport(BaseTransport):
id = participant["id"]
logger.info(f"Participant joined {id}")
if self._input and self._params.audio_in_enabled:
await self._input.capture_participant_audio(
id, "microphone", self._client.in_sample_rate
)
if not self._other_participant_has_joined:
self._other_participant_has_joined = True
await self._call_event_handler("on_first_participant_joined", participant)