Files
pipecat/tests/test_audio_buffer_processor.py

329 lines
13 KiB
Python

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import struct
import unittest
from pipecat.clocks.system_clock import SystemClock
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
InputAudioRawFrame,
OutputAudioRawFrame,
StartFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
from pipecat.processors.frame_processor import FrameDirection, FrameProcessorSetup
from pipecat.utils.asyncio.task_manager import TaskManager, TaskManagerParams
class _PassthroughResampler:
async def resample(
self, audio: bytes, in_rate: int, out_rate: int
) -> bytes: # pragma: no cover - trivial
return audio
async def _make_processor(*, buffer_size: int = 0) -> AudioBufferProcessor:
"""Create and start a processor ready to record.
Calls setup() and sends a StartFrame through the public process_frame path so that
the processor is fully initialised (task manager set, sample rate configured,
__started flag set) without needing a full pipeline.
"""
processor = AudioBufferProcessor(sample_rate=16000, num_channels=2, buffer_size=buffer_size)
processor._input_resampler = _PassthroughResampler()
processor._output_resampler = _PassthroughResampler()
loop = asyncio.get_event_loop()
task_manager = TaskManager()
task_manager.setup(TaskManagerParams(loop=loop))
await processor.setup(FrameProcessorSetup(clock=SystemClock(), task_manager=task_manager))
await processor.process_frame(
StartFrame(audio_out_sample_rate=16000), FrameDirection.DOWNSTREAM
)
await processor.start_recording()
return processor
async def _capture_track_audio(processor: AudioBufferProcessor) -> tuple[bytes, bytes]:
"""Flush the processor and return (user_track, bot_track) from on_track_audio_data."""
captured = {}
event = asyncio.Event()
async def on_track_audio_data(_, user, bot, sample_rate, num_channels):
captured["user"] = user
captured["bot"] = bot
event.set()
processor.add_event_handler("on_track_audio_data", on_track_audio_data)
await processor.stop_recording()
await asyncio.wait_for(event.wait(), timeout=1)
return captured["user"], captured["bot"]
class TestAudioBufferProcessor(unittest.IsolatedAsyncioTestCase):
async def asyncSetUp(self):
self.processor = await _make_processor(buffer_size=4)
async def asyncTearDown(self):
if getattr(self.processor, "_recording", False):
await self.processor.stop_recording()
await self.processor.cleanup()
async def test_flush_user_audio_pads_bot_track(self):
user_audio = struct.pack("<hh", 1000, -1000)
audio_event = asyncio.Event()
track_event = asyncio.Event()
captured = {}
async def on_audio_data(_, audio: bytes, sample_rate: int, num_channels: int):
captured["merged"] = (audio, sample_rate, num_channels)
audio_event.set()
async def on_track_audio_data(
_, user: bytes, bot: bytes, sample_rate: int, num_channels: int
):
captured["tracks"] = (user, bot, sample_rate, num_channels)
track_event.set()
self.processor.add_event_handler("on_audio_data", on_audio_data)
self.processor.add_event_handler("on_track_audio_data", on_track_audio_data)
frame = InputAudioRawFrame(audio=user_audio, sample_rate=16000, num_channels=1)
await self.processor.process_frame(frame, FrameDirection.DOWNSTREAM)
await asyncio.wait_for(audio_event.wait(), timeout=1)
await asyncio.wait_for(track_event.wait(), timeout=1)
merged_audio, merged_sr, merged_channels = captured["merged"]
user_track, bot_track, track_sr, track_channels = captured["tracks"]
self.assertEqual(merged_sr, 16000)
self.assertEqual(merged_channels, 2)
self.assertEqual(track_sr, 16000)
self.assertEqual(track_channels, 2)
self.assertEqual(user_track, user_audio)
self.assertEqual(bot_track, b"\x00" * len(user_audio))
self.assertEqual(len(merged_audio), len(user_audio) * 2)
self.assertEqual(merged_audio[0:2], user_audio[0:2])
self.assertEqual(merged_audio[2:4], b"\x00\x00")
self.assertEqual(merged_audio[4:6], user_audio[2:4])
self.assertEqual(merged_audio[6:8], b"\x00\x00")
self.assertEqual(len(self.processor._user_audio_buffer), 0)
self.assertEqual(len(self.processor._bot_audio_buffer), 0)
async def test_flush_bot_audio_pads_user_track(self):
bot_audio = struct.pack("<hh", -800, 400)
audio_event = asyncio.Event()
track_event = asyncio.Event()
captured = {}
async def on_audio_data(_, audio: bytes, sample_rate: int, num_channels: int):
captured["merged"] = (audio, sample_rate, num_channels)
audio_event.set()
async def on_track_audio_data(
_, user: bytes, bot: bytes, sample_rate: int, num_channels: int
):
captured["tracks"] = (user, bot, sample_rate, num_channels)
track_event.set()
self.processor.add_event_handler("on_audio_data", on_audio_data)
self.processor.add_event_handler("on_track_audio_data", on_track_audio_data)
frame = OutputAudioRawFrame(audio=bot_audio, sample_rate=16000, num_channels=1)
await self.processor.process_frame(frame, FrameDirection.DOWNSTREAM)
await asyncio.wait_for(audio_event.wait(), timeout=1)
await asyncio.wait_for(track_event.wait(), timeout=1)
merged_audio, merged_sr, merged_channels = captured["merged"]
user_track, bot_track, track_sr, track_channels = captured["tracks"]
self.assertEqual(merged_sr, 16000)
self.assertEqual(merged_channels, 2)
self.assertEqual(track_sr, 16000)
self.assertEqual(track_channels, 2)
self.assertEqual(user_track, b"\x00" * len(bot_audio))
self.assertEqual(bot_track, bot_audio)
self.assertEqual(len(merged_audio), len(bot_audio) * 2)
self.assertEqual(merged_audio[0:2], b"\x00\x00")
self.assertEqual(merged_audio[2:4], bot_audio[0:2])
self.assertEqual(merged_audio[4:6], b"\x00\x00")
self.assertEqual(merged_audio[6:8], bot_audio[2:4])
self.assertEqual(len(self.processor._user_audio_buffer), 0)
self.assertEqual(len(self.processor._bot_audio_buffer), 0)
class TestSilenceInjectionGuards(unittest.IsolatedAsyncioTestCase):
"""Tests that silence is not injected mid-utterance (fix for crackling artifacts).
Each test verifies the audio alignment in the flushed tracks to confirm that
silence is only added by _align_track_buffers at flush time (end of the buffer),
never injected mid-stream while the affected track is actively producing audio.
"""
async def test_no_silence_injected_into_bot_buffer_while_bot_speaking(self):
"""Bot audio must appear at the start of the bot track, not after mid-stream silence.
Timeline:
1. User sends 4 bytes (bot not speaking → normal sync, no-op since bot is at 0)
2. Bot starts speaking
3. User sends 4 more bytes (bot speaking → sync skipped; bot stays at 0)
4. Bot sends 4 bytes of known audio
Expected final bot track (8 bytes total after _align_track_buffers at flush):
[bot_audio][silence_padding] ← audio first, silence only at the end
With the bug the bot track would be:
[silence_injected_mid_stream][bot_audio] ← silence inserted before the audio
"""
p = await _make_processor()
bot_audio = b"\xaa\xbb\xcc\xdd"
await p.process_frame(
InputAudioRawFrame(audio=b"\x01\x02\x03\x04", sample_rate=16000, num_channels=1),
FrameDirection.DOWNSTREAM,
)
await p.process_frame(BotStartedSpeakingFrame(), FrameDirection.DOWNSTREAM)
await p.process_frame(
InputAudioRawFrame(audio=b"\x05\x06\x07\x08", sample_rate=16000, num_channels=1),
FrameDirection.DOWNSTREAM,
)
await p.process_frame(
OutputAudioRawFrame(audio=bot_audio, sample_rate=16000, num_channels=1),
FrameDirection.DOWNSTREAM,
)
_, bot_track = await _capture_track_audio(p)
await p.cleanup()
# Audio must appear at the beginning of the bot track (not after injected silence).
self.assertEqual(bot_track[:4], bot_audio)
self.assertEqual(bot_track[4:], b"\x00" * 4)
async def test_no_silence_injected_into_user_buffer_while_user_speaking(self):
"""User audio must appear at the start of the user track, not after mid-stream silence.
Timeline:
1. Bot sends 4 bytes (user not speaking → normal sync, no-op since user is at 0)
2. User starts speaking
3. Bot sends 4 more bytes (user speaking → sync skipped; user stays at 0)
4. User sends 4 bytes of known audio
Expected final user track (8 bytes total after _align_track_buffers at flush):
[user_audio][silence_padding] ← audio first, silence only at the end
With the bug the user track would be:
[silence_injected_mid_stream][user_audio]
"""
p = await _make_processor()
user_audio = b"\xaa\xbb\xcc\xdd"
await p.process_frame(
OutputAudioRawFrame(audio=b"\x01\x02\x03\x04", sample_rate=16000, num_channels=1),
FrameDirection.DOWNSTREAM,
)
await p.process_frame(UserStartedSpeakingFrame(), FrameDirection.DOWNSTREAM)
await p.process_frame(
OutputAudioRawFrame(audio=b"\x05\x06\x07\x08", sample_rate=16000, num_channels=1),
FrameDirection.DOWNSTREAM,
)
await p.process_frame(
InputAudioRawFrame(audio=user_audio, sample_rate=16000, num_channels=1),
FrameDirection.DOWNSTREAM,
)
user_track, _ = await _capture_track_audio(p)
await p.cleanup()
self.assertEqual(user_track[:4], user_audio)
self.assertEqual(user_track[4:], b"\x00" * 4)
async def test_silence_resumes_into_bot_buffer_after_bot_stops_speaking(self):
"""After bot stops speaking, the bot buffer is synced again on user audio arrival.
Timeline:
1. User sends 4 bytes (user=4, bot=0)
2. Bot starts speaking
3. User sends 4 more bytes (sync skipped; user=8, bot=0)
4. Bot stops speaking
5. User sends 4 more bytes (sync resumes; bot gets 8 bytes silence, user=12)
Expected final bot track (12 bytes): 8 bytes silence then no more audio (bot never
sent audio, _align_track_buffers pads bot to 12).
The key assertion: bot has 8 bytes of silence at positions 0-7, confirming that
the sync at step 5 did inject 8 bytes (positions 0-7 of the bot buffer).
"""
p = await _make_processor()
await p.process_frame(
InputAudioRawFrame(audio=b"\x01\x02\x03\x04", sample_rate=16000, num_channels=1),
FrameDirection.DOWNSTREAM,
)
await p.process_frame(BotStartedSpeakingFrame(), FrameDirection.DOWNSTREAM)
await p.process_frame(
InputAudioRawFrame(audio=b"\x05\x06\x07\x08", sample_rate=16000, num_channels=1),
FrameDirection.DOWNSTREAM,
)
await p.process_frame(BotStoppedSpeakingFrame(), FrameDirection.DOWNSTREAM)
await p.process_frame(
InputAudioRawFrame(audio=b"\x09\x0a\x0b\x0c", sample_rate=16000, num_channels=1),
FrameDirection.DOWNSTREAM,
)
_, bot_track = await _capture_track_audio(p)
await p.cleanup()
# The sync at step 5 targets len(user)=8, so bot must have 8 bytes of silence
# written before user's third chunk was added.
self.assertEqual(bot_track[:8], b"\x00" * 8)
async def test_silence_resumes_into_user_buffer_after_user_stops_speaking(self):
"""After user stops speaking, the user buffer is synced again on bot audio arrival.
Timeline:
1. Bot sends 4 bytes (user=0, bot=4)
2. User starts speaking
3. Bot sends 4 more bytes (sync skipped; user=0, bot=8)
4. User stops speaking
5. Bot sends 4 more bytes (sync resumes; user gets 8 bytes silence, bot=12)
Expected: user track has 8 bytes of silence at positions 0-7.
"""
p = await _make_processor()
await p.process_frame(
OutputAudioRawFrame(audio=b"\x01\x02\x03\x04", sample_rate=16000, num_channels=1),
FrameDirection.DOWNSTREAM,
)
await p.process_frame(UserStartedSpeakingFrame(), FrameDirection.DOWNSTREAM)
await p.process_frame(
OutputAudioRawFrame(audio=b"\x05\x06\x07\x08", sample_rate=16000, num_channels=1),
FrameDirection.DOWNSTREAM,
)
await p.process_frame(UserStoppedSpeakingFrame(), FrameDirection.DOWNSTREAM)
await p.process_frame(
OutputAudioRawFrame(audio=b"\x09\x0a\x0b\x0c", sample_rate=16000, num_channels=1),
FrameDirection.DOWNSTREAM,
)
user_track, _ = await _capture_track_audio(p)
await p.cleanup()
self.assertEqual(user_track[:8], b"\x00" * 8)
if __name__ == "__main__":
unittest.main()