only send data to transports after they are really ready

This commit is contained in:
Aleix Conchillo Flaqué
2025-05-05 14:06:58 -07:00
parent e15fa8777a
commit 855d567b1e
11 changed files with 58 additions and 24 deletions

View File

@@ -13,6 +13,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fixed an issue that could cause data to be sent to the transports before they
were ready.
- Custom audio tracks are now removed from `DailyTransport` before leaving.
## [0.0.66] - 2025-05-02

View File

@@ -122,6 +122,7 @@ class BaseInputTransport(FrameProcessor):
# Configure VAD analyzer.
if self._params.vad_analyzer:
self._params.vad_analyzer.set_sample_rate(self._sample_rate)
# Configure End of turn analyzer.
if self._params.turn_analyzer:
self._params.turn_analyzer.set_sample_rate(self._sample_rate)
@@ -129,10 +130,6 @@ class BaseInputTransport(FrameProcessor):
# Start audio filter.
if self._params.audio_in_filter:
await self._params.audio_in_filter.start(self._sample_rate)
# Create audio input queue and task if needed.
if not self._audio_task and self._params.audio_in_enabled:
self._audio_in_queue = asyncio.Queue()
self._audio_task = self.create_task(self._audio_task_handler())
async def stop(self, frame: EndFrame):
# Cancel and wait for the audio input task to finish.
@@ -149,6 +146,13 @@ class BaseInputTransport(FrameProcessor):
await self.cancel_task(self._audio_task)
self._audio_task = None
async def set_transport_ready(self, frame: StartFrame):
"""To be called when the transport is ready to stream."""
# Create audio input queue and task if needed.
if not self._audio_task and self._params.audio_in_enabled:
self._audio_in_queue = asyncio.Queue()
self._audio_task = self.create_task(self._audio_task_handler())
async def push_audio_frame(self, frame: InputAudioRawFrame):
if self._params.audio_in_enabled:
await self._audio_in_queue.put(frame)

View File

@@ -78,6 +78,16 @@ class BaseOutputTransport(FrameProcessor):
audio_bytes_10ms = int(self._sample_rate / 100) * self._params.audio_out_channels * 2
self._audio_chunk_size = audio_bytes_10ms * self._params.audio_out_10ms_chunks
async def stop(self, frame: EndFrame):
for _, sender in self._media_senders.items():
await sender.stop(frame)
async def cancel(self, frame: CancelFrame):
for _, sender in self._media_senders.items():
await sender.cancel(frame)
async def set_transport_ready(self, frame: StartFrame):
"""To be called when the transport is ready to stream."""
# Register destinations.
for destination in self._params.audio_out_destinations:
await self.register_audio_destination(destination)
@@ -112,14 +122,6 @@ class BaseOutputTransport(FrameProcessor):
)
await self._media_senders[destination].start(frame)
async def stop(self, frame: EndFrame):
for _, sender in self._media_senders.items():
await sender.stop(frame)
async def cancel(self, frame: CancelFrame):
for _, sender in self._media_senders.items():
await sender.cancel(frame)
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
pass

View File

@@ -61,6 +61,8 @@ class LocalAudioInputTransport(BaseInputTransport):
)
self._in_stream.start_stream()
await self.set_transport_ready(frame)
async def cleanup(self):
await super().cleanup()
if self._in_stream:
@@ -111,6 +113,8 @@ class LocalAudioOutputTransport(BaseOutputTransport):
)
self._out_stream.start_stream()
await self.set_transport_ready(frame)
async def cleanup(self):
await super().cleanup()
if self._out_stream:

View File

@@ -68,6 +68,8 @@ class TkInputTransport(BaseInputTransport):
)
self._in_stream.start_stream()
await self.set_transport_ready(frame)
async def cleanup(self):
await super().cleanup()
if self._in_stream:
@@ -124,6 +126,8 @@ class TkOutputTransport(BaseOutputTransport):
)
self._out_stream.start_stream()
await self.set_transport_ready(frame)
async def cleanup(self):
await super().cleanup()
if self._out_stream:

View File

@@ -131,6 +131,7 @@ class FastAPIWebsocketInputTransport(BaseInputTransport):
await self._client.trigger_client_connected()
if not self._receive_task:
self._receive_task = self.create_task(self._receive_messages())
await self.set_transport_ready(frame)
async def _stop_tasks(self):
if self._monitor_websocket_task:
@@ -204,6 +205,7 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport):
await self._client.setup(frame)
await self._params.serializer.setup(frame)
self._send_interval = (self.audio_chunk_size / self.sample_rate) / 2
await self.set_transport_ready(frame)
async def stop(self, frame: EndFrame):
await super().stop(frame)

View File

@@ -395,6 +395,7 @@ class SmallWebRTCInputTransport(BaseInputTransport):
self._receive_audio_task = self.create_task(self._receive_audio())
if not self._receive_video_task and self._params.video_in_enabled:
self._receive_video_task = self.create_task(self._receive_video())
await self.set_transport_ready(frame)
async def _stop_tasks(self):
if self._receive_audio_task:
@@ -487,6 +488,7 @@ class SmallWebRTCOutputTransport(BaseOutputTransport):
await super().start(frame)
await self._client.setup(self._params, frame)
await self._client.connect()
await self.set_transport_ready(frame)
async def stop(self, frame: EndFrame):
await super().stop(frame)

View File

@@ -136,6 +136,7 @@ class WebsocketClientInputTransport(BaseInputTransport):
await self._params.serializer.setup(frame)
await self._session.setup(frame)
await self._session.connect()
await self.set_transport_ready(frame)
async def stop(self, frame: EndFrame):
await super().stop(frame)
@@ -186,6 +187,7 @@ class WebsocketClientOutputTransport(BaseOutputTransport):
await self._params.serializer.setup(frame)
await self._session.setup(frame)
await self._session.connect()
await self.set_transport_ready(frame)
async def stop(self, frame: EndFrame):
await super().stop(frame)

View File

@@ -83,6 +83,7 @@ class WebsocketServerInputTransport(BaseInputTransport):
await self._params.serializer.setup(frame)
if not self._server_task:
self._server_task = self.create_task(self._server_task_handler())
await self.set_transport_ready(frame)
async def stop(self, frame: EndFrame):
await super().stop(frame)
@@ -195,6 +196,7 @@ class WebsocketServerOutputTransport(BaseOutputTransport):
await super().start(frame)
await self._params.serializer.setup(frame)
self._send_interval = (self.audio_chunk_size / self.sample_rate) / 2
await self.set_transport_ready(frame)
async def stop(self, frame: EndFrame):
await super().stop(frame)

View File

@@ -944,19 +944,23 @@ class DailyInputTransport(BaseInputTransport):
self._audio_in_task = self.create_task(self._audio_in_task_handler())
async def start(self, frame: StartFrame):
# Setup client.
await self._client.setup(frame)
# Parent start.
await super().start(frame)
if self._initialized:
return
self._initialized = True
# Parent start.
await super().start(frame)
# Setup client.
await self._client.setup(frame)
# Join the room.
await self._client.join()
# Indicate to the transport that we are connected.
await self.set_transport_ready(frame)
if self._params.audio_in_stream_on_start:
self.start_audio_in_streaming()
@@ -1125,20 +1129,23 @@ class DailyOutputTransport(BaseOutputTransport):
self._initialized = False
async def start(self, frame: StartFrame):
# Setup client.
await self._client.setup(frame)
# Parent start.
await super().start(frame)
if self._initialized:
return
self._initialized = True
# Parent start.
await super().start(frame)
# Setup client.
await self._client.setup(frame)
# Join the room.
await self._client.join()
# Indicate to the transport that we are connected.
await self.set_transport_ready(frame)
async def stop(self, frame: EndFrame):
# Parent stop.
await super().stop(frame)

View File

@@ -370,6 +370,7 @@ class LiveKitInputTransport(BaseInputTransport):
await self._client.connect()
if not self._audio_in_task and self._params.audio_in_enabled:
self._audio_in_task = self.create_task(self._audio_in_task_handler())
await self.set_transport_ready(frame)
logger.info("LiveKitInputTransport started")
async def stop(self, frame: EndFrame):
@@ -441,6 +442,7 @@ class LiveKitOutputTransport(BaseOutputTransport):
await super().start(frame)
await self._client.setup(frame)
await self._client.connect()
await self.set_transport_ready(frame)
logger.info("LiveKitOutputTransport started")
async def stop(self, frame: EndFrame):