diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index f7c805e83..b08506818 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -152,7 +152,7 @@ class OutputAudioRawFrame(DataFrame, AudioRawFrame): def __str__(self): pts = format_pts(self.pts) - return f"{self.name}(pts: {pts}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})" + return f"{self.name}(pts: {pts}, destination: {self.destination}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})" @dataclass diff --git a/src/pipecat/services/tts_service.py b/src/pipecat/services/tts_service.py index a2b8e3f90..1ae7eb079 100644 --- a/src/pipecat/services/tts_service.py +++ b/src/pipecat/services/tts_service.py @@ -66,6 +66,8 @@ class TTSService(AIService): # Text filter executed after text has been aggregated. text_filters: Sequence[BaseTextFilter] = [], text_filter: Optional[BaseTextFilter] = None, + # Audio destination of the generated frames. + destination: Optional[str] = None, **kwargs, ): super().__init__(**kwargs) @@ -82,6 +84,8 @@ class TTSService(AIService): self._settings: Dict[str, Any] = {} self._text_aggregator: BaseTextAggregator = text_aggregator or SimpleTextAggregator() self._text_filters: Sequence[BaseTextFilter] = text_filters + self._destination: Optional[str] = destination + if text_filter: import warnings @@ -207,13 +211,16 @@ class TTSService(AIService): async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM): if self._push_silence_after_stop and isinstance(frame, TTSStoppedFrame): silence_num_bytes = int(self._silence_time_s * self.sample_rate * 2) # 16-bit - await self.push_frame( - TTSAudioRawFrame( - audio=b"\x00" * silence_num_bytes, - sample_rate=self.sample_rate, - num_channels=1, - ) + silence_frame = TTSAudioRawFrame( + audio=b"\x00" * silence_num_bytes, + sample_rate=self.sample_rate, + num_channels=1, ) + silence_frame.destination = self._destination + await self.push_frame(silence_frame) + + if isinstance(frame, TTSAudioRawFrame): + frame.destination = self._destination await super().push_frame(frame, direction) diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index de53fac60..a4cd4af1b 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -8,11 +8,12 @@ import asyncio import itertools import sys import time -from typing import AsyncGenerator, List +from typing import Any, AsyncGenerator, Dict, List, Mapping, Optional from loguru import logger from PIL import Image +from pipecat.audio.mixers.base_audio_mixer import BaseAudioMixer from pipecat.audio.utils import create_default_resampler from pipecat.frames.frames import ( BotSpeakingFrame, @@ -46,80 +47,71 @@ class BaseOutputTransport(FrameProcessor): self._params = params - # Task to process incoming frames so we don't block upstream elements. - self._sink_task = None - - # Task to process incoming frames using a clock. - self._sink_clock_task = None - - # Task to write/send audio and image frames. - self._video_out_task = None - - # These are the images that we should send at our desired framerate. - self._video_images = None - - # Output sample rate. It will be initialized on StartFrame. - self._sample_rate = 0 - self._resampler = create_default_resampler() - - # Chunk size that will be written. It will be computed on StartFrame - self._audio_chunk_size = 0 - self._audio_buffer = bytearray() - - self._stopped_event = asyncio.Event() - - # Indicates if the bot is currently speaking. - self._bot_speaking = False + # We will have one media sender per output frame destination. This allow + # us to send multiple streams at the same time if the transport allows + # it. + self._media_senders: Dict[Any, "BaseOutputTransport.MediaSender"] = {} @property def sample_rate(self) -> int: - return self._sample_rate + sender = self._media_senders.get(None, None) + return sender.sample_rate if sender else 0 + + @property + def audio_chunk_size(self) -> int: + sender = self._media_senders.get(None, None) + return sender.audio_chunk_size if sender else 0 async def start(self, frame: StartFrame): - self._sample_rate = self._params.audio_out_sample_rate or frame.audio_out_sample_rate + # Register destinations. + for destination in self._params.audio_out_destinations: + await self.register_audio_destination(destination) - # We will write 10ms*CHUNKS of audio at a time (where CHUNKS is the - # `audio_out_10ms_chunks` parameter). If we receive long audio frames we - # will chunk them. This will help with interruption handling. - audio_bytes_10ms = int(self._sample_rate / 100) * self._params.audio_out_channels * 2 - self._audio_chunk_size = audio_bytes_10ms * self._params.audio_out_10ms_chunks + for destination in self._params.video_out_destinations: + await self.register_video_destination(destination) - # Start audio mixer. - if self._params.audio_out_mixer: - await self._params.audio_out_mixer.start(self._sample_rate) - self._create_video_task() - self._create_sink_tasks() + # Start default media sender. + self._media_senders[None] = BaseOutputTransport.MediaSender( + self, destination=None, sample_rate=self.sample_rate, params=self._params + ) + await self._media_senders[None].start(frame) + + # Media senders already send both audio and video, so make sure we only + # have one media server per shared name. + destinations = list( + set(self._params.audio_out_destinations + self._params.video_out_destinations) + ) + + # Start media senders. + for destination in destinations: + self._media_senders[destination] = BaseOutputTransport.MediaSender( + self, destination=destination, sample_rate=self.sample_rate, params=self._params + ) + await self._media_senders[destination].start(frame) async def stop(self, frame: EndFrame): - # Let the sink tasks process the queue until they reach this EndFrame. - await self._sink_clock_queue.put((sys.maxsize, frame.id, frame)) - await self._sink_queue.put(frame) - - # At this point we have enqueued an EndFrame and we need to wait for - # that EndFrame to be processed by the sink tasks. We also need to wait - # for these tasks before cancelling the video and audio tasks below - # because they might be still rendering. - if self._sink_task: - await self.wait_for_task(self._sink_task) - if self._sink_clock_task: - await self.wait_for_task(self._sink_clock_task) - - # We can now cancel the video task. - await self._cancel_video_task() + for _, sender in self._media_senders.items(): + await sender.stop(frame) async def cancel(self, frame: CancelFrame): - # Since we are cancelling everything it doesn't matter if we cancel sink - # tasks first or not. - await self._cancel_sink_tasks() - await self._cancel_video_task() + for _, sender in self._media_senders.items(): + await sender.cancel(frame) async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame): pass - async def write_raw_video_frame(self, frame: OutputImageRawFrame): + async def register_video_destination(self, destination: str): pass - async def write_raw_audio_frames(self, frames: bytes): + async def register_audio_destination(self, destination: str): + pass + + async def write_raw_video_frame( + self, frame: OutputImageRawFrame, destination: Optional[str] = None + ): + pass + + async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None): pass async def send_audio(self, frame: OutputAudioRawFrame): @@ -150,7 +142,7 @@ class BaseOutputTransport(FrameProcessor): await self.push_frame(frame, direction) elif isinstance(frame, (StartInterruptionFrame, StopInterruptionFrame)): await self.push_frame(frame, direction) - await self._handle_interruptions(frame) + await self._handle_frame(frame) elif isinstance(frame, TransportMessageUrgentFrame): await self.send_message(frame) elif isinstance(frame, SystemFrame): @@ -160,117 +152,427 @@ class BaseOutputTransport(FrameProcessor): await self.stop(frame) # Keep pushing EndFrame down so all the pipeline stops nicely. await self.push_frame(frame, direction) - elif isinstance(frame, MixerControlFrame) and self._params.audio_out_mixer: - await self._params.audio_out_mixer.process_frame(frame) + elif isinstance(frame, MixerControlFrame): + await self._handle_frame(frame) # Other frames. elif isinstance(frame, OutputAudioRawFrame): - await self._handle_audio(frame) + await self._handle_frame(frame) elif isinstance(frame, (OutputImageRawFrame, SpriteFrame)): - await self._handle_image(frame) + await self._handle_frame(frame) # TODO(aleix): Images and audio should support presentation timestamps. elif frame.pts: - await self._sink_clock_queue.put((frame.pts, frame.id, frame)) + await self._handle_frame(frame) elif direction == FrameDirection.UPSTREAM: await self.push_frame(frame, direction) else: - await self._sink_queue.put(frame) + await self._handle_frame(frame) - async def _handle_interruptions(self, frame: Frame): - if not self.interruptions_allowed: + async def _handle_frame(self, frame: Frame): + if frame.destination not in self._media_senders: + logger.warning( + f"{self} destination [{frame.destination}] not registered for frame {frame}" + ) return + sender = self._media_senders[frame.destination] + if isinstance(frame, StartInterruptionFrame): - # Cancel sink and video tasks. - await self._cancel_sink_tasks() - await self._cancel_video_task() - # Create sink and video tasks. + await sender.handle_interruptions(frame) + elif isinstance(frame, OutputAudioRawFrame): + await sender.handle_audio_frame(frame) + elif isinstance(frame, (OutputImageRawFrame, SpriteFrame)): + await sender.handle_image_frame(frame) + elif isinstance(frame, MixerControlFrame): + await sender.handle_mixer_control_frame(frame) + elif frame.pts: + await sender.handle_timed_frame(frame) + else: + await sender.handle_sync_frame(frame) + + # + # Media Sender + # + + class MediaSender: + def __init__( + self, + transport: "BaseOutputTransport", + *, + destination: Optional[str], + sample_rate: int, + params: TransportParams, + ): + self._transport = transport + self._destination = destination + self._sample_rate = sample_rate + self._params = params + + # Output sample rate. It will be initialized on StartFrame. + self._sample_rate = 0 + + # We write 10ms*CHUNKS of audio at a time (where CHUNKS is the + # `audio_out_10ms_chunks` parameter). If we receive long audio + # frames we will chunk them. This helps with interruption handling. + self._audio_chunk_size = 0 + self._audio_buffer = bytearray() + + # This will be used to resample incoming audio to the output sample rate. + self._resampler = create_default_resampler() + + # The user can provide a single mixer, to be used by the default + # destination, or a destination/mixer mapping. + self._mixer: Optional[BaseAudioMixer] = None + + # These are the images that we should send at our desired framerate. + self._video_images = None + + # Indicates if the bot is currently speaking. + self._bot_speaking = False + + self._audio_task: Optional[asyncio.Task] = None + self._video_task: Optional[asyncio.Task] = None + self._clock_task: Optional[asyncio.Task] = None + + @property + def sample_rate(self) -> int: + return self._sample_rate + + @property + def audio_chunk_size(self) -> int: + return self._audio_chunk_size + + async def start(self, frame: StartFrame): + self._sample_rate = self._params.audio_out_sample_rate or frame.audio_out_sample_rate + + # We will write 10ms*CHUNKS of audio at a time (where CHUNKS is the + # `audio_out_10ms_chunks` parameter). If we receive long audio frames we + # will chunk them. This will help with interruption handling. + audio_bytes_10ms = int(self._sample_rate / 100) * self._params.audio_out_channels * 2 + self._audio_chunk_size = audio_bytes_10ms * self._params.audio_out_10ms_chunks + self._audio_buffer = bytearray() + + # Create all tasks. self._create_video_task() - self._create_sink_tasks() + self._create_clock_task() + self._create_audio_task() + + # Check if we have an audio mixer for our destination. + if self._params.audio_out_mixer: + if isinstance(self._params.audio_out_mixer, Mapping): + self._mixer = self._params.audio_out_mixer.get(self._destination, None) + elif not self._destination: + # Only use the default mixer if we are the default destination. + self._mixer = self._params.audio_out_mixer + + # Start audio mixer. + if self._mixer: + await self._mixer.start(self._sample_rate) + + async def stop(self, frame: EndFrame): + # Let the sink tasks process the queue until they reach this EndFrame. + await self._clock_queue.put((sys.maxsize, frame.id, frame)) + await self._audio_queue.put(frame) + + # At this point we have enqueued an EndFrame and we need to wait for + # that EndFrame to be processed by the audio and clock tasks. We + # also need to wait for these tasks before cancelling the video task + # because it might be still rendering. + if self._audio_task: + await self._transport.wait_for_task(self._audio_task) + if self._clock_task: + await self._transport.wait_for_task(self._clock_task) + + # Stop audio mixer. + if self._mixer: + await self._mixer.stop() + + # We can now cancel the video task. + await self._cancel_video_task() + + async def cancel(self, frame: CancelFrame): + # Since we are cancelling everything it doesn't matter what task we cancel first. + await self._cancel_audio_task() + await self._cancel_clock_task() + await self._cancel_video_task() + + async def handle_interruptions(self, _: StartInterruptionFrame): + if not self._transport.interruptions_allowed: + return + + # Cancel tasks. + await self._cancel_audio_task() + await self._cancel_clock_task() + await self._cancel_video_task() + # Create tasks. + self._create_video_task() + self._create_clock_task() + self._create_audio_task() # Let's send a bot stopped speaking if we have to. await self._bot_stopped_speaking() - async def _handle_audio(self, frame: OutputAudioRawFrame): - if not self._params.audio_out_enabled: - return + async def handle_audio_frame(self, frame: OutputAudioRawFrame): + if not self._params.audio_out_enabled: + return - # We might need to resample if incoming audio doesn't match the - # transport sample rate. - resampled = await self._resampler.resample( - frame.audio, frame.sample_rate, self._sample_rate - ) - - cls = type(frame) - self._audio_buffer.extend(resampled) - while len(self._audio_buffer) >= self._audio_chunk_size: - chunk = cls( - bytes(self._audio_buffer[: self._audio_chunk_size]), - sample_rate=self._sample_rate, - num_channels=frame.num_channels, + # We might need to resample if incoming audio doesn't match the + # transport sample rate. + resampled = await self._resampler.resample( + frame.audio, frame.sample_rate, self._sample_rate ) - await self._sink_queue.put(chunk) - self._audio_buffer = self._audio_buffer[self._audio_chunk_size :] - async def _handle_image(self, frame: OutputImageRawFrame | SpriteFrame): - if not self._params.video_out_enabled: - return + cls = type(frame) + self._audio_buffer.extend(resampled) + while len(self._audio_buffer) >= self._audio_chunk_size: + chunk = cls( + bytes(self._audio_buffer[: self._audio_chunk_size]), + sample_rate=self._sample_rate, + num_channels=frame.num_channels, + ) + await self._audio_queue.put(chunk) + self._audio_buffer = self._audio_buffer[self._audio_chunk_size :] - if self._params.video_out_is_live: - await self._video_out_queue.put(frame) - else: - await self._sink_queue.put(frame) + async def handle_image_frame(self, frame: OutputImageRawFrame | SpriteFrame): + if not self._params.video_out_enabled: + return - async def _bot_started_speaking(self): - if not self._bot_speaking: - logger.debug("Bot started speaking") - await self.push_frame(BotStartedSpeakingFrame()) - await self.push_frame(BotStartedSpeakingFrame(), FrameDirection.UPSTREAM) - self._bot_speaking = True + if self._params.video_out_is_live and isinstance(frame, OutputImageRawFrame): + await self._video_queue.put(frame) + elif isinstance(frame, OutputImageRawFrame): + await self._set_video_image(frame) + else: + await self._set_video_images(frame.images) - async def _bot_stopped_speaking(self): - if self._bot_speaking: - logger.debug("Bot stopped speaking") - await self.push_frame(BotStoppedSpeakingFrame()) - await self.push_frame(BotStoppedSpeakingFrame(), FrameDirection.UPSTREAM) - self._bot_speaking = False - # Clean audio buffer (there could be tiny left overs if not multiple - # to our output chunk size). - self._audio_buffer = bytearray() + async def handle_timed_frame(self, frame: Frame): + await self._clock_queue.put((frame.pts, frame.id, frame)) - # - # Sink tasks - # + async def handle_sync_frame(self, frame: Frame): + await self._audio_queue.put(frame) - def _create_sink_tasks(self): - if not self._sink_task: - self._sink_queue = asyncio.Queue() - self._sink_task = self.create_task(self._sink_task_handler()) - if not self._sink_clock_task: - self._sink_clock_queue = asyncio.PriorityQueue() - self._sink_clock_task = self.create_task(self._sink_clock_task_handler()) + async def handle_mixer_control_frame(self, frame: MixerControlFrame): + if self._mixer: + await self._mixer.process_frame(frame) - async def _cancel_sink_tasks(self): - # Stop sink tasks. - if self._sink_task: - await self.cancel_task(self._sink_task) - self._sink_task = None - # Stop sink clock tasks. - if self._sink_clock_task: - await self.cancel_task(self._sink_clock_task) - self._sink_clock_task = None + # + # Audio handling + # - async def _sink_frame_handler(self, frame: Frame): - if isinstance(frame, OutputImageRawFrame): - await self._set_video_image(frame) - elif isinstance(frame, SpriteFrame): - await self._set_video_images(frame.images) - elif isinstance(frame, TransportMessageFrame): - await self.send_message(frame) + def _create_audio_task(self): + if not self._audio_task and self._params.audio_out_enabled: + self._audio_queue = asyncio.Queue() + self._audio_task = self._transport.create_task(self._audio_task_handler()) - async def _sink_clock_task_handler(self): - running = True - while running: - try: - timestamp, _, frame = await self._sink_clock_queue.get() + async def _cancel_audio_task(self): + if self._audio_task: + await self._transport.cancel_task(self._audio_task) + self._audio_task = None + + async def _bot_started_speaking(self): + if not self._bot_speaking: + logger.debug(f"Bot [{self._destination}] started speaking") + + downstream_frame = BotStartedSpeakingFrame() + downstream_frame.destination = self._destination + upstream_frame = BotStartedSpeakingFrame() + upstream_frame.destination = self._destination + await self._transport.push_frame(downstream_frame) + await self._transport.push_frame(upstream_frame, FrameDirection.UPSTREAM) + + self._bot_speaking = True + + async def _bot_stopped_speaking(self): + if self._bot_speaking: + logger.debug(f"Bot [{self._destination}] stopped speaking") + + downstream_frame = BotStoppedSpeakingFrame() + downstream_frame.destination = self._destination + upstream_frame = BotStoppedSpeakingFrame() + upstream_frame.destination = self._destination + await self._transport.push_frame(downstream_frame) + await self._transport.push_frame(upstream_frame, FrameDirection.UPSTREAM) + + self._bot_speaking = False + + # Clean audio buffer (there could be tiny left overs if not multiple + # to our output chunk size). + self._audio_buffer = bytearray() + + async def _handle_frame(self, frame: Frame): + if isinstance(frame, OutputImageRawFrame): + await self._set_video_image(frame) + elif isinstance(frame, SpriteFrame): + await self._set_video_images(frame.images) + elif isinstance(frame, TransportMessageFrame): + await self._transport.send_message(frame) + + def _next_frame(self) -> AsyncGenerator[Frame, None]: + async def without_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]: + while True: + try: + frame = await asyncio.wait_for( + self._audio_queue.get(), timeout=vad_stop_secs + ) + yield frame + except asyncio.TimeoutError: + # Notify the bot stopped speaking upstream if necessary. + await self._bot_stopped_speaking() + + async def with_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]: + last_frame_time = 0 + silence = b"\x00" * self._audio_chunk_size + while True: + try: + frame = self._audio_queue.get_nowait() + if isinstance(frame, OutputAudioRawFrame): + frame.audio = await self._mixer.mix(frame.audio) + last_frame_time = time.time() + yield frame + except asyncio.QueueEmpty: + # Notify the bot stopped speaking upstream if necessary. + diff_time = time.time() - last_frame_time + if diff_time > vad_stop_secs: + await self._bot_stopped_speaking() + # Generate an audio frame with only the mixer's part. + frame = OutputAudioRawFrame( + audio=await self._mixer.mix(silence), + sample_rate=self._sample_rate, + num_channels=self._params.audio_out_channels, + ) + yield frame + + if self._mixer: + return with_mixer(BOT_VAD_STOP_SECS) + else: + return without_mixer(BOT_VAD_STOP_SECS) + + async def _audio_task_handler(self): + # Push a BotSpeakingFrame every 200ms, we don't really need to push it + # at every audio chunk. If the audio chunk is bigger than 200ms, push at + # every audio chunk. + TOTAL_CHUNK_MS = self._params.audio_out_10ms_chunks * 10 + BOT_SPEAKING_CHUNK_PERIOD = max(int(200 / TOTAL_CHUNK_MS), 1) + bot_speaking_counter = 0 + async for frame in self._next_frame(): + # Notify the bot started speaking upstream if necessary and that + # it's actually speaking. + if isinstance(frame, TTSAudioRawFrame): + await self._bot_started_speaking() + if bot_speaking_counter % BOT_SPEAKING_CHUNK_PERIOD == 0: + await self._transport.push_frame(BotSpeakingFrame()) + await self._transport.push_frame( + BotSpeakingFrame(), FrameDirection.UPSTREAM + ) + bot_speaking_counter = 0 + bot_speaking_counter += 1 + + # No need to push EndFrame, it's pushed from process_frame(). + if isinstance(frame, EndFrame): + break + + # Handle frame. + await self._handle_frame(frame) + + # Also, push frame downstream in case anyone else needs it. + await self._transport.push_frame(frame) + + # Send audio. + if isinstance(frame, OutputAudioRawFrame): + await self._transport.write_raw_audio_frames(frame.audio, self._destination) + + # + # Video handling + # + + def _create_video_task(self): + if not self._video_task and self._params.video_out_enabled: + self._video_queue = asyncio.Queue() + self._video_task = self._transport.create_task(self._video_task_handler()) + + async def _cancel_video_task(self): + # Stop video output task. + if self._video_task: + await self._transport.cancel_task(self._video_task) + self._video_task = None + + async def _set_video_image(self, image: OutputImageRawFrame): + self._video_images = itertools.cycle([image]) + + async def _set_video_images(self, images: List[OutputImageRawFrame]): + self._video_images = itertools.cycle(images) + + async def _video_task_handler(self): + self._video_start_time = None + self._video_frame_index = 0 + self._video_frame_duration = 1 / self._params.video_out_framerate + self._video_frame_reset = self._video_frame_duration * 5 + while True: + if self._params.video_out_is_live: + await self._video_is_live_handler() + elif self._video_images: + image = next(self._video_images) + await self._draw_image(image) + await asyncio.sleep(self._video_frame_duration) + else: + await asyncio.sleep(self._video_frame_duration) + + async def _video_is_live_handler(self): + image = await self._video_queue.get() + + # We get the start time as soon as we get the first image. + if not self._video_start_time: + self._video_start_time = time.time() + self._video_frame_index = 0 + + # Calculate how much time we need to wait before rendering next image. + real_elapsed_time = time.time() - self._video_start_time + real_render_time = self._video_frame_index * self._video_frame_duration + delay_time = self._video_frame_duration + real_render_time - real_elapsed_time + + if abs(delay_time) > self._video_frame_reset: + self._video_start_time = time.time() + self._video_frame_index = 0 + elif delay_time > 0: + await asyncio.sleep(delay_time) + self._video_frame_index += 1 + + # Render image + await self._draw_image(image) + + self._video_queue.task_done() + + async def _draw_image(self, frame: OutputImageRawFrame): + desired_size = (self._params.video_out_width, self._params.video_out_height) + + # TODO: we should refactor in the future to support dynamic resolutions + # which is kind of what happens in P2P connections. + # We need to add support for that inside the DailyTransport + if frame.size != desired_size: + image = Image.frombytes(frame.format, frame.size, frame.image) + resized_image = image.resize(desired_size) + # logger.warning(f"{frame} does not have the expected size {desired_size}, resizing") + frame = OutputImageRawFrame( + resized_image.tobytes(), resized_image.size, resized_image.format + ) + + await self._transport.write_raw_video_frame(frame, self._destination) + + # + # Clock handling + # + + def _create_clock_task(self): + if not self._clock_task: + self._clock_queue = asyncio.PriorityQueue() + self._clock_task = self._transport.create_task(self._clock_task_handler()) + + async def _cancel_clock_task(self): + if self._clock_task: + await self._transport.cancel_task(self._clock_task) + self._clock_task = None + + async def _clock_task_handler(self): + running = True + while running: + timestamp, _, frame = await self._clock_queue.get() # If we hit an EndFrame, we can finish right away. running = not isinstance(frame, EndFrame) @@ -279,167 +581,12 @@ class BaseOutputTransport(FrameProcessor): # has already passed we process it, otherwise we wait until it's # time to process it. if running: - current_time = self.get_clock().get_time() + current_time = self._transport.get_clock().get_time() if timestamp > current_time: wait_time = nanoseconds_to_seconds(timestamp - current_time) await asyncio.sleep(wait_time) - # Handle frame. - await self._sink_frame_handler(frame) + # Push frame downstream. + await self._transport.push_frame(frame) - # Also, push frame downstream in case anyone else needs it. - await self.push_frame(frame) - - self._sink_clock_queue.task_done() - except asyncio.CancelledError: - raise - except Exception as e: - logger.exception(f"{self} error processing sink clock queue: {e}") - - def _next_frame(self) -> AsyncGenerator[Frame, None]: - async def without_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]: - while True: - try: - frame = await asyncio.wait_for(self._sink_queue.get(), timeout=vad_stop_secs) - yield frame - except asyncio.TimeoutError: - # Notify the bot stopped speaking upstream if necessary. - await self._bot_stopped_speaking() - - async def with_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]: - last_frame_time = 0 - silence = b"\x00" * self._audio_chunk_size - while True: - try: - frame = self._sink_queue.get_nowait() - if isinstance(frame, OutputAudioRawFrame): - frame.audio = await self._params.audio_out_mixer.mix(frame.audio) - last_frame_time = time.time() - yield frame - except asyncio.QueueEmpty: - # Notify the bot stopped speaking upstream if necessary. - diff_time = time.time() - last_frame_time - if diff_time > vad_stop_secs: - await self._bot_stopped_speaking() - # Generate an audio frame with only the mixer's part. - frame = OutputAudioRawFrame( - audio=await self._params.audio_out_mixer.mix(silence), - sample_rate=self._sample_rate, - num_channels=self._params.audio_out_channels, - ) - yield frame - - if self._params.audio_out_mixer: - return with_mixer(BOT_VAD_STOP_SECS) - else: - return without_mixer(BOT_VAD_STOP_SECS) - - async def _sink_task_handler(self): - # Push a BotSpeakingFrame every 200ms, we don't really need to push it - # at every audio chunk. If the audio chunk is bigger than 200ms, push at - # every audio chunk. - TOTAL_CHUNK_MS = self._params.audio_out_10ms_chunks * 10 - BOT_SPEAKING_CHUNK_PERIOD = max(int(200 / TOTAL_CHUNK_MS), 1) - bot_speaking_counter = 0 - async for frame in self._next_frame(): - # Notify the bot started speaking upstream if necessary and that - # it's actually speaking. - if isinstance(frame, TTSAudioRawFrame): - await self._bot_started_speaking() - if bot_speaking_counter % BOT_SPEAKING_CHUNK_PERIOD == 0: - await self.push_frame(BotSpeakingFrame()) - await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM) - bot_speaking_counter = 0 - bot_speaking_counter += 1 - - # No need to push EndFrame, it's pushed from process_frame(). - if isinstance(frame, EndFrame): - break - - # Handle frame. - await self._sink_frame_handler(frame) - - # Also, push frame downstream in case anyone else needs it. - await self.push_frame(frame) - - # Send audio. - if isinstance(frame, OutputAudioRawFrame): - await self.write_raw_audio_frames(frame.audio) - - # - # Video task - # - - def _create_video_task(self): - # Create video output queue and task if needed. - if not self._video_out_task and self._params.video_out_enabled: - self._video_out_queue = asyncio.Queue() - self._video_out_task = self.create_task(self._video_out_task_handler()) - - async def _cancel_video_task(self): - # Stop video output task. - if self._video_out_task and self._params.video_out_enabled: - await self.cancel_task(self._video_out_task) - self._video_out_task = None - - async def _draw_image(self, frame: OutputImageRawFrame): - desired_size = (self._params.video_out_width, self._params.video_out_height) - - # TODO: we should refactor in the future to support dynamic resolutions - # which is kind of what happens in P2P connections. - # We need to add support for that inside the DailyTransport - if frame.size != desired_size: - image = Image.frombytes(frame.format, frame.size, frame.image) - resized_image = image.resize(desired_size) - # logger.warning(f"{frame} does not have the expected size {desired_size}, resizing") - frame = OutputImageRawFrame( - resized_image.tobytes(), resized_image.size, resized_image.format - ) - - await self.write_raw_video_frame(frame) - - async def _set_video_image(self, image: OutputImageRawFrame): - self._video_images = itertools.cycle([image]) - - async def _set_video_images(self, images: List[OutputImageRawFrame]): - self._video_images = itertools.cycle(images) - - async def _video_out_task_handler(self): - self._video_out_start_time = None - self._video_out_frame_index = 0 - self._video_out_frame_duration = 1 / self._params.video_out_framerate - self._video_out_frame_reset = self._video_out_frame_duration * 5 - while True: - if self._params.video_out_is_live: - await self._video_out_is_live_handler() - elif self._video_images: - image = next(self._video_images) - await self._draw_image(image) - await asyncio.sleep(self._video_out_frame_duration) - else: - await asyncio.sleep(self._video_out_frame_duration) - - async def _video_out_is_live_handler(self): - image = await self._video_out_queue.get() - - # We get the start time as soon as we get the first image. - if not self._video_out_start_time: - self._video_out_start_time = time.time() - self._video_out_frame_index = 0 - - # Calculate how much time we need to wait before rendering next image. - real_elapsed_time = time.time() - self._video_out_start_time - real_render_time = self._video_out_frame_index * self._video_out_frame_duration - delay_time = self._video_out_frame_duration + real_render_time - real_elapsed_time - - if abs(delay_time) > self._video_out_frame_reset: - self._video_out_start_time = time.time() - self._video_out_frame_index = 0 - elif delay_time > 0: - await asyncio.sleep(delay_time) - self._video_out_frame_index += 1 - - # Render image - await self._draw_image(image) - - self._video_out_queue.task_done() + self._clock_queue.task_done() diff --git a/src/pipecat/transports/base_transport.py b/src/pipecat/transports/base_transport.py index b3d537fa4..91270c365 100644 --- a/src/pipecat/transports/base_transport.py +++ b/src/pipecat/transports/base_transport.py @@ -5,7 +5,7 @@ # from abc import abstractmethod -from typing import Optional +from typing import List, Mapping, Optional from pydantic import BaseModel, ConfigDict @@ -33,7 +33,8 @@ class TransportParams(BaseModel): audio_out_channels: int = 1 audio_out_bitrate: int = 96000 audio_out_10ms_chunks: int = 4 - audio_out_mixer: Optional[BaseAudioMixer] = None + audio_out_mixer: Optional[BaseAudioMixer | Mapping[Optional[str], BaseAudioMixer]] = None + audio_out_destinations: List[str] = [] audio_in_enabled: bool = False audio_in_sample_rate: Optional[int] = None audio_in_channels: int = 1 @@ -48,6 +49,7 @@ class TransportParams(BaseModel): video_out_bitrate: int = 800000 video_out_framerate: int = 30 video_out_color_format: str = "RGB" + video_out_destinations: List[str] = [] vad_enabled: bool = False vad_audio_passthrough: bool = False vad_analyzer: Optional[VADAnalyzer] = None diff --git a/src/pipecat/transports/local/audio.py b/src/pipecat/transports/local/audio.py index bc8d2dd16..8bfd7ee34 100644 --- a/src/pipecat/transports/local/audio.py +++ b/src/pipecat/transports/local/audio.py @@ -118,7 +118,7 @@ class LocalAudioOutputTransport(BaseOutputTransport): self._out_stream.close() self._out_stream = None - async def write_raw_audio_frames(self, frames: bytes): + async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None): if self._out_stream: await self.get_event_loop().run_in_executor( self._executor, self._out_stream.write, frames diff --git a/src/pipecat/transports/local/tk.py b/src/pipecat/transports/local/tk.py index 1695d2cab..bed6371c2 100644 --- a/src/pipecat/transports/local/tk.py +++ b/src/pipecat/transports/local/tk.py @@ -131,13 +131,15 @@ class TkOutputTransport(BaseOutputTransport): self._out_stream.close() self._out_stream = None - async def write_raw_audio_frames(self, frames: bytes): + async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None): if self._out_stream: await self.get_event_loop().run_in_executor( self._executor, self._out_stream.write, frames ) - async def write_raw_video_frame(self, frame: OutputImageRawFrame): + async def write_raw_video_frame( + self, frame: OutputImageRawFrame, destination: Optional[str] = None + ): self.get_event_loop().call_soon(self._write_frame_to_tk, frame) def _write_frame_to_tk(self, frame: OutputImageRawFrame): diff --git a/src/pipecat/transports/network/fastapi_websocket.py b/src/pipecat/transports/network/fastapi_websocket.py index 047ae9c69..4a20bc49b 100644 --- a/src/pipecat/transports/network/fastapi_websocket.py +++ b/src/pipecat/transports/network/fastapi_websocket.py @@ -203,7 +203,7 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport): await super().start(frame) await self._client.setup(frame) await self._params.serializer.setup(frame) - self._send_interval = (self._audio_chunk_size / self.sample_rate) / 2 + self._send_interval = (self.audio_chunk_size / self.sample_rate) / 2 async def stop(self, frame: EndFrame): await super().stop(frame) @@ -229,7 +229,7 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport): async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame): await self._write_frame(frame) - async def write_raw_audio_frames(self, frames: bytes): + async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None): if self._client.is_closing: return diff --git a/src/pipecat/transports/network/small_webrtc.py b/src/pipecat/transports/network/small_webrtc.py index 6304d8d59..fdd501299 100644 --- a/src/pipecat/transports/network/small_webrtc.py +++ b/src/pipecat/transports/network/small_webrtc.py @@ -284,11 +284,13 @@ class SmallWebRTCClient: ) yield audio_frame - async def write_raw_audio_frames(self, data: bytes): + async def write_raw_audio_frames(self, data: bytes, destination: Optional[str] = None): if self._can_send() and self._audio_output_track: await self._audio_output_track.add_audio_bytes(data) - async def write_raw_video_frame(self, frame: OutputImageRawFrame): + async def write_raw_video_frame( + self, frame: OutputImageRawFrame, destination: Optional[str] = None + ): if self._can_send() and self._video_output_track: self._video_output_track.add_video_frame(frame) @@ -497,10 +499,12 @@ class SmallWebRTCOutputTransport(BaseOutputTransport): async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame): await self._client.send_message(frame) - async def write_raw_audio_frames(self, frames: bytes): + async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None): await self._client.write_raw_audio_frames(frames) - async def write_raw_video_frame(self, frame: OutputImageRawFrame): + async def write_raw_video_frame( + self, frame: OutputImageRawFrame, destination: Optional[str] = None + ): await self._client.write_raw_video_frame(frame) diff --git a/src/pipecat/transports/network/websocket_client.py b/src/pipecat/transports/network/websocket_client.py index e45a525fd..7e9725a76 100644 --- a/src/pipecat/transports/network/websocket_client.py +++ b/src/pipecat/transports/network/websocket_client.py @@ -182,7 +182,7 @@ class WebsocketClientOutputTransport(BaseOutputTransport): async def start(self, frame: StartFrame): await super().start(frame) - self._send_interval = (self._audio_chunk_size / self.sample_rate) / 2 + self._send_interval = (self.audio_chunk_size / self.sample_rate) / 2 await self._params.serializer.setup(frame) await self._session.setup(frame) await self._session.connect() @@ -202,7 +202,7 @@ class WebsocketClientOutputTransport(BaseOutputTransport): async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame): await self._write_frame(frame) - async def write_raw_audio_frames(self, frames: bytes): + async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None): frame = OutputAudioRawFrame( audio=frames, sample_rate=self.sample_rate, diff --git a/src/pipecat/transports/network/websocket_server.py b/src/pipecat/transports/network/websocket_server.py index 7ca395922..b930f9fd6 100644 --- a/src/pipecat/transports/network/websocket_server.py +++ b/src/pipecat/transports/network/websocket_server.py @@ -194,7 +194,7 @@ class WebsocketServerOutputTransport(BaseOutputTransport): async def start(self, frame: StartFrame): await super().start(frame) await self._params.serializer.setup(frame) - self._send_interval = (self._audio_chunk_size / self.sample_rate) / 2 + self._send_interval = (self.audio_chunk_size / self.sample_rate) / 2 async def stop(self, frame: EndFrame): await super().stop(frame) @@ -218,7 +218,7 @@ class WebsocketServerOutputTransport(BaseOutputTransport): async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame): await self._write_frame(frame) - async def write_raw_audio_frames(self, frames: bytes): + async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None): if not self._websocket: # Simulate audio playback with a sleep. await self._write_audio_sleep() diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index c388b7093..d8fff4020 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -369,7 +369,7 @@ class DailyTransportClient(EventHandler): await asyncio.sleep(0.01) return None - async def write_raw_audio_frames(self, frames: bytes): + async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None): if not self._mic: return None @@ -377,7 +377,9 @@ class DailyTransportClient(EventHandler): self._mic.write_frames(frames, completion=completion_callback(future)) await future - async def write_raw_video_frame(self, frame: OutputImageRawFrame): + async def write_raw_video_frame( + self, frame: OutputImageRawFrame, destination: Optional[str] = None + ): if not self._camera: return None @@ -1076,10 +1078,12 @@ class DailyOutputTransport(BaseOutputTransport): async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame): await self._client.send_message(frame) - async def write_raw_audio_frames(self, frames: bytes): + async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None): await self._client.write_raw_audio_frames(frames) - async def write_raw_video_frame(self, frame: OutputImageRawFrame): + async def write_raw_video_frame( + self, frame: OutputImageRawFrame, destination: Optional[str] = None + ): await self._client.write_raw_video_frame(frame) diff --git a/src/pipecat/transports/services/livekit.py b/src/pipecat/transports/services/livekit.py index 2e56ebddf..456a70ea6 100644 --- a/src/pipecat/transports/services/livekit.py +++ b/src/pipecat/transports/services/livekit.py @@ -462,7 +462,7 @@ class LiveKitOutputTransport(BaseOutputTransport): else: await self._client.send_data(frame.message.encode()) - async def write_raw_audio_frames(self, frames: bytes): + async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None): livekit_audio = self._convert_pipecat_audio_to_livekit(frames) await self._client.publish_audio(livekit_audio)