From 4ee6c4b59e624ffb5d9741d2d9e3f65b61aa2f18 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 23 Apr 2025 14:20:35 -0700 Subject: [PATCH] BaseOutputTransport: reword camera with video --- src/pipecat/transports/base_output.py | 111 +++++++++--------- src/pipecat/transports/local/tk.py | 2 +- .../transports/network/small_webrtc.py | 6 +- src/pipecat/transports/services/daily.py | 6 +- 4 files changed, 62 insertions(+), 63 deletions(-) diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index 5369985d5..de53fac60 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -53,11 +53,10 @@ class BaseOutputTransport(FrameProcessor): self._sink_clock_task = None # Task to write/send audio and image frames. - self._camera_out_task = None + self._video_out_task = None - # These are the images that we should send to the camera at our desired - # framerate. - self._camera_images = None + # These are the images that we should send at our desired framerate. + self._video_images = None # Output sample rate. It will be initialized on StartFrame. self._sample_rate = 0 @@ -88,7 +87,7 @@ class BaseOutputTransport(FrameProcessor): # Start audio mixer. if self._params.audio_out_mixer: await self._params.audio_out_mixer.start(self._sample_rate) - self._create_camera_task() + self._create_video_task() self._create_sink_tasks() async def stop(self, frame: EndFrame): @@ -98,26 +97,26 @@ class BaseOutputTransport(FrameProcessor): # At this point we have enqueued an EndFrame and we need to wait for # that EndFrame to be processed by the sink tasks. We also need to wait - # for these tasks before cancelling the camera and audio tasks below + # for these tasks before cancelling the video and audio tasks below # because they might be still rendering. if self._sink_task: await self.wait_for_task(self._sink_task) if self._sink_clock_task: await self.wait_for_task(self._sink_clock_task) - # We can now cancel the camera task. - await self._cancel_camera_task() + # We can now cancel the video task. + await self._cancel_video_task() async def cancel(self, frame: CancelFrame): # Since we are cancelling everything it doesn't matter if we cancel sink # tasks first or not. await self._cancel_sink_tasks() - await self._cancel_camera_task() + await self._cancel_video_task() async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame): pass - async def write_frame_to_camera(self, frame: OutputImageRawFrame): + async def write_raw_video_frame(self, frame: OutputImageRawFrame): pass async def write_raw_audio_frames(self, frames: bytes): @@ -181,11 +180,11 @@ class BaseOutputTransport(FrameProcessor): return if isinstance(frame, StartInterruptionFrame): - # Cancel sink and camera tasks. + # Cancel sink and video tasks. await self._cancel_sink_tasks() - await self._cancel_camera_task() - # Create sink and camera tasks. - self._create_camera_task() + await self._cancel_video_task() + # Create sink and video tasks. + self._create_video_task() self._create_sink_tasks() # Let's send a bot stopped speaking if we have to. await self._bot_stopped_speaking() @@ -216,7 +215,7 @@ class BaseOutputTransport(FrameProcessor): return if self._params.video_out_is_live: - await self._camera_out_queue.put(frame) + await self._video_out_queue.put(frame) else: await self._sink_queue.put(frame) @@ -261,9 +260,9 @@ class BaseOutputTransport(FrameProcessor): async def _sink_frame_handler(self, frame: Frame): if isinstance(frame, OutputImageRawFrame): - await self._set_camera_image(frame) + await self._set_video_image(frame) elif isinstance(frame, SpriteFrame): - await self._set_camera_images(frame.images) + await self._set_video_images(frame.images) elif isinstance(frame, TransportMessageFrame): await self.send_message(frame) @@ -368,20 +367,20 @@ class BaseOutputTransport(FrameProcessor): await self.write_raw_audio_frames(frame.audio) # - # Camera task + # Video task # - def _create_camera_task(self): - # Create camera output queue and task if needed. - if not self._camera_out_task and self._params.video_out_enabled: - self._camera_out_queue = asyncio.Queue() - self._camera_out_task = self.create_task(self._camera_out_task_handler()) + def _create_video_task(self): + # Create video output queue and task if needed. + if not self._video_out_task and self._params.video_out_enabled: + self._video_out_queue = asyncio.Queue() + self._video_out_task = self.create_task(self._video_out_task_handler()) - async def _cancel_camera_task(self): - # Stop camera output task. - if self._camera_out_task and self._params.video_out_enabled: - await self.cancel_task(self._camera_out_task) - self._camera_out_task = None + async def _cancel_video_task(self): + # Stop video output task. + if self._video_out_task and self._params.video_out_enabled: + await self.cancel_task(self._video_out_task) + self._video_out_task = None async def _draw_image(self, frame: OutputImageRawFrame): desired_size = (self._params.video_out_width, self._params.video_out_height) @@ -397,50 +396,50 @@ class BaseOutputTransport(FrameProcessor): resized_image.tobytes(), resized_image.size, resized_image.format ) - await self.write_frame_to_camera(frame) + await self.write_raw_video_frame(frame) - async def _set_camera_image(self, image: OutputImageRawFrame): - self._camera_images = itertools.cycle([image]) + async def _set_video_image(self, image: OutputImageRawFrame): + self._video_images = itertools.cycle([image]) - async def _set_camera_images(self, images: List[OutputImageRawFrame]): - self._camera_images = itertools.cycle(images) + async def _set_video_images(self, images: List[OutputImageRawFrame]): + self._video_images = itertools.cycle(images) - async def _camera_out_task_handler(self): - self._camera_out_start_time = None - self._camera_out_frame_index = 0 - self._camera_out_frame_duration = 1 / self._params.video_out_framerate - self._camera_out_frame_reset = self._camera_out_frame_duration * 5 + async def _video_out_task_handler(self): + self._video_out_start_time = None + self._video_out_frame_index = 0 + self._video_out_frame_duration = 1 / self._params.video_out_framerate + self._video_out_frame_reset = self._video_out_frame_duration * 5 while True: if self._params.video_out_is_live: - await self._camera_out_is_live_handler() - elif self._camera_images: - image = next(self._camera_images) + await self._video_out_is_live_handler() + elif self._video_images: + image = next(self._video_images) await self._draw_image(image) - await asyncio.sleep(self._camera_out_frame_duration) + await asyncio.sleep(self._video_out_frame_duration) else: - await asyncio.sleep(self._camera_out_frame_duration) + await asyncio.sleep(self._video_out_frame_duration) - async def _camera_out_is_live_handler(self): - image = await self._camera_out_queue.get() + async def _video_out_is_live_handler(self): + image = await self._video_out_queue.get() # We get the start time as soon as we get the first image. - if not self._camera_out_start_time: - self._camera_out_start_time = time.time() - self._camera_out_frame_index = 0 + if not self._video_out_start_time: + self._video_out_start_time = time.time() + self._video_out_frame_index = 0 # Calculate how much time we need to wait before rendering next image. - real_elapsed_time = time.time() - self._camera_out_start_time - real_render_time = self._camera_out_frame_index * self._camera_out_frame_duration - delay_time = self._camera_out_frame_duration + real_render_time - real_elapsed_time + real_elapsed_time = time.time() - self._video_out_start_time + real_render_time = self._video_out_frame_index * self._video_out_frame_duration + delay_time = self._video_out_frame_duration + real_render_time - real_elapsed_time - if abs(delay_time) > self._camera_out_frame_reset: - self._camera_out_start_time = time.time() - self._camera_out_frame_index = 0 + if abs(delay_time) > self._video_out_frame_reset: + self._video_out_start_time = time.time() + self._video_out_frame_index = 0 elif delay_time > 0: await asyncio.sleep(delay_time) - self._camera_out_frame_index += 1 + self._video_out_frame_index += 1 # Render image await self._draw_image(image) - self._camera_out_queue.task_done() + self._video_out_queue.task_done() diff --git a/src/pipecat/transports/local/tk.py b/src/pipecat/transports/local/tk.py index 4338992e3..1695d2cab 100644 --- a/src/pipecat/transports/local/tk.py +++ b/src/pipecat/transports/local/tk.py @@ -137,7 +137,7 @@ class TkOutputTransport(BaseOutputTransport): self._executor, self._out_stream.write, frames ) - async def write_frame_to_camera(self, frame: OutputImageRawFrame): + async def write_raw_video_frame(self, frame: OutputImageRawFrame): self.get_event_loop().call_soon(self._write_frame_to_tk, frame) def _write_frame_to_tk(self, frame: OutputImageRawFrame): diff --git a/src/pipecat/transports/network/small_webrtc.py b/src/pipecat/transports/network/small_webrtc.py index d95c06384..6304d8d59 100644 --- a/src/pipecat/transports/network/small_webrtc.py +++ b/src/pipecat/transports/network/small_webrtc.py @@ -288,7 +288,7 @@ class SmallWebRTCClient: if self._can_send() and self._audio_output_track: await self._audio_output_track.add_audio_bytes(data) - async def write_frame_to_camera(self, frame: OutputImageRawFrame): + async def write_raw_video_frame(self, frame: OutputImageRawFrame): if self._can_send() and self._video_output_track: self._video_output_track.add_video_frame(frame) @@ -500,8 +500,8 @@ class SmallWebRTCOutputTransport(BaseOutputTransport): async def write_raw_audio_frames(self, frames: bytes): await self._client.write_raw_audio_frames(frames) - async def write_frame_to_camera(self, frame: OutputImageRawFrame): - await self._client.write_frame_to_camera(frame) + async def write_raw_video_frame(self, frame: OutputImageRawFrame): + await self._client.write_raw_video_frame(frame) class SmallWebRTCTransport(BaseTransport): diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index d486a7aba..23aef56ac 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -373,7 +373,7 @@ class DailyTransportClient(EventHandler): self._mic.write_frames(frames, completion=completion_callback(future)) await future - async def write_frame_to_camera(self, frame: OutputImageRawFrame): + async def write_raw_video_frame(self, frame: OutputImageRawFrame): if not self._camera: return None @@ -1035,8 +1035,8 @@ class DailyOutputTransport(BaseOutputTransport): async def write_raw_audio_frames(self, frames: bytes): await self._client.write_raw_audio_frames(frames) - async def write_frame_to_camera(self, frame: OutputImageRawFrame): - await self._client.write_frame_to_camera(frame) + async def write_raw_video_frame(self, frame: OutputImageRawFrame): + await self._client.write_raw_video_frame(frame) class DailyTransport(BaseTransport):