BaseOutputTransport: allow sending audio/video to multiple destinations

This commit is contained in:
Aleix Conchillo Flaqué
2025-04-29 13:06:45 -07:00
parent ef3d732607
commit e738affd29
12 changed files with 495 additions and 329 deletions

View File

@@ -152,7 +152,7 @@ class OutputAudioRawFrame(DataFrame, AudioRawFrame):
def __str__(self):
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
return f"{self.name}(pts: {pts}, destination: {self.destination}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
@dataclass

View File

@@ -66,6 +66,8 @@ class TTSService(AIService):
# Text filter executed after text has been aggregated.
text_filters: Sequence[BaseTextFilter] = [],
text_filter: Optional[BaseTextFilter] = None,
# Audio destination of the generated frames.
destination: Optional[str] = None,
**kwargs,
):
super().__init__(**kwargs)
@@ -82,6 +84,8 @@ class TTSService(AIService):
self._settings: Dict[str, Any] = {}
self._text_aggregator: BaseTextAggregator = text_aggregator or SimpleTextAggregator()
self._text_filters: Sequence[BaseTextFilter] = text_filters
self._destination: Optional[str] = destination
if text_filter:
import warnings
@@ -207,13 +211,16 @@ class TTSService(AIService):
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
if self._push_silence_after_stop and isinstance(frame, TTSStoppedFrame):
silence_num_bytes = int(self._silence_time_s * self.sample_rate * 2) # 16-bit
await self.push_frame(
TTSAudioRawFrame(
audio=b"\x00" * silence_num_bytes,
sample_rate=self.sample_rate,
num_channels=1,
)
silence_frame = TTSAudioRawFrame(
audio=b"\x00" * silence_num_bytes,
sample_rate=self.sample_rate,
num_channels=1,
)
silence_frame.destination = self._destination
await self.push_frame(silence_frame)
if isinstance(frame, TTSAudioRawFrame):
frame.destination = self._destination
await super().push_frame(frame, direction)

View File

@@ -8,11 +8,12 @@ import asyncio
import itertools
import sys
import time
from typing import AsyncGenerator, List
from typing import Any, AsyncGenerator, Dict, List, Mapping, Optional
from loguru import logger
from PIL import Image
from pipecat.audio.mixers.base_audio_mixer import BaseAudioMixer
from pipecat.audio.utils import create_default_resampler
from pipecat.frames.frames import (
BotSpeakingFrame,
@@ -46,80 +47,71 @@ class BaseOutputTransport(FrameProcessor):
self._params = params
# Task to process incoming frames so we don't block upstream elements.
self._sink_task = None
# Task to process incoming frames using a clock.
self._sink_clock_task = None
# Task to write/send audio and image frames.
self._video_out_task = None
# These are the images that we should send at our desired framerate.
self._video_images = None
# Output sample rate. It will be initialized on StartFrame.
self._sample_rate = 0
self._resampler = create_default_resampler()
# Chunk size that will be written. It will be computed on StartFrame
self._audio_chunk_size = 0
self._audio_buffer = bytearray()
self._stopped_event = asyncio.Event()
# Indicates if the bot is currently speaking.
self._bot_speaking = False
# We will have one media sender per output frame destination. This allow
# us to send multiple streams at the same time if the transport allows
# it.
self._media_senders: Dict[Any, "BaseOutputTransport.MediaSender"] = {}
@property
def sample_rate(self) -> int:
return self._sample_rate
sender = self._media_senders.get(None, None)
return sender.sample_rate if sender else 0
@property
def audio_chunk_size(self) -> int:
sender = self._media_senders.get(None, None)
return sender.audio_chunk_size if sender else 0
async def start(self, frame: StartFrame):
self._sample_rate = self._params.audio_out_sample_rate or frame.audio_out_sample_rate
# Register destinations.
for destination in self._params.audio_out_destinations:
await self.register_audio_destination(destination)
# We will write 10ms*CHUNKS of audio at a time (where CHUNKS is the
# `audio_out_10ms_chunks` parameter). If we receive long audio frames we
# will chunk them. This will help with interruption handling.
audio_bytes_10ms = int(self._sample_rate / 100) * self._params.audio_out_channels * 2
self._audio_chunk_size = audio_bytes_10ms * self._params.audio_out_10ms_chunks
for destination in self._params.video_out_destinations:
await self.register_video_destination(destination)
# Start audio mixer.
if self._params.audio_out_mixer:
await self._params.audio_out_mixer.start(self._sample_rate)
self._create_video_task()
self._create_sink_tasks()
# Start default media sender.
self._media_senders[None] = BaseOutputTransport.MediaSender(
self, destination=None, sample_rate=self.sample_rate, params=self._params
)
await self._media_senders[None].start(frame)
# Media senders already send both audio and video, so make sure we only
# have one media server per shared name.
destinations = list(
set(self._params.audio_out_destinations + self._params.video_out_destinations)
)
# Start media senders.
for destination in destinations:
self._media_senders[destination] = BaseOutputTransport.MediaSender(
self, destination=destination, sample_rate=self.sample_rate, params=self._params
)
await self._media_senders[destination].start(frame)
async def stop(self, frame: EndFrame):
# Let the sink tasks process the queue until they reach this EndFrame.
await self._sink_clock_queue.put((sys.maxsize, frame.id, frame))
await self._sink_queue.put(frame)
# At this point we have enqueued an EndFrame and we need to wait for
# that EndFrame to be processed by the sink tasks. We also need to wait
# for these tasks before cancelling the video and audio tasks below
# because they might be still rendering.
if self._sink_task:
await self.wait_for_task(self._sink_task)
if self._sink_clock_task:
await self.wait_for_task(self._sink_clock_task)
# We can now cancel the video task.
await self._cancel_video_task()
for _, sender in self._media_senders.items():
await sender.stop(frame)
async def cancel(self, frame: CancelFrame):
# Since we are cancelling everything it doesn't matter if we cancel sink
# tasks first or not.
await self._cancel_sink_tasks()
await self._cancel_video_task()
for _, sender in self._media_senders.items():
await sender.cancel(frame)
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
pass
async def write_raw_video_frame(self, frame: OutputImageRawFrame):
async def register_video_destination(self, destination: str):
pass
async def write_raw_audio_frames(self, frames: bytes):
async def register_audio_destination(self, destination: str):
pass
async def write_raw_video_frame(
self, frame: OutputImageRawFrame, destination: Optional[str] = None
):
pass
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
pass
async def send_audio(self, frame: OutputAudioRawFrame):
@@ -150,7 +142,7 @@ class BaseOutputTransport(FrameProcessor):
await self.push_frame(frame, direction)
elif isinstance(frame, (StartInterruptionFrame, StopInterruptionFrame)):
await self.push_frame(frame, direction)
await self._handle_interruptions(frame)
await self._handle_frame(frame)
elif isinstance(frame, TransportMessageUrgentFrame):
await self.send_message(frame)
elif isinstance(frame, SystemFrame):
@@ -160,117 +152,427 @@ class BaseOutputTransport(FrameProcessor):
await self.stop(frame)
# Keep pushing EndFrame down so all the pipeline stops nicely.
await self.push_frame(frame, direction)
elif isinstance(frame, MixerControlFrame) and self._params.audio_out_mixer:
await self._params.audio_out_mixer.process_frame(frame)
elif isinstance(frame, MixerControlFrame):
await self._handle_frame(frame)
# Other frames.
elif isinstance(frame, OutputAudioRawFrame):
await self._handle_audio(frame)
await self._handle_frame(frame)
elif isinstance(frame, (OutputImageRawFrame, SpriteFrame)):
await self._handle_image(frame)
await self._handle_frame(frame)
# TODO(aleix): Images and audio should support presentation timestamps.
elif frame.pts:
await self._sink_clock_queue.put((frame.pts, frame.id, frame))
await self._handle_frame(frame)
elif direction == FrameDirection.UPSTREAM:
await self.push_frame(frame, direction)
else:
await self._sink_queue.put(frame)
await self._handle_frame(frame)
async def _handle_interruptions(self, frame: Frame):
if not self.interruptions_allowed:
async def _handle_frame(self, frame: Frame):
if frame.destination not in self._media_senders:
logger.warning(
f"{self} destination [{frame.destination}] not registered for frame {frame}"
)
return
sender = self._media_senders[frame.destination]
if isinstance(frame, StartInterruptionFrame):
# Cancel sink and video tasks.
await self._cancel_sink_tasks()
await self._cancel_video_task()
# Create sink and video tasks.
await sender.handle_interruptions(frame)
elif isinstance(frame, OutputAudioRawFrame):
await sender.handle_audio_frame(frame)
elif isinstance(frame, (OutputImageRawFrame, SpriteFrame)):
await sender.handle_image_frame(frame)
elif isinstance(frame, MixerControlFrame):
await sender.handle_mixer_control_frame(frame)
elif frame.pts:
await sender.handle_timed_frame(frame)
else:
await sender.handle_sync_frame(frame)
#
# Media Sender
#
class MediaSender:
def __init__(
self,
transport: "BaseOutputTransport",
*,
destination: Optional[str],
sample_rate: int,
params: TransportParams,
):
self._transport = transport
self._destination = destination
self._sample_rate = sample_rate
self._params = params
# Output sample rate. It will be initialized on StartFrame.
self._sample_rate = 0
# We write 10ms*CHUNKS of audio at a time (where CHUNKS is the
# `audio_out_10ms_chunks` parameter). If we receive long audio
# frames we will chunk them. This helps with interruption handling.
self._audio_chunk_size = 0
self._audio_buffer = bytearray()
# This will be used to resample incoming audio to the output sample rate.
self._resampler = create_default_resampler()
# The user can provide a single mixer, to be used by the default
# destination, or a destination/mixer mapping.
self._mixer: Optional[BaseAudioMixer] = None
# These are the images that we should send at our desired framerate.
self._video_images = None
# Indicates if the bot is currently speaking.
self._bot_speaking = False
self._audio_task: Optional[asyncio.Task] = None
self._video_task: Optional[asyncio.Task] = None
self._clock_task: Optional[asyncio.Task] = None
@property
def sample_rate(self) -> int:
return self._sample_rate
@property
def audio_chunk_size(self) -> int:
return self._audio_chunk_size
async def start(self, frame: StartFrame):
self._sample_rate = self._params.audio_out_sample_rate or frame.audio_out_sample_rate
# We will write 10ms*CHUNKS of audio at a time (where CHUNKS is the
# `audio_out_10ms_chunks` parameter). If we receive long audio frames we
# will chunk them. This will help with interruption handling.
audio_bytes_10ms = int(self._sample_rate / 100) * self._params.audio_out_channels * 2
self._audio_chunk_size = audio_bytes_10ms * self._params.audio_out_10ms_chunks
self._audio_buffer = bytearray()
# Create all tasks.
self._create_video_task()
self._create_sink_tasks()
self._create_clock_task()
self._create_audio_task()
# Check if we have an audio mixer for our destination.
if self._params.audio_out_mixer:
if isinstance(self._params.audio_out_mixer, Mapping):
self._mixer = self._params.audio_out_mixer.get(self._destination, None)
elif not self._destination:
# Only use the default mixer if we are the default destination.
self._mixer = self._params.audio_out_mixer
# Start audio mixer.
if self._mixer:
await self._mixer.start(self._sample_rate)
async def stop(self, frame: EndFrame):
# Let the sink tasks process the queue until they reach this EndFrame.
await self._clock_queue.put((sys.maxsize, frame.id, frame))
await self._audio_queue.put(frame)
# At this point we have enqueued an EndFrame and we need to wait for
# that EndFrame to be processed by the audio and clock tasks. We
# also need to wait for these tasks before cancelling the video task
# because it might be still rendering.
if self._audio_task:
await self._transport.wait_for_task(self._audio_task)
if self._clock_task:
await self._transport.wait_for_task(self._clock_task)
# Stop audio mixer.
if self._mixer:
await self._mixer.stop()
# We can now cancel the video task.
await self._cancel_video_task()
async def cancel(self, frame: CancelFrame):
# Since we are cancelling everything it doesn't matter what task we cancel first.
await self._cancel_audio_task()
await self._cancel_clock_task()
await self._cancel_video_task()
async def handle_interruptions(self, _: StartInterruptionFrame):
if not self._transport.interruptions_allowed:
return
# Cancel tasks.
await self._cancel_audio_task()
await self._cancel_clock_task()
await self._cancel_video_task()
# Create tasks.
self._create_video_task()
self._create_clock_task()
self._create_audio_task()
# Let's send a bot stopped speaking if we have to.
await self._bot_stopped_speaking()
async def _handle_audio(self, frame: OutputAudioRawFrame):
if not self._params.audio_out_enabled:
return
async def handle_audio_frame(self, frame: OutputAudioRawFrame):
if not self._params.audio_out_enabled:
return
# We might need to resample if incoming audio doesn't match the
# transport sample rate.
resampled = await self._resampler.resample(
frame.audio, frame.sample_rate, self._sample_rate
)
cls = type(frame)
self._audio_buffer.extend(resampled)
while len(self._audio_buffer) >= self._audio_chunk_size:
chunk = cls(
bytes(self._audio_buffer[: self._audio_chunk_size]),
sample_rate=self._sample_rate,
num_channels=frame.num_channels,
# We might need to resample if incoming audio doesn't match the
# transport sample rate.
resampled = await self._resampler.resample(
frame.audio, frame.sample_rate, self._sample_rate
)
await self._sink_queue.put(chunk)
self._audio_buffer = self._audio_buffer[self._audio_chunk_size :]
async def _handle_image(self, frame: OutputImageRawFrame | SpriteFrame):
if not self._params.video_out_enabled:
return
cls = type(frame)
self._audio_buffer.extend(resampled)
while len(self._audio_buffer) >= self._audio_chunk_size:
chunk = cls(
bytes(self._audio_buffer[: self._audio_chunk_size]),
sample_rate=self._sample_rate,
num_channels=frame.num_channels,
)
await self._audio_queue.put(chunk)
self._audio_buffer = self._audio_buffer[self._audio_chunk_size :]
if self._params.video_out_is_live:
await self._video_out_queue.put(frame)
else:
await self._sink_queue.put(frame)
async def handle_image_frame(self, frame: OutputImageRawFrame | SpriteFrame):
if not self._params.video_out_enabled:
return
async def _bot_started_speaking(self):
if not self._bot_speaking:
logger.debug("Bot started speaking")
await self.push_frame(BotStartedSpeakingFrame())
await self.push_frame(BotStartedSpeakingFrame(), FrameDirection.UPSTREAM)
self._bot_speaking = True
if self._params.video_out_is_live and isinstance(frame, OutputImageRawFrame):
await self._video_queue.put(frame)
elif isinstance(frame, OutputImageRawFrame):
await self._set_video_image(frame)
else:
await self._set_video_images(frame.images)
async def _bot_stopped_speaking(self):
if self._bot_speaking:
logger.debug("Bot stopped speaking")
await self.push_frame(BotStoppedSpeakingFrame())
await self.push_frame(BotStoppedSpeakingFrame(), FrameDirection.UPSTREAM)
self._bot_speaking = False
# Clean audio buffer (there could be tiny left overs if not multiple
# to our output chunk size).
self._audio_buffer = bytearray()
async def handle_timed_frame(self, frame: Frame):
await self._clock_queue.put((frame.pts, frame.id, frame))
#
# Sink tasks
#
async def handle_sync_frame(self, frame: Frame):
await self._audio_queue.put(frame)
def _create_sink_tasks(self):
if not self._sink_task:
self._sink_queue = asyncio.Queue()
self._sink_task = self.create_task(self._sink_task_handler())
if not self._sink_clock_task:
self._sink_clock_queue = asyncio.PriorityQueue()
self._sink_clock_task = self.create_task(self._sink_clock_task_handler())
async def handle_mixer_control_frame(self, frame: MixerControlFrame):
if self._mixer:
await self._mixer.process_frame(frame)
async def _cancel_sink_tasks(self):
# Stop sink tasks.
if self._sink_task:
await self.cancel_task(self._sink_task)
self._sink_task = None
# Stop sink clock tasks.
if self._sink_clock_task:
await self.cancel_task(self._sink_clock_task)
self._sink_clock_task = None
#
# Audio handling
#
async def _sink_frame_handler(self, frame: Frame):
if isinstance(frame, OutputImageRawFrame):
await self._set_video_image(frame)
elif isinstance(frame, SpriteFrame):
await self._set_video_images(frame.images)
elif isinstance(frame, TransportMessageFrame):
await self.send_message(frame)
def _create_audio_task(self):
if not self._audio_task and self._params.audio_out_enabled:
self._audio_queue = asyncio.Queue()
self._audio_task = self._transport.create_task(self._audio_task_handler())
async def _sink_clock_task_handler(self):
running = True
while running:
try:
timestamp, _, frame = await self._sink_clock_queue.get()
async def _cancel_audio_task(self):
if self._audio_task:
await self._transport.cancel_task(self._audio_task)
self._audio_task = None
async def _bot_started_speaking(self):
if not self._bot_speaking:
logger.debug(f"Bot [{self._destination}] started speaking")
downstream_frame = BotStartedSpeakingFrame()
downstream_frame.destination = self._destination
upstream_frame = BotStartedSpeakingFrame()
upstream_frame.destination = self._destination
await self._transport.push_frame(downstream_frame)
await self._transport.push_frame(upstream_frame, FrameDirection.UPSTREAM)
self._bot_speaking = True
async def _bot_stopped_speaking(self):
if self._bot_speaking:
logger.debug(f"Bot [{self._destination}] stopped speaking")
downstream_frame = BotStoppedSpeakingFrame()
downstream_frame.destination = self._destination
upstream_frame = BotStoppedSpeakingFrame()
upstream_frame.destination = self._destination
await self._transport.push_frame(downstream_frame)
await self._transport.push_frame(upstream_frame, FrameDirection.UPSTREAM)
self._bot_speaking = False
# Clean audio buffer (there could be tiny left overs if not multiple
# to our output chunk size).
self._audio_buffer = bytearray()
async def _handle_frame(self, frame: Frame):
if isinstance(frame, OutputImageRawFrame):
await self._set_video_image(frame)
elif isinstance(frame, SpriteFrame):
await self._set_video_images(frame.images)
elif isinstance(frame, TransportMessageFrame):
await self._transport.send_message(frame)
def _next_frame(self) -> AsyncGenerator[Frame, None]:
async def without_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]:
while True:
try:
frame = await asyncio.wait_for(
self._audio_queue.get(), timeout=vad_stop_secs
)
yield frame
except asyncio.TimeoutError:
# Notify the bot stopped speaking upstream if necessary.
await self._bot_stopped_speaking()
async def with_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]:
last_frame_time = 0
silence = b"\x00" * self._audio_chunk_size
while True:
try:
frame = self._audio_queue.get_nowait()
if isinstance(frame, OutputAudioRawFrame):
frame.audio = await self._mixer.mix(frame.audio)
last_frame_time = time.time()
yield frame
except asyncio.QueueEmpty:
# Notify the bot stopped speaking upstream if necessary.
diff_time = time.time() - last_frame_time
if diff_time > vad_stop_secs:
await self._bot_stopped_speaking()
# Generate an audio frame with only the mixer's part.
frame = OutputAudioRawFrame(
audio=await self._mixer.mix(silence),
sample_rate=self._sample_rate,
num_channels=self._params.audio_out_channels,
)
yield frame
if self._mixer:
return with_mixer(BOT_VAD_STOP_SECS)
else:
return without_mixer(BOT_VAD_STOP_SECS)
async def _audio_task_handler(self):
# Push a BotSpeakingFrame every 200ms, we don't really need to push it
# at every audio chunk. If the audio chunk is bigger than 200ms, push at
# every audio chunk.
TOTAL_CHUNK_MS = self._params.audio_out_10ms_chunks * 10
BOT_SPEAKING_CHUNK_PERIOD = max(int(200 / TOTAL_CHUNK_MS), 1)
bot_speaking_counter = 0
async for frame in self._next_frame():
# Notify the bot started speaking upstream if necessary and that
# it's actually speaking.
if isinstance(frame, TTSAudioRawFrame):
await self._bot_started_speaking()
if bot_speaking_counter % BOT_SPEAKING_CHUNK_PERIOD == 0:
await self._transport.push_frame(BotSpeakingFrame())
await self._transport.push_frame(
BotSpeakingFrame(), FrameDirection.UPSTREAM
)
bot_speaking_counter = 0
bot_speaking_counter += 1
# No need to push EndFrame, it's pushed from process_frame().
if isinstance(frame, EndFrame):
break
# Handle frame.
await self._handle_frame(frame)
# Also, push frame downstream in case anyone else needs it.
await self._transport.push_frame(frame)
# Send audio.
if isinstance(frame, OutputAudioRawFrame):
await self._transport.write_raw_audio_frames(frame.audio, self._destination)
#
# Video handling
#
def _create_video_task(self):
if not self._video_task and self._params.video_out_enabled:
self._video_queue = asyncio.Queue()
self._video_task = self._transport.create_task(self._video_task_handler())
async def _cancel_video_task(self):
# Stop video output task.
if self._video_task:
await self._transport.cancel_task(self._video_task)
self._video_task = None
async def _set_video_image(self, image: OutputImageRawFrame):
self._video_images = itertools.cycle([image])
async def _set_video_images(self, images: List[OutputImageRawFrame]):
self._video_images = itertools.cycle(images)
async def _video_task_handler(self):
self._video_start_time = None
self._video_frame_index = 0
self._video_frame_duration = 1 / self._params.video_out_framerate
self._video_frame_reset = self._video_frame_duration * 5
while True:
if self._params.video_out_is_live:
await self._video_is_live_handler()
elif self._video_images:
image = next(self._video_images)
await self._draw_image(image)
await asyncio.sleep(self._video_frame_duration)
else:
await asyncio.sleep(self._video_frame_duration)
async def _video_is_live_handler(self):
image = await self._video_queue.get()
# We get the start time as soon as we get the first image.
if not self._video_start_time:
self._video_start_time = time.time()
self._video_frame_index = 0
# Calculate how much time we need to wait before rendering next image.
real_elapsed_time = time.time() - self._video_start_time
real_render_time = self._video_frame_index * self._video_frame_duration
delay_time = self._video_frame_duration + real_render_time - real_elapsed_time
if abs(delay_time) > self._video_frame_reset:
self._video_start_time = time.time()
self._video_frame_index = 0
elif delay_time > 0:
await asyncio.sleep(delay_time)
self._video_frame_index += 1
# Render image
await self._draw_image(image)
self._video_queue.task_done()
async def _draw_image(self, frame: OutputImageRawFrame):
desired_size = (self._params.video_out_width, self._params.video_out_height)
# TODO: we should refactor in the future to support dynamic resolutions
# which is kind of what happens in P2P connections.
# We need to add support for that inside the DailyTransport
if frame.size != desired_size:
image = Image.frombytes(frame.format, frame.size, frame.image)
resized_image = image.resize(desired_size)
# logger.warning(f"{frame} does not have the expected size {desired_size}, resizing")
frame = OutputImageRawFrame(
resized_image.tobytes(), resized_image.size, resized_image.format
)
await self._transport.write_raw_video_frame(frame, self._destination)
#
# Clock handling
#
def _create_clock_task(self):
if not self._clock_task:
self._clock_queue = asyncio.PriorityQueue()
self._clock_task = self._transport.create_task(self._clock_task_handler())
async def _cancel_clock_task(self):
if self._clock_task:
await self._transport.cancel_task(self._clock_task)
self._clock_task = None
async def _clock_task_handler(self):
running = True
while running:
timestamp, _, frame = await self._clock_queue.get()
# If we hit an EndFrame, we can finish right away.
running = not isinstance(frame, EndFrame)
@@ -279,167 +581,12 @@ class BaseOutputTransport(FrameProcessor):
# has already passed we process it, otherwise we wait until it's
# time to process it.
if running:
current_time = self.get_clock().get_time()
current_time = self._transport.get_clock().get_time()
if timestamp > current_time:
wait_time = nanoseconds_to_seconds(timestamp - current_time)
await asyncio.sleep(wait_time)
# Handle frame.
await self._sink_frame_handler(frame)
# Push frame downstream.
await self._transport.push_frame(frame)
# Also, push frame downstream in case anyone else needs it.
await self.push_frame(frame)
self._sink_clock_queue.task_done()
except asyncio.CancelledError:
raise
except Exception as e:
logger.exception(f"{self} error processing sink clock queue: {e}")
def _next_frame(self) -> AsyncGenerator[Frame, None]:
async def without_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]:
while True:
try:
frame = await asyncio.wait_for(self._sink_queue.get(), timeout=vad_stop_secs)
yield frame
except asyncio.TimeoutError:
# Notify the bot stopped speaking upstream if necessary.
await self._bot_stopped_speaking()
async def with_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]:
last_frame_time = 0
silence = b"\x00" * self._audio_chunk_size
while True:
try:
frame = self._sink_queue.get_nowait()
if isinstance(frame, OutputAudioRawFrame):
frame.audio = await self._params.audio_out_mixer.mix(frame.audio)
last_frame_time = time.time()
yield frame
except asyncio.QueueEmpty:
# Notify the bot stopped speaking upstream if necessary.
diff_time = time.time() - last_frame_time
if diff_time > vad_stop_secs:
await self._bot_stopped_speaking()
# Generate an audio frame with only the mixer's part.
frame = OutputAudioRawFrame(
audio=await self._params.audio_out_mixer.mix(silence),
sample_rate=self._sample_rate,
num_channels=self._params.audio_out_channels,
)
yield frame
if self._params.audio_out_mixer:
return with_mixer(BOT_VAD_STOP_SECS)
else:
return without_mixer(BOT_VAD_STOP_SECS)
async def _sink_task_handler(self):
# Push a BotSpeakingFrame every 200ms, we don't really need to push it
# at every audio chunk. If the audio chunk is bigger than 200ms, push at
# every audio chunk.
TOTAL_CHUNK_MS = self._params.audio_out_10ms_chunks * 10
BOT_SPEAKING_CHUNK_PERIOD = max(int(200 / TOTAL_CHUNK_MS), 1)
bot_speaking_counter = 0
async for frame in self._next_frame():
# Notify the bot started speaking upstream if necessary and that
# it's actually speaking.
if isinstance(frame, TTSAudioRawFrame):
await self._bot_started_speaking()
if bot_speaking_counter % BOT_SPEAKING_CHUNK_PERIOD == 0:
await self.push_frame(BotSpeakingFrame())
await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM)
bot_speaking_counter = 0
bot_speaking_counter += 1
# No need to push EndFrame, it's pushed from process_frame().
if isinstance(frame, EndFrame):
break
# Handle frame.
await self._sink_frame_handler(frame)
# Also, push frame downstream in case anyone else needs it.
await self.push_frame(frame)
# Send audio.
if isinstance(frame, OutputAudioRawFrame):
await self.write_raw_audio_frames(frame.audio)
#
# Video task
#
def _create_video_task(self):
# Create video output queue and task if needed.
if not self._video_out_task and self._params.video_out_enabled:
self._video_out_queue = asyncio.Queue()
self._video_out_task = self.create_task(self._video_out_task_handler())
async def _cancel_video_task(self):
# Stop video output task.
if self._video_out_task and self._params.video_out_enabled:
await self.cancel_task(self._video_out_task)
self._video_out_task = None
async def _draw_image(self, frame: OutputImageRawFrame):
desired_size = (self._params.video_out_width, self._params.video_out_height)
# TODO: we should refactor in the future to support dynamic resolutions
# which is kind of what happens in P2P connections.
# We need to add support for that inside the DailyTransport
if frame.size != desired_size:
image = Image.frombytes(frame.format, frame.size, frame.image)
resized_image = image.resize(desired_size)
# logger.warning(f"{frame} does not have the expected size {desired_size}, resizing")
frame = OutputImageRawFrame(
resized_image.tobytes(), resized_image.size, resized_image.format
)
await self.write_raw_video_frame(frame)
async def _set_video_image(self, image: OutputImageRawFrame):
self._video_images = itertools.cycle([image])
async def _set_video_images(self, images: List[OutputImageRawFrame]):
self._video_images = itertools.cycle(images)
async def _video_out_task_handler(self):
self._video_out_start_time = None
self._video_out_frame_index = 0
self._video_out_frame_duration = 1 / self._params.video_out_framerate
self._video_out_frame_reset = self._video_out_frame_duration * 5
while True:
if self._params.video_out_is_live:
await self._video_out_is_live_handler()
elif self._video_images:
image = next(self._video_images)
await self._draw_image(image)
await asyncio.sleep(self._video_out_frame_duration)
else:
await asyncio.sleep(self._video_out_frame_duration)
async def _video_out_is_live_handler(self):
image = await self._video_out_queue.get()
# We get the start time as soon as we get the first image.
if not self._video_out_start_time:
self._video_out_start_time = time.time()
self._video_out_frame_index = 0
# Calculate how much time we need to wait before rendering next image.
real_elapsed_time = time.time() - self._video_out_start_time
real_render_time = self._video_out_frame_index * self._video_out_frame_duration
delay_time = self._video_out_frame_duration + real_render_time - real_elapsed_time
if abs(delay_time) > self._video_out_frame_reset:
self._video_out_start_time = time.time()
self._video_out_frame_index = 0
elif delay_time > 0:
await asyncio.sleep(delay_time)
self._video_out_frame_index += 1
# Render image
await self._draw_image(image)
self._video_out_queue.task_done()
self._clock_queue.task_done()

View File

@@ -5,7 +5,7 @@
#
from abc import abstractmethod
from typing import Optional
from typing import List, Mapping, Optional
from pydantic import BaseModel, ConfigDict
@@ -33,7 +33,8 @@ class TransportParams(BaseModel):
audio_out_channels: int = 1
audio_out_bitrate: int = 96000
audio_out_10ms_chunks: int = 4
audio_out_mixer: Optional[BaseAudioMixer] = None
audio_out_mixer: Optional[BaseAudioMixer | Mapping[Optional[str], BaseAudioMixer]] = None
audio_out_destinations: List[str] = []
audio_in_enabled: bool = False
audio_in_sample_rate: Optional[int] = None
audio_in_channels: int = 1
@@ -48,6 +49,7 @@ class TransportParams(BaseModel):
video_out_bitrate: int = 800000
video_out_framerate: int = 30
video_out_color_format: str = "RGB"
video_out_destinations: List[str] = []
vad_enabled: bool = False
vad_audio_passthrough: bool = False
vad_analyzer: Optional[VADAnalyzer] = None

View File

@@ -118,7 +118,7 @@ class LocalAudioOutputTransport(BaseOutputTransport):
self._out_stream.close()
self._out_stream = None
async def write_raw_audio_frames(self, frames: bytes):
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
if self._out_stream:
await self.get_event_loop().run_in_executor(
self._executor, self._out_stream.write, frames

View File

@@ -131,13 +131,15 @@ class TkOutputTransport(BaseOutputTransport):
self._out_stream.close()
self._out_stream = None
async def write_raw_audio_frames(self, frames: bytes):
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
if self._out_stream:
await self.get_event_loop().run_in_executor(
self._executor, self._out_stream.write, frames
)
async def write_raw_video_frame(self, frame: OutputImageRawFrame):
async def write_raw_video_frame(
self, frame: OutputImageRawFrame, destination: Optional[str] = None
):
self.get_event_loop().call_soon(self._write_frame_to_tk, frame)
def _write_frame_to_tk(self, frame: OutputImageRawFrame):

View File

@@ -203,7 +203,7 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport):
await super().start(frame)
await self._client.setup(frame)
await self._params.serializer.setup(frame)
self._send_interval = (self._audio_chunk_size / self.sample_rate) / 2
self._send_interval = (self.audio_chunk_size / self.sample_rate) / 2
async def stop(self, frame: EndFrame):
await super().stop(frame)
@@ -229,7 +229,7 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport):
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
await self._write_frame(frame)
async def write_raw_audio_frames(self, frames: bytes):
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
if self._client.is_closing:
return

View File

@@ -284,11 +284,13 @@ class SmallWebRTCClient:
)
yield audio_frame
async def write_raw_audio_frames(self, data: bytes):
async def write_raw_audio_frames(self, data: bytes, destination: Optional[str] = None):
if self._can_send() and self._audio_output_track:
await self._audio_output_track.add_audio_bytes(data)
async def write_raw_video_frame(self, frame: OutputImageRawFrame):
async def write_raw_video_frame(
self, frame: OutputImageRawFrame, destination: Optional[str] = None
):
if self._can_send() and self._video_output_track:
self._video_output_track.add_video_frame(frame)
@@ -497,10 +499,12 @@ class SmallWebRTCOutputTransport(BaseOutputTransport):
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
await self._client.send_message(frame)
async def write_raw_audio_frames(self, frames: bytes):
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
await self._client.write_raw_audio_frames(frames)
async def write_raw_video_frame(self, frame: OutputImageRawFrame):
async def write_raw_video_frame(
self, frame: OutputImageRawFrame, destination: Optional[str] = None
):
await self._client.write_raw_video_frame(frame)

View File

@@ -182,7 +182,7 @@ class WebsocketClientOutputTransport(BaseOutputTransport):
async def start(self, frame: StartFrame):
await super().start(frame)
self._send_interval = (self._audio_chunk_size / self.sample_rate) / 2
self._send_interval = (self.audio_chunk_size / self.sample_rate) / 2
await self._params.serializer.setup(frame)
await self._session.setup(frame)
await self._session.connect()
@@ -202,7 +202,7 @@ class WebsocketClientOutputTransport(BaseOutputTransport):
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
await self._write_frame(frame)
async def write_raw_audio_frames(self, frames: bytes):
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
frame = OutputAudioRawFrame(
audio=frames,
sample_rate=self.sample_rate,

View File

@@ -194,7 +194,7 @@ class WebsocketServerOutputTransport(BaseOutputTransport):
async def start(self, frame: StartFrame):
await super().start(frame)
await self._params.serializer.setup(frame)
self._send_interval = (self._audio_chunk_size / self.sample_rate) / 2
self._send_interval = (self.audio_chunk_size / self.sample_rate) / 2
async def stop(self, frame: EndFrame):
await super().stop(frame)
@@ -218,7 +218,7 @@ class WebsocketServerOutputTransport(BaseOutputTransport):
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
await self._write_frame(frame)
async def write_raw_audio_frames(self, frames: bytes):
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
if not self._websocket:
# Simulate audio playback with a sleep.
await self._write_audio_sleep()

View File

@@ -369,7 +369,7 @@ class DailyTransportClient(EventHandler):
await asyncio.sleep(0.01)
return None
async def write_raw_audio_frames(self, frames: bytes):
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
if not self._mic:
return None
@@ -377,7 +377,9 @@ class DailyTransportClient(EventHandler):
self._mic.write_frames(frames, completion=completion_callback(future))
await future
async def write_raw_video_frame(self, frame: OutputImageRawFrame):
async def write_raw_video_frame(
self, frame: OutputImageRawFrame, destination: Optional[str] = None
):
if not self._camera:
return None
@@ -1076,10 +1078,12 @@ class DailyOutputTransport(BaseOutputTransport):
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
await self._client.send_message(frame)
async def write_raw_audio_frames(self, frames: bytes):
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
await self._client.write_raw_audio_frames(frames)
async def write_raw_video_frame(self, frame: OutputImageRawFrame):
async def write_raw_video_frame(
self, frame: OutputImageRawFrame, destination: Optional[str] = None
):
await self._client.write_raw_video_frame(frame)

View File

@@ -462,7 +462,7 @@ class LiveKitOutputTransport(BaseOutputTransport):
else:
await self._client.send_data(frame.message.encode())
async def write_raw_audio_frames(self, frames: bytes):
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
livekit_audio = self._convert_pipecat_audio_to_livekit(frames)
await self._client.publish_audio(livekit_audio)