From 413dbaf974b605cf909a05ec7149c67957ea148a Mon Sep 17 00:00:00 2001 From: filipi87 Date: Wed, 25 Mar 2026 16:05:58 -0300 Subject: [PATCH] Automated tests to validate the silence injection guards. --- tests/test_audio_buffer_processor.py | 250 ++++++++++++++++++--------- 1 file changed, 170 insertions(+), 80 deletions(-) diff --git a/tests/test_audio_buffer_processor.py b/tests/test_audio_buffer_processor.py index a163a94ef..27403e6fc 100644 --- a/tests/test_audio_buffer_processor.py +++ b/tests/test_audio_buffer_processor.py @@ -8,6 +8,7 @@ import asyncio import struct import unittest +from pipecat.clocks.system_clock import SystemClock from pipecat.frames.frames import ( BotStartedSpeakingFrame, BotStoppedSpeakingFrame, @@ -18,6 +19,8 @@ from pipecat.frames.frames import ( UserStoppedSpeakingFrame, ) from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor +from pipecat.processors.frame_processor import FrameDirection, FrameProcessorSetup +from pipecat.utils.asyncio.task_manager import TaskManager, TaskManagerParams class _PassthroughResampler: @@ -27,13 +30,48 @@ class _PassthroughResampler: return audio +async def _make_processor(*, buffer_size: int = 0) -> AudioBufferProcessor: + """Create and start a processor ready to record. + + Calls setup() and sends a StartFrame through the public process_frame path so that + the processor is fully initialised (task manager set, sample rate configured, + __started flag set) without needing a full pipeline. + """ + processor = AudioBufferProcessor(sample_rate=16000, num_channels=2, buffer_size=buffer_size) + processor._input_resampler = _PassthroughResampler() + processor._output_resampler = _PassthroughResampler() + + loop = asyncio.get_event_loop() + task_manager = TaskManager() + task_manager.setup(TaskManagerParams(loop=loop)) + await processor.setup(FrameProcessorSetup(clock=SystemClock(), task_manager=task_manager)) + + await processor.process_frame( + StartFrame(audio_out_sample_rate=16000), FrameDirection.DOWNSTREAM + ) + await processor.start_recording() + return processor + + +async def _capture_track_audio(processor: AudioBufferProcessor) -> tuple[bytes, bytes]: + """Flush the processor and return (user_track, bot_track) from on_track_audio_data.""" + captured = {} + event = asyncio.Event() + + async def on_track_audio_data(_, user, bot, sample_rate, num_channels): + captured["user"] = user + captured["bot"] = bot + event.set() + + processor.add_event_handler("on_track_audio_data", on_track_audio_data) + await processor.stop_recording() + await asyncio.wait_for(event.wait(), timeout=1) + return captured["user"], captured["bot"] + + class TestAudioBufferProcessor(unittest.IsolatedAsyncioTestCase): async def asyncSetUp(self): - self.processor = AudioBufferProcessor(sample_rate=16000, num_channels=2, buffer_size=4) - self.processor._input_resampler = _PassthroughResampler() - self.processor._output_resampler = _PassthroughResampler() - self.processor._update_sample_rate(StartFrame(audio_out_sample_rate=16000)) - await self.processor.start_recording() + self.processor = await _make_processor(buffer_size=4) async def asyncTearDown(self): if getattr(self.processor, "_recording", False): @@ -60,7 +98,7 @@ class TestAudioBufferProcessor(unittest.IsolatedAsyncioTestCase): self.processor.add_event_handler("on_track_audio_data", on_track_audio_data) frame = InputAudioRawFrame(audio=user_audio, sample_rate=16000, num_channels=1) - await self.processor._process_recording(frame) + await self.processor.process_frame(frame, FrameDirection.DOWNSTREAM) await asyncio.wait_for(audio_event.wait(), timeout=1) await asyncio.wait_for(track_event.wait(), timeout=1) @@ -102,7 +140,7 @@ class TestAudioBufferProcessor(unittest.IsolatedAsyncioTestCase): self.processor.add_event_handler("on_track_audio_data", on_track_audio_data) frame = OutputAudioRawFrame(audio=bot_audio, sample_rate=16000, num_channels=1) - await self.processor._process_recording(frame) + await self.processor.process_frame(frame, FrameDirection.DOWNSTREAM) await asyncio.wait_for(audio_event.wait(), timeout=1) await asyncio.wait_for(track_event.wait(), timeout=1) @@ -126,113 +164,165 @@ class TestAudioBufferProcessor(unittest.IsolatedAsyncioTestCase): class TestSilenceInjectionGuards(unittest.IsolatedAsyncioTestCase): - """Tests that silence is not injected mid-utterance (fix for crackling artifacts).""" + """Tests that silence is not injected mid-utterance (fix for crackling artifacts). - async def _make_processor(self) -> AudioBufferProcessor: - """Return a processor with no auto-flush and a passthrough resampler.""" - processor = AudioBufferProcessor(sample_rate=16000, num_channels=2, buffer_size=0) - processor._input_resampler = _PassthroughResampler() - processor._output_resampler = _PassthroughResampler() - processor._update_sample_rate(StartFrame(audio_out_sample_rate=16000)) - await processor.start_recording() - return processor - - async def asyncTearDown(self): - # Processors created inside each test are cleaned up there; nothing shared. - pass + Each test verifies the audio alignment in the flushed tracks to confirm that + silence is only added by _align_track_buffers at flush time (end of the buffer), + never injected mid-stream while the affected track is actively producing audio. + """ async def test_no_silence_injected_into_bot_buffer_while_bot_speaking(self): - """Bot buffer must not receive silence padding while the bot is actively speaking.""" - p = await self._make_processor() + """Bot audio must appear at the start of the bot track, not after mid-stream silence. - # Give user buffer a head-start so a sync *would* pad the bot buffer normally. - p._user_audio_buffer = bytearray(b"\x01\x02\x03\x04") + Timeline: + 1. User sends 4 bytes (bot not speaking → normal sync, no-op since bot is at 0) + 2. Bot starts speaking + 3. User sends 4 more bytes (bot speaking → sync skipped; bot stays at 0) + 4. Bot sends 4 bytes of known audio - # Bot starts speaking. - await p._process_recording(BotStartedSpeakingFrame()) + Expected final bot track (8 bytes total after _align_track_buffers at flush): + [bot_audio][silence_padding] ← audio first, silence only at the end - # User audio arrives — without the fix the bot buffer would be padded to 4 bytes. - user_audio = b"\x05\x06\x07\x08" - await p._process_recording( - InputAudioRawFrame(audio=user_audio, sample_rate=16000, num_channels=1) + With the bug the bot track would be: + [silence_injected_mid_stream][bot_audio] ← silence inserted before the audio + """ + p = await _make_processor() + + bot_audio = b"\xaa\xbb\xcc\xdd" + + await p.process_frame( + InputAudioRawFrame(audio=b"\x01\x02\x03\x04", sample_rate=16000, num_channels=1), + FrameDirection.DOWNSTREAM, + ) + await p.process_frame(BotStartedSpeakingFrame(), FrameDirection.DOWNSTREAM) + await p.process_frame( + InputAudioRawFrame(audio=b"\x05\x06\x07\x08", sample_rate=16000, num_channels=1), + FrameDirection.DOWNSTREAM, + ) + await p.process_frame( + OutputAudioRawFrame(audio=bot_audio, sample_rate=16000, num_channels=1), + FrameDirection.DOWNSTREAM, ) - self.assertEqual(len(p._bot_audio_buffer), 0, "Bot buffer must not be padded while bot is speaking") - self.assertEqual(bytes(p._user_audio_buffer), b"\x01\x02\x03\x04" + user_audio) - - await p.stop_recording() + _, bot_track = await _capture_track_audio(p) await p.cleanup() + # Audio must appear at the beginning of the bot track (not after injected silence). + self.assertEqual(bot_track[:4], bot_audio) + self.assertEqual(bot_track[4:], b"\x00" * 4) + async def test_no_silence_injected_into_user_buffer_while_user_speaking(self): - """User buffer must not receive silence padding while the user is actively speaking.""" - p = await self._make_processor() + """User audio must appear at the start of the user track, not after mid-stream silence. - # Give bot buffer a head-start so a sync *would* pad the user buffer normally. - p._bot_audio_buffer = bytearray(b"\x01\x02\x03\x04") + Timeline: + 1. Bot sends 4 bytes (user not speaking → normal sync, no-op since user is at 0) + 2. User starts speaking + 3. Bot sends 4 more bytes (user speaking → sync skipped; user stays at 0) + 4. User sends 4 bytes of known audio - # User starts speaking. - await p._process_recording(UserStartedSpeakingFrame()) + Expected final user track (8 bytes total after _align_track_buffers at flush): + [user_audio][silence_padding] ← audio first, silence only at the end - # Bot audio arrives — without the fix the user buffer would be padded to 4 bytes. - bot_audio = b"\x05\x06\x07\x08" - await p._process_recording( - OutputAudioRawFrame(audio=bot_audio, sample_rate=16000, num_channels=1) + With the bug the user track would be: + [silence_injected_mid_stream][user_audio] + """ + p = await _make_processor() + + user_audio = b"\xaa\xbb\xcc\xdd" + + await p.process_frame( + OutputAudioRawFrame(audio=b"\x01\x02\x03\x04", sample_rate=16000, num_channels=1), + FrameDirection.DOWNSTREAM, + ) + await p.process_frame(UserStartedSpeakingFrame(), FrameDirection.DOWNSTREAM) + await p.process_frame( + OutputAudioRawFrame(audio=b"\x05\x06\x07\x08", sample_rate=16000, num_channels=1), + FrameDirection.DOWNSTREAM, + ) + await p.process_frame( + InputAudioRawFrame(audio=user_audio, sample_rate=16000, num_channels=1), + FrameDirection.DOWNSTREAM, ) - self.assertEqual(len(p._user_audio_buffer), 0, "User buffer must not be padded while user is speaking") - self.assertEqual(bytes(p._bot_audio_buffer), b"\x01\x02\x03\x04" + bot_audio) - - await p.stop_recording() + user_track, _ = await _capture_track_audio(p) await p.cleanup() + self.assertEqual(user_track[:4], user_audio) + self.assertEqual(user_track[4:], b"\x00" * 4) + async def test_silence_resumes_into_bot_buffer_after_bot_stops_speaking(self): - """After bot stops speaking, silence injection into the bot buffer resumes.""" - p = await self._make_processor() + """After bot stops speaking, the bot buffer is synced again on user audio arrival. - # Give user buffer a head-start. - p._user_audio_buffer = bytearray(b"\x01\x02\x03\x04") + Timeline: + 1. User sends 4 bytes (user=4, bot=0) + 2. Bot starts speaking + 3. User sends 4 more bytes (sync skipped; user=8, bot=0) + 4. Bot stops speaking + 5. User sends 4 more bytes (sync resumes; bot gets 8 bytes silence, user=12) - # Bot speaks and stops. - await p._process_recording(BotStartedSpeakingFrame()) - await p._process_recording(BotStoppedSpeakingFrame()) + Expected final bot track (12 bytes): 8 bytes silence then no more audio (bot never + sent audio, _align_track_buffers pads bot to 12). + The key assertion: bot has 8 bytes of silence at positions 0-7, confirming that + the sync at step 5 did inject 8 bytes (positions 0-7 of the bot buffer). + """ + p = await _make_processor() - # User audio arrives now — bot is no longer speaking so sync should run. - user_audio = b"\x05\x06\x07\x08" - await p._process_recording( - InputAudioRawFrame(audio=user_audio, sample_rate=16000, num_channels=1) + await p.process_frame( + InputAudioRawFrame(audio=b"\x01\x02\x03\x04", sample_rate=16000, num_channels=1), + FrameDirection.DOWNSTREAM, + ) + await p.process_frame(BotStartedSpeakingFrame(), FrameDirection.DOWNSTREAM) + await p.process_frame( + InputAudioRawFrame(audio=b"\x05\x06\x07\x08", sample_rate=16000, num_channels=1), + FrameDirection.DOWNSTREAM, + ) + await p.process_frame(BotStoppedSpeakingFrame(), FrameDirection.DOWNSTREAM) + await p.process_frame( + InputAudioRawFrame(audio=b"\x09\x0a\x0b\x0c", sample_rate=16000, num_channels=1), + FrameDirection.DOWNSTREAM, ) - # Bot buffer should have been padded with silence to match the 4-byte head-start. - self.assertEqual(len(p._bot_audio_buffer), 4, "Bot buffer should be padded after bot stops speaking") - self.assertEqual(bytes(p._bot_audio_buffer), b"\x00\x00\x00\x00") - - await p.stop_recording() + _, bot_track = await _capture_track_audio(p) await p.cleanup() + # The sync at step 5 targets len(user)=8, so bot must have 8 bytes of silence + # written before user's third chunk was added. + self.assertEqual(bot_track[:8], b"\x00" * 8) + async def test_silence_resumes_into_user_buffer_after_user_stops_speaking(self): - """After user stops speaking, silence injection into the user buffer resumes.""" - p = await self._make_processor() + """After user stops speaking, the user buffer is synced again on bot audio arrival. - # Give bot buffer a head-start. - p._bot_audio_buffer = bytearray(b"\x01\x02\x03\x04") + Timeline: + 1. Bot sends 4 bytes (user=0, bot=4) + 2. User starts speaking + 3. Bot sends 4 more bytes (sync skipped; user=0, bot=8) + 4. User stops speaking + 5. Bot sends 4 more bytes (sync resumes; user gets 8 bytes silence, bot=12) - # User speaks and stops. - await p._process_recording(UserStartedSpeakingFrame()) - await p._process_recording(UserStoppedSpeakingFrame()) + Expected: user track has 8 bytes of silence at positions 0-7. + """ + p = await _make_processor() - # Bot audio arrives now — user is no longer speaking so sync should run. - bot_audio = b"\x05\x06\x07\x08" - await p._process_recording( - OutputAudioRawFrame(audio=bot_audio, sample_rate=16000, num_channels=1) + await p.process_frame( + OutputAudioRawFrame(audio=b"\x01\x02\x03\x04", sample_rate=16000, num_channels=1), + FrameDirection.DOWNSTREAM, + ) + await p.process_frame(UserStartedSpeakingFrame(), FrameDirection.DOWNSTREAM) + await p.process_frame( + OutputAudioRawFrame(audio=b"\x05\x06\x07\x08", sample_rate=16000, num_channels=1), + FrameDirection.DOWNSTREAM, + ) + await p.process_frame(UserStoppedSpeakingFrame(), FrameDirection.DOWNSTREAM) + await p.process_frame( + OutputAudioRawFrame(audio=b"\x09\x0a\x0b\x0c", sample_rate=16000, num_channels=1), + FrameDirection.DOWNSTREAM, ) - # User buffer should have been padded with silence to match the 4-byte head-start. - self.assertEqual(len(p._user_audio_buffer), 4, "User buffer should be padded after user stops speaking") - self.assertEqual(bytes(p._user_audio_buffer), b"\x00\x00\x00\x00") - - await p.stop_recording() + user_track, _ = await _capture_track_audio(p) await p.cleanup() + self.assertEqual(user_track[:8], b"\x00" * 8) + if __name__ == "__main__": unittest.main()