BaseOutputTransport: reword camera with video

This commit is contained in:
Aleix Conchillo Flaqué
2025-04-23 14:20:35 -07:00
parent e79a002e5a
commit 4ee6c4b59e
4 changed files with 62 additions and 63 deletions

View File

@@ -53,11 +53,10 @@ class BaseOutputTransport(FrameProcessor):
self._sink_clock_task = None
# Task to write/send audio and image frames.
self._camera_out_task = None
self._video_out_task = None
# These are the images that we should send to the camera at our desired
# framerate.
self._camera_images = None
# These are the images that we should send at our desired framerate.
self._video_images = None
# Output sample rate. It will be initialized on StartFrame.
self._sample_rate = 0
@@ -88,7 +87,7 @@ class BaseOutputTransport(FrameProcessor):
# Start audio mixer.
if self._params.audio_out_mixer:
await self._params.audio_out_mixer.start(self._sample_rate)
self._create_camera_task()
self._create_video_task()
self._create_sink_tasks()
async def stop(self, frame: EndFrame):
@@ -98,26 +97,26 @@ class BaseOutputTransport(FrameProcessor):
# At this point we have enqueued an EndFrame and we need to wait for
# that EndFrame to be processed by the sink tasks. We also need to wait
# for these tasks before cancelling the camera and audio tasks below
# for these tasks before cancelling the video and audio tasks below
# because they might be still rendering.
if self._sink_task:
await self.wait_for_task(self._sink_task)
if self._sink_clock_task:
await self.wait_for_task(self._sink_clock_task)
# We can now cancel the camera task.
await self._cancel_camera_task()
# We can now cancel the video task.
await self._cancel_video_task()
async def cancel(self, frame: CancelFrame):
# Since we are cancelling everything it doesn't matter if we cancel sink
# tasks first or not.
await self._cancel_sink_tasks()
await self._cancel_camera_task()
await self._cancel_video_task()
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
pass
async def write_frame_to_camera(self, frame: OutputImageRawFrame):
async def write_raw_video_frame(self, frame: OutputImageRawFrame):
pass
async def write_raw_audio_frames(self, frames: bytes):
@@ -181,11 +180,11 @@ class BaseOutputTransport(FrameProcessor):
return
if isinstance(frame, StartInterruptionFrame):
# Cancel sink and camera tasks.
# Cancel sink and video tasks.
await self._cancel_sink_tasks()
await self._cancel_camera_task()
# Create sink and camera tasks.
self._create_camera_task()
await self._cancel_video_task()
# Create sink and video tasks.
self._create_video_task()
self._create_sink_tasks()
# Let's send a bot stopped speaking if we have to.
await self._bot_stopped_speaking()
@@ -216,7 +215,7 @@ class BaseOutputTransport(FrameProcessor):
return
if self._params.video_out_is_live:
await self._camera_out_queue.put(frame)
await self._video_out_queue.put(frame)
else:
await self._sink_queue.put(frame)
@@ -261,9 +260,9 @@ class BaseOutputTransport(FrameProcessor):
async def _sink_frame_handler(self, frame: Frame):
if isinstance(frame, OutputImageRawFrame):
await self._set_camera_image(frame)
await self._set_video_image(frame)
elif isinstance(frame, SpriteFrame):
await self._set_camera_images(frame.images)
await self._set_video_images(frame.images)
elif isinstance(frame, TransportMessageFrame):
await self.send_message(frame)
@@ -368,20 +367,20 @@ class BaseOutputTransport(FrameProcessor):
await self.write_raw_audio_frames(frame.audio)
#
# Camera task
# Video task
#
def _create_camera_task(self):
# Create camera output queue and task if needed.
if not self._camera_out_task and self._params.video_out_enabled:
self._camera_out_queue = asyncio.Queue()
self._camera_out_task = self.create_task(self._camera_out_task_handler())
def _create_video_task(self):
# Create video output queue and task if needed.
if not self._video_out_task and self._params.video_out_enabled:
self._video_out_queue = asyncio.Queue()
self._video_out_task = self.create_task(self._video_out_task_handler())
async def _cancel_camera_task(self):
# Stop camera output task.
if self._camera_out_task and self._params.video_out_enabled:
await self.cancel_task(self._camera_out_task)
self._camera_out_task = None
async def _cancel_video_task(self):
# Stop video output task.
if self._video_out_task and self._params.video_out_enabled:
await self.cancel_task(self._video_out_task)
self._video_out_task = None
async def _draw_image(self, frame: OutputImageRawFrame):
desired_size = (self._params.video_out_width, self._params.video_out_height)
@@ -397,50 +396,50 @@ class BaseOutputTransport(FrameProcessor):
resized_image.tobytes(), resized_image.size, resized_image.format
)
await self.write_frame_to_camera(frame)
await self.write_raw_video_frame(frame)
async def _set_camera_image(self, image: OutputImageRawFrame):
self._camera_images = itertools.cycle([image])
async def _set_video_image(self, image: OutputImageRawFrame):
self._video_images = itertools.cycle([image])
async def _set_camera_images(self, images: List[OutputImageRawFrame]):
self._camera_images = itertools.cycle(images)
async def _set_video_images(self, images: List[OutputImageRawFrame]):
self._video_images = itertools.cycle(images)
async def _camera_out_task_handler(self):
self._camera_out_start_time = None
self._camera_out_frame_index = 0
self._camera_out_frame_duration = 1 / self._params.video_out_framerate
self._camera_out_frame_reset = self._camera_out_frame_duration * 5
async def _video_out_task_handler(self):
self._video_out_start_time = None
self._video_out_frame_index = 0
self._video_out_frame_duration = 1 / self._params.video_out_framerate
self._video_out_frame_reset = self._video_out_frame_duration * 5
while True:
if self._params.video_out_is_live:
await self._camera_out_is_live_handler()
elif self._camera_images:
image = next(self._camera_images)
await self._video_out_is_live_handler()
elif self._video_images:
image = next(self._video_images)
await self._draw_image(image)
await asyncio.sleep(self._camera_out_frame_duration)
await asyncio.sleep(self._video_out_frame_duration)
else:
await asyncio.sleep(self._camera_out_frame_duration)
await asyncio.sleep(self._video_out_frame_duration)
async def _camera_out_is_live_handler(self):
image = await self._camera_out_queue.get()
async def _video_out_is_live_handler(self):
image = await self._video_out_queue.get()
# We get the start time as soon as we get the first image.
if not self._camera_out_start_time:
self._camera_out_start_time = time.time()
self._camera_out_frame_index = 0
if not self._video_out_start_time:
self._video_out_start_time = time.time()
self._video_out_frame_index = 0
# Calculate how much time we need to wait before rendering next image.
real_elapsed_time = time.time() - self._camera_out_start_time
real_render_time = self._camera_out_frame_index * self._camera_out_frame_duration
delay_time = self._camera_out_frame_duration + real_render_time - real_elapsed_time
real_elapsed_time = time.time() - self._video_out_start_time
real_render_time = self._video_out_frame_index * self._video_out_frame_duration
delay_time = self._video_out_frame_duration + real_render_time - real_elapsed_time
if abs(delay_time) > self._camera_out_frame_reset:
self._camera_out_start_time = time.time()
self._camera_out_frame_index = 0
if abs(delay_time) > self._video_out_frame_reset:
self._video_out_start_time = time.time()
self._video_out_frame_index = 0
elif delay_time > 0:
await asyncio.sleep(delay_time)
self._camera_out_frame_index += 1
self._video_out_frame_index += 1
# Render image
await self._draw_image(image)
self._camera_out_queue.task_done()
self._video_out_queue.task_done()

View File

@@ -137,7 +137,7 @@ class TkOutputTransport(BaseOutputTransport):
self._executor, self._out_stream.write, frames
)
async def write_frame_to_camera(self, frame: OutputImageRawFrame):
async def write_raw_video_frame(self, frame: OutputImageRawFrame):
self.get_event_loop().call_soon(self._write_frame_to_tk, frame)
def _write_frame_to_tk(self, frame: OutputImageRawFrame):

View File

@@ -288,7 +288,7 @@ class SmallWebRTCClient:
if self._can_send() and self._audio_output_track:
await self._audio_output_track.add_audio_bytes(data)
async def write_frame_to_camera(self, frame: OutputImageRawFrame):
async def write_raw_video_frame(self, frame: OutputImageRawFrame):
if self._can_send() and self._video_output_track:
self._video_output_track.add_video_frame(frame)
@@ -500,8 +500,8 @@ class SmallWebRTCOutputTransport(BaseOutputTransport):
async def write_raw_audio_frames(self, frames: bytes):
await self._client.write_raw_audio_frames(frames)
async def write_frame_to_camera(self, frame: OutputImageRawFrame):
await self._client.write_frame_to_camera(frame)
async def write_raw_video_frame(self, frame: OutputImageRawFrame):
await self._client.write_raw_video_frame(frame)
class SmallWebRTCTransport(BaseTransport):

View File

@@ -373,7 +373,7 @@ class DailyTransportClient(EventHandler):
self._mic.write_frames(frames, completion=completion_callback(future))
await future
async def write_frame_to_camera(self, frame: OutputImageRawFrame):
async def write_raw_video_frame(self, frame: OutputImageRawFrame):
if not self._camera:
return None
@@ -1035,8 +1035,8 @@ class DailyOutputTransport(BaseOutputTransport):
async def write_raw_audio_frames(self, frames: bytes):
await self._client.write_raw_audio_frames(frames)
async def write_frame_to_camera(self, frame: OutputImageRawFrame):
await self._client.write_frame_to_camera(frame)
async def write_raw_video_frame(self, frame: OutputImageRawFrame):
await self._client.write_raw_video_frame(frame)
class DailyTransport(BaseTransport):