diff --git a/CHANGELOG.md b/CHANGELOG.md index 54cbd3cd6..95350157f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Fixed an issue that could cause data to be sent to the transports when they + were still not ready. + - Remove custom audio tracks from `DailyTransport` before leaving. ## [0.0.66] - 2025-05-02 diff --git a/src/pipecat/transports/base_input.py b/src/pipecat/transports/base_input.py index 51ebdb677..f9a27a6d3 100644 --- a/src/pipecat/transports/base_input.py +++ b/src/pipecat/transports/base_input.py @@ -122,6 +122,7 @@ class BaseInputTransport(FrameProcessor): # Configure VAD analyzer. if self._params.vad_analyzer: self._params.vad_analyzer.set_sample_rate(self._sample_rate) + # Configure End of turn analyzer. if self._params.turn_analyzer: self._params.turn_analyzer.set_sample_rate(self._sample_rate) @@ -129,10 +130,6 @@ class BaseInputTransport(FrameProcessor): # Start audio filter. if self._params.audio_in_filter: await self._params.audio_in_filter.start(self._sample_rate) - # Create audio input queue and task if needed. - if not self._audio_task and self._params.audio_in_enabled: - self._audio_in_queue = asyncio.Queue() - self._audio_task = self.create_task(self._audio_task_handler()) async def stop(self, frame: EndFrame): # Cancel and wait for the audio input task to finish. @@ -149,6 +146,13 @@ class BaseInputTransport(FrameProcessor): await self.cancel_task(self._audio_task) self._audio_task = None + async def set_transport_ready(self, frame: StartFrame): + """To be called when the transport is ready to stream.""" + # Create audio input queue and task if needed. + if not self._audio_task and self._params.audio_in_enabled: + self._audio_in_queue = asyncio.Queue() + self._audio_task = self.create_task(self._audio_task_handler()) + async def push_audio_frame(self, frame: InputAudioRawFrame): if self._params.audio_in_enabled: await self._audio_in_queue.put(frame) diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index fa5d5e1c4..81492b84d 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -78,6 +78,16 @@ class BaseOutputTransport(FrameProcessor): audio_bytes_10ms = int(self._sample_rate / 100) * self._params.audio_out_channels * 2 self._audio_chunk_size = audio_bytes_10ms * self._params.audio_out_10ms_chunks + async def stop(self, frame: EndFrame): + for _, sender in self._media_senders.items(): + await sender.stop(frame) + + async def cancel(self, frame: CancelFrame): + for _, sender in self._media_senders.items(): + await sender.cancel(frame) + + async def set_transport_ready(self, frame: StartFrame): + """To be called when the transport is ready to stream.""" # Register destinations. for destination in self._params.audio_out_destinations: await self.register_audio_destination(destination) @@ -112,14 +122,6 @@ class BaseOutputTransport(FrameProcessor): ) await self._media_senders[destination].start(frame) - async def stop(self, frame: EndFrame): - for _, sender in self._media_senders.items(): - await sender.stop(frame) - - async def cancel(self, frame: CancelFrame): - for _, sender in self._media_senders.items(): - await sender.cancel(frame) - async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame): pass diff --git a/src/pipecat/transports/local/audio.py b/src/pipecat/transports/local/audio.py index 8bfd7ee34..ba554c9e3 100644 --- a/src/pipecat/transports/local/audio.py +++ b/src/pipecat/transports/local/audio.py @@ -61,6 +61,8 @@ class LocalAudioInputTransport(BaseInputTransport): ) self._in_stream.start_stream() + await self.set_transport_ready(frame) + async def cleanup(self): await super().cleanup() if self._in_stream: @@ -111,6 +113,8 @@ class LocalAudioOutputTransport(BaseOutputTransport): ) self._out_stream.start_stream() + await self.set_transport_ready(frame) + async def cleanup(self): await super().cleanup() if self._out_stream: diff --git a/src/pipecat/transports/local/tk.py b/src/pipecat/transports/local/tk.py index bed6371c2..4086497cb 100644 --- a/src/pipecat/transports/local/tk.py +++ b/src/pipecat/transports/local/tk.py @@ -68,6 +68,8 @@ class TkInputTransport(BaseInputTransport): ) self._in_stream.start_stream() + await self.set_transport_ready(frame) + async def cleanup(self): await super().cleanup() if self._in_stream: @@ -124,6 +126,8 @@ class TkOutputTransport(BaseOutputTransport): ) self._out_stream.start_stream() + await self.set_transport_ready(frame) + async def cleanup(self): await super().cleanup() if self._out_stream: diff --git a/src/pipecat/transports/network/fastapi_websocket.py b/src/pipecat/transports/network/fastapi_websocket.py index 4a20bc49b..f04d56b0d 100644 --- a/src/pipecat/transports/network/fastapi_websocket.py +++ b/src/pipecat/transports/network/fastapi_websocket.py @@ -131,6 +131,7 @@ class FastAPIWebsocketInputTransport(BaseInputTransport): await self._client.trigger_client_connected() if not self._receive_task: self._receive_task = self.create_task(self._receive_messages()) + await self.set_transport_ready(frame) async def _stop_tasks(self): if self._monitor_websocket_task: @@ -204,6 +205,7 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport): await self._client.setup(frame) await self._params.serializer.setup(frame) self._send_interval = (self.audio_chunk_size / self.sample_rate) / 2 + await self.set_transport_ready(frame) async def stop(self, frame: EndFrame): await super().stop(frame) diff --git a/src/pipecat/transports/network/small_webrtc.py b/src/pipecat/transports/network/small_webrtc.py index fdd501299..ffa3f441a 100644 --- a/src/pipecat/transports/network/small_webrtc.py +++ b/src/pipecat/transports/network/small_webrtc.py @@ -395,6 +395,7 @@ class SmallWebRTCInputTransport(BaseInputTransport): self._receive_audio_task = self.create_task(self._receive_audio()) if not self._receive_video_task and self._params.video_in_enabled: self._receive_video_task = self.create_task(self._receive_video()) + await self.set_transport_ready(frame) async def _stop_tasks(self): if self._receive_audio_task: @@ -487,6 +488,7 @@ class SmallWebRTCOutputTransport(BaseOutputTransport): await super().start(frame) await self._client.setup(self._params, frame) await self._client.connect() + await self.set_transport_ready(frame) async def stop(self, frame: EndFrame): await super().stop(frame) diff --git a/src/pipecat/transports/network/websocket_client.py b/src/pipecat/transports/network/websocket_client.py index 7e9725a76..535a0ab21 100644 --- a/src/pipecat/transports/network/websocket_client.py +++ b/src/pipecat/transports/network/websocket_client.py @@ -136,6 +136,7 @@ class WebsocketClientInputTransport(BaseInputTransport): await self._params.serializer.setup(frame) await self._session.setup(frame) await self._session.connect() + await self.set_transport_ready(frame) async def stop(self, frame: EndFrame): await super().stop(frame) @@ -186,6 +187,7 @@ class WebsocketClientOutputTransport(BaseOutputTransport): await self._params.serializer.setup(frame) await self._session.setup(frame) await self._session.connect() + await self.set_transport_ready(frame) async def stop(self, frame: EndFrame): await super().stop(frame) diff --git a/src/pipecat/transports/network/websocket_server.py b/src/pipecat/transports/network/websocket_server.py index b930f9fd6..7c8738871 100644 --- a/src/pipecat/transports/network/websocket_server.py +++ b/src/pipecat/transports/network/websocket_server.py @@ -83,6 +83,7 @@ class WebsocketServerInputTransport(BaseInputTransport): await self._params.serializer.setup(frame) if not self._server_task: self._server_task = self.create_task(self._server_task_handler()) + await self.set_transport_ready(frame) async def stop(self, frame: EndFrame): await super().stop(frame) @@ -195,6 +196,7 @@ class WebsocketServerOutputTransport(BaseOutputTransport): await super().start(frame) await self._params.serializer.setup(frame) self._send_interval = (self.audio_chunk_size / self.sample_rate) / 2 + await self.set_transport_ready(frame) async def stop(self, frame: EndFrame): await super().stop(frame) diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index 3e43ddee1..9909d9336 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -944,19 +944,23 @@ class DailyInputTransport(BaseInputTransport): self._audio_in_task = self.create_task(self._audio_in_task_handler()) async def start(self, frame: StartFrame): - # Setup client. - await self._client.setup(frame) - - # Parent start. - await super().start(frame) - if self._initialized: return self._initialized = True + # Parent start. + await super().start(frame) + + # Setup client. + await self._client.setup(frame) + # Join the room. await self._client.join() + + # Indicate the transport that we are connected. + await self.set_transport_ready(frame) + if self._params.audio_in_stream_on_start: self.start_audio_in_streaming() @@ -1125,20 +1129,23 @@ class DailyOutputTransport(BaseOutputTransport): self._initialized = False async def start(self, frame: StartFrame): - # Setup client. - await self._client.setup(frame) - - # Parent start. - await super().start(frame) - if self._initialized: return self._initialized = True + # Parent start. + await super().start(frame) + + # Setup client. + await self._client.setup(frame) + # Join the room. await self._client.join() + # Indicate the transport that we are connected. + await self.set_transport_ready(frame) + async def stop(self, frame: EndFrame): # Parent stop. await super().stop(frame) diff --git a/src/pipecat/transports/services/livekit.py b/src/pipecat/transports/services/livekit.py index 456a70ea6..36cc5d604 100644 --- a/src/pipecat/transports/services/livekit.py +++ b/src/pipecat/transports/services/livekit.py @@ -370,6 +370,7 @@ class LiveKitInputTransport(BaseInputTransport): await self._client.connect() if not self._audio_in_task and self._params.audio_in_enabled: self._audio_in_task = self.create_task(self._audio_in_task_handler()) + await self.set_transport_ready(frame) logger.info("LiveKitInputTransport started") async def stop(self, frame: EndFrame): @@ -441,6 +442,7 @@ class LiveKitOutputTransport(BaseOutputTransport): await super().start(frame) await self._client.setup(frame) await self._client.connect() + await self.set_transport_ready(frame) logger.info("LiveKitOutputTransport started") async def stop(self, frame: EndFrame):