Merge pull request #846 from pipecat-ai/aleix/base-output-transport-audio-sync
transport(output): fix non-audio frames sync after audio frames
This commit is contained in:
@@ -23,6 +23,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
- `AWSTTSService` is now deprecated, use `PollyTTSService` instead.
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed a `BaseOutputTransport` issue that was causing non-audio frames being
|
||||
processed before the previous audio frames were played. This will allow, for
|
||||
example, sending a frame `A` after a `TTSSpeakFrame` and the frame `A` will
|
||||
only be pushed downstream after the audio generated from `TTSSpeakFrame` has
|
||||
been spoken.
|
||||
|
||||
## [0.0.50] - 2024-12-11
|
||||
|
||||
### Added
|
||||
|
||||
@@ -15,7 +15,6 @@ from PIL import Image
|
||||
|
||||
from pipecat.audio.vad.vad_analyzer import VAD_STOP_SECS
|
||||
from pipecat.frames.frames import (
|
||||
AudioRawFrame,
|
||||
BotSpeakingFrame,
|
||||
BotStartedSpeakingFrame,
|
||||
BotStoppedSpeakingFrame,
|
||||
@@ -52,9 +51,7 @@ class BaseOutputTransport(FrameProcessor):
|
||||
self._sink_clock_task = None
|
||||
|
||||
# Task to write/send audio and image frames.
|
||||
self._audio_out_task = None
|
||||
self._camera_out_task = None
|
||||
self._running_out_tasks = True
|
||||
|
||||
# These are the images that we should send to the camera at our desired
|
||||
# framerate.
|
||||
@@ -77,22 +74,31 @@ class BaseOutputTransport(FrameProcessor):
|
||||
# Start audio mixer.
|
||||
if self._params.audio_out_mixer:
|
||||
await self._params.audio_out_mixer.start(self._params.audio_out_sample_rate)
|
||||
self._create_output_tasks()
|
||||
self._create_camera_task()
|
||||
self._create_sink_tasks()
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
# We can't cancel output tasks because there might still be audio
|
||||
# buffered to be played.
|
||||
await self._stop_output_tasks()
|
||||
# Stop audio mixer.
|
||||
if self._params.audio_out_mixer:
|
||||
await self._params.audio_out_mixer.stop()
|
||||
# Let the sink tasks process the queue until they reach this EndFrame.
|
||||
await self._sink_clock_queue.put((sys.maxsize, frame.id, frame))
|
||||
await self._sink_queue.put(frame)
|
||||
|
||||
# At this point we have enqueued an EndFrame and we need to wait for
|
||||
# that EndFrame to be processed by the sink tasks. We also need to wait
|
||||
# for these tasks before cancelling the camera and audio tasks below
|
||||
# because they might be still rendering.
|
||||
if self._sink_task:
|
||||
await self._sink_task
|
||||
if self._sink_clock_task:
|
||||
await self._sink_clock_task
|
||||
|
||||
# We can now cancel the camera task.
|
||||
await self._cancel_camera_task()
|
||||
|
||||
async def cancel(self, frame: CancelFrame):
|
||||
# Since we are cancelling everything it doesn't matter if we cancel sink
|
||||
# tasks first or not.
|
||||
await self._cancel_sink_tasks()
|
||||
await self._cancel_output_tasks()
|
||||
await self._cancel_camera_task()
|
||||
|
||||
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
|
||||
pass
|
||||
@@ -103,6 +109,12 @@ class BaseOutputTransport(FrameProcessor):
|
||||
async def write_raw_audio_frames(self, frames: bytes):
|
||||
pass
|
||||
|
||||
async def send_audio(self, frame: OutputAudioRawFrame):
|
||||
await self.queue_frame(frame, FrameDirection.DOWNSTREAM)
|
||||
|
||||
async def send_image(self, frame: OutputImageRawFrame | SpriteFrame):
|
||||
await self.queue_frame(frame, FrameDirection.DOWNSTREAM)
|
||||
|
||||
#
|
||||
# Frame processor
|
||||
#
|
||||
@@ -130,11 +142,8 @@ class BaseOutputTransport(FrameProcessor):
|
||||
await self.push_frame(frame, direction)
|
||||
# Control frames.
|
||||
elif isinstance(frame, EndFrame):
|
||||
# Process sink tasks.
|
||||
await self._stop_sink_tasks(frame)
|
||||
# Now we can stop.
|
||||
await self.stop(frame)
|
||||
# We finally push EndFrame down so PipelineTask stops nicely.
|
||||
# Keep pushing EndFrame down so all the pipeline stops nicely.
|
||||
await self.push_frame(frame, direction)
|
||||
elif isinstance(frame, MixerControlFrame) and self._params.audio_out_mixer:
|
||||
await self._params.audio_out_mixer.process_frame(frame)
|
||||
@@ -149,30 +158,16 @@ class BaseOutputTransport(FrameProcessor):
|
||||
else:
|
||||
await self._sink_queue.put(frame)
|
||||
|
||||
async def _stop_sink_tasks(self, frame: EndFrame):
|
||||
# Let the sink tasks process the queue until they reach this EndFrame.
|
||||
await self._sink_clock_queue.put((sys.maxsize, frame.id, frame))
|
||||
await self._sink_queue.put(frame)
|
||||
|
||||
# At this point we have enqueued an EndFrame and we need to wait for
|
||||
# that EndFrame to be processed by the sink tasks. We also need to wait
|
||||
# for these tasks before cancelling the camera and audio tasks below
|
||||
# because they might be still rendering.
|
||||
if self._sink_task:
|
||||
await self._sink_task
|
||||
if self._sink_clock_task:
|
||||
await self._sink_clock_task
|
||||
|
||||
async def _handle_interruptions(self, frame: Frame):
|
||||
if not self.interruptions_allowed:
|
||||
return
|
||||
|
||||
if isinstance(frame, StartInterruptionFrame):
|
||||
# Cancel sink and output tasks.
|
||||
# Cancel sink and camera tasks.
|
||||
await self._cancel_sink_tasks()
|
||||
await self._cancel_output_tasks()
|
||||
# Create sink and output tasks.
|
||||
self._create_output_tasks()
|
||||
await self._cancel_camera_task()
|
||||
# Create sink and camera tasks.
|
||||
self._create_camera_task()
|
||||
self._create_sink_tasks()
|
||||
# Let's send a bot stopped speaking if we have to.
|
||||
await self._bot_stopped_speaking()
|
||||
@@ -181,19 +176,16 @@ class BaseOutputTransport(FrameProcessor):
|
||||
if not self._params.audio_out_enabled:
|
||||
return
|
||||
|
||||
if self._params.audio_out_is_live:
|
||||
await self._audio_out_queue.put(frame)
|
||||
else:
|
||||
cls = type(frame)
|
||||
self._audio_buffer.extend(frame.audio)
|
||||
while len(self._audio_buffer) >= self._audio_chunk_size:
|
||||
chunk = cls(
|
||||
bytes(self._audio_buffer[: self._audio_chunk_size]),
|
||||
sample_rate=frame.sample_rate,
|
||||
num_channels=frame.num_channels,
|
||||
)
|
||||
await self._sink_queue.put(chunk)
|
||||
self._audio_buffer = self._audio_buffer[self._audio_chunk_size :]
|
||||
cls = type(frame)
|
||||
self._audio_buffer.extend(frame.audio)
|
||||
while len(self._audio_buffer) >= self._audio_chunk_size:
|
||||
chunk = cls(
|
||||
bytes(self._audio_buffer[: self._audio_chunk_size]),
|
||||
sample_rate=frame.sample_rate,
|
||||
num_channels=frame.num_channels,
|
||||
)
|
||||
await self._sink_queue.put(chunk)
|
||||
self._audio_buffer = self._audio_buffer[self._audio_chunk_size :]
|
||||
|
||||
async def _handle_image(self, frame: OutputImageRawFrame | SpriteFrame):
|
||||
if not self._params.camera_out_enabled:
|
||||
@@ -242,9 +234,7 @@ class BaseOutputTransport(FrameProcessor):
|
||||
self._sink_clock_task = None
|
||||
|
||||
async def _sink_frame_handler(self, frame: Frame):
|
||||
if isinstance(frame, OutputAudioRawFrame):
|
||||
await self._audio_out_queue.put(frame)
|
||||
elif isinstance(frame, OutputImageRawFrame):
|
||||
if isinstance(frame, OutputImageRawFrame):
|
||||
await self._set_camera_image(frame)
|
||||
elif isinstance(frame, SpriteFrame):
|
||||
await self._set_camera_images(frame.images)
|
||||
@@ -254,19 +244,6 @@ class BaseOutputTransport(FrameProcessor):
|
||||
elif not isinstance(frame, EndFrame):
|
||||
await self.push_frame(frame)
|
||||
|
||||
async def _sink_task_handler(self):
|
||||
running = True
|
||||
while running:
|
||||
try:
|
||||
frame = await self._sink_queue.get()
|
||||
await self._sink_frame_handler(frame)
|
||||
running = not isinstance(frame, EndFrame)
|
||||
self._sink_queue.task_done()
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} error processing sink queue: {e}")
|
||||
|
||||
async def _sink_clock_task_handler(self):
|
||||
running = True
|
||||
while running:
|
||||
@@ -292,48 +269,93 @@ class BaseOutputTransport(FrameProcessor):
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} error processing sink clock queue: {e}")
|
||||
|
||||
def _next_frame(self) -> AsyncGenerator[Frame, None]:
|
||||
async def without_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]:
|
||||
while True:
|
||||
try:
|
||||
frame = await asyncio.wait_for(self._sink_queue.get(), timeout=vad_stop_secs)
|
||||
yield frame
|
||||
except asyncio.TimeoutError:
|
||||
# Notify the bot stopped speaking upstream if necessary.
|
||||
await self._bot_stopped_speaking()
|
||||
|
||||
async def with_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]:
|
||||
last_frame_time = 0
|
||||
silence = b"\x00" * self._audio_chunk_size
|
||||
while True:
|
||||
try:
|
||||
frame = self._sink_queue.get_nowait()
|
||||
if isinstance(frame, OutputAudioRawFrame):
|
||||
frame.audio = await self._params.audio_out_mixer.mix(frame.audio)
|
||||
last_frame_time = time.time()
|
||||
yield frame
|
||||
except asyncio.QueueEmpty:
|
||||
# Notify the bot stopped speaking upstream if necessary.
|
||||
diff_time = time.time() - last_frame_time
|
||||
if diff_time > vad_stop_secs:
|
||||
await self._bot_stopped_speaking()
|
||||
# Generate an audio frame with only the mixer's part.
|
||||
frame = OutputAudioRawFrame(
|
||||
audio=await self._params.audio_out_mixer.mix(silence),
|
||||
sample_rate=self._params.audio_out_sample_rate,
|
||||
num_channels=self._params.audio_out_channels,
|
||||
)
|
||||
yield frame
|
||||
|
||||
vad_stop_secs = (
|
||||
self._params.vad_analyzer.params.stop_secs
|
||||
if self._params.vad_analyzer
|
||||
else VAD_STOP_SECS
|
||||
)
|
||||
if self._params.audio_out_mixer:
|
||||
return with_mixer(vad_stop_secs)
|
||||
else:
|
||||
return without_mixer(vad_stop_secs)
|
||||
|
||||
async def _sink_task_handler(self):
|
||||
try:
|
||||
async for frame in self._next_frame():
|
||||
# Notify the bot started speaking upstream if necessary and that
|
||||
# it's actually speaking.
|
||||
if isinstance(frame, TTSAudioRawFrame):
|
||||
await self._bot_started_speaking()
|
||||
await self.push_frame(BotSpeakingFrame())
|
||||
await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM)
|
||||
|
||||
# Handle frame.
|
||||
await self._sink_frame_handler(frame)
|
||||
|
||||
# Also, push frame downstream in case anyone else needs it.
|
||||
await self.push_frame(frame)
|
||||
|
||||
# Send audio.
|
||||
if isinstance(frame, OutputAudioRawFrame):
|
||||
await self.write_raw_audio_frames(frame.audio)
|
||||
|
||||
if isinstance(frame, EndFrame):
|
||||
break
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} error writing to microphone: {e}")
|
||||
|
||||
#
|
||||
# Output tasks
|
||||
# Camera task
|
||||
#
|
||||
|
||||
def _create_output_tasks(self):
|
||||
def _create_camera_task(self):
|
||||
loop = self.get_event_loop()
|
||||
# Create camera output queue and task if needed.
|
||||
if self._params.camera_out_enabled:
|
||||
self._camera_out_queue = asyncio.Queue()
|
||||
self._camera_out_task = loop.create_task(self._camera_out_task_handler())
|
||||
# Create audio output queue and task if needed.
|
||||
if self._params.audio_out_enabled:
|
||||
self._audio_out_queue = asyncio.Queue()
|
||||
self._audio_out_task = loop.create_task(self._audio_out_task_handler())
|
||||
|
||||
async def _stop_output_tasks(self):
|
||||
self._running_out_tasks = False
|
||||
# Stop camera output task.
|
||||
if self._camera_out_task and self._params.camera_out_enabled:
|
||||
await self._camera_out_task
|
||||
# Stop audio output task.
|
||||
if self._audio_out_task and self._params.audio_out_enabled:
|
||||
await self._audio_out_task
|
||||
|
||||
async def _cancel_output_tasks(self):
|
||||
async def _cancel_camera_task(self):
|
||||
# Stop camera output task.
|
||||
if self._camera_out_task and self._params.camera_out_enabled:
|
||||
self._camera_out_task.cancel()
|
||||
await self._camera_out_task
|
||||
self._camera_out_task = None
|
||||
# Stop audio output task.
|
||||
if self._audio_out_task and self._params.audio_out_enabled:
|
||||
self._audio_out_task.cancel()
|
||||
await self._audio_out_task
|
||||
self._audio_out_task = None
|
||||
|
||||
#
|
||||
# Camera out
|
||||
#
|
||||
|
||||
async def send_image(self, frame: OutputImageRawFrame | SpriteFrame):
|
||||
await self.queue_frame(frame, FrameDirection.DOWNSTREAM)
|
||||
|
||||
async def _draw_image(self, frame: OutputImageRawFrame):
|
||||
desired_size = (self._params.camera_out_width, self._params.camera_out_height)
|
||||
@@ -359,7 +381,7 @@ class BaseOutputTransport(FrameProcessor):
|
||||
self._camera_out_frame_index = 0
|
||||
self._camera_out_frame_duration = 1 / self._params.camera_out_framerate
|
||||
self._camera_out_frame_reset = self._camera_out_frame_duration * 5
|
||||
while self._running_out_tasks:
|
||||
while True:
|
||||
try:
|
||||
if self._params.camera_out_is_live:
|
||||
await self._camera_out_is_live_handler()
|
||||
@@ -398,74 +420,3 @@ class BaseOutputTransport(FrameProcessor):
|
||||
await self._draw_image(image)
|
||||
|
||||
self._camera_out_queue.task_done()
|
||||
|
||||
#
|
||||
# Audio out
|
||||
#
|
||||
|
||||
async def send_audio(self, frame: OutputAudioRawFrame):
|
||||
await self.queue_frame(frame, FrameDirection.DOWNSTREAM)
|
||||
|
||||
def _next_audio_frame(self) -> AsyncGenerator[AudioRawFrame, None]:
|
||||
async def without_mixer(vad_stop_secs: float) -> AsyncGenerator[AudioRawFrame, None]:
|
||||
while self._running_out_tasks or self._bot_speaking:
|
||||
try:
|
||||
frame = await asyncio.wait_for(
|
||||
self._audio_out_queue.get(), timeout=vad_stop_secs
|
||||
)
|
||||
yield frame
|
||||
except asyncio.TimeoutError:
|
||||
# Notify the bot stopped speaking upstream if necessary.
|
||||
await self._bot_stopped_speaking()
|
||||
|
||||
async def with_mixer(vad_stop_secs: float) -> AsyncGenerator[AudioRawFrame, None]:
|
||||
last_frame_time = 0
|
||||
silence = b"\x00" * self._audio_chunk_size
|
||||
while self._running_out_tasks or self._bot_speaking:
|
||||
try:
|
||||
frame = self._audio_out_queue.get_nowait()
|
||||
frame.audio = await self._params.audio_out_mixer.mix(frame.audio)
|
||||
last_frame_time = time.time()
|
||||
yield frame
|
||||
except asyncio.QueueEmpty:
|
||||
# Notify the bot stopped speaking upstream if necessary.
|
||||
diff_time = time.time() - last_frame_time
|
||||
if diff_time > vad_stop_secs:
|
||||
await self._bot_stopped_speaking()
|
||||
# Generate an audio frame with only the mixer's part.
|
||||
frame = OutputAudioRawFrame(
|
||||
audio=await self._params.audio_out_mixer.mix(silence),
|
||||
sample_rate=self._params.audio_out_sample_rate,
|
||||
num_channels=self._params.audio_out_channels,
|
||||
)
|
||||
yield frame
|
||||
|
||||
vad_stop_secs = (
|
||||
self._params.vad_analyzer.params.stop_secs
|
||||
if self._params.vad_analyzer
|
||||
else VAD_STOP_SECS
|
||||
)
|
||||
if self._params.audio_out_mixer:
|
||||
return with_mixer(vad_stop_secs)
|
||||
else:
|
||||
return without_mixer(vad_stop_secs)
|
||||
|
||||
async def _audio_out_task_handler(self):
|
||||
try:
|
||||
async for frame in self._next_audio_frame():
|
||||
# Notify the bot started speaking upstream if necessary and that
|
||||
# it's actually speaking.
|
||||
if isinstance(frame, TTSAudioRawFrame):
|
||||
await self._bot_started_speaking()
|
||||
await self.push_frame(BotSpeakingFrame())
|
||||
await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM)
|
||||
|
||||
# Also, push frame downstream in case anyone else needs it.
|
||||
await self.push_frame(frame)
|
||||
|
||||
# Send audio.
|
||||
await self.write_raw_audio_frames(frame.audio)
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} error writing to microphone: {e}")
|
||||
|
||||
Reference in New Issue
Block a user