Automated tests to validate the silence injection guards.
This commit is contained in:
@@ -8,6 +8,7 @@ import asyncio
|
||||
import struct
|
||||
import unittest
|
||||
|
||||
from pipecat.clocks.system_clock import SystemClock
|
||||
from pipecat.frames.frames import (
|
||||
BotStartedSpeakingFrame,
|
||||
BotStoppedSpeakingFrame,
|
||||
@@ -18,6 +19,8 @@ from pipecat.frames.frames import (
|
||||
UserStoppedSpeakingFrame,
|
||||
)
|
||||
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessorSetup
|
||||
from pipecat.utils.asyncio.task_manager import TaskManager, TaskManagerParams
|
||||
|
||||
|
||||
class _PassthroughResampler:
|
||||
@@ -27,13 +30,48 @@ class _PassthroughResampler:
|
||||
return audio
|
||||
|
||||
|
||||
async def _make_processor(*, buffer_size: int = 0) -> AudioBufferProcessor:
|
||||
"""Create and start a processor ready to record.
|
||||
|
||||
Calls setup() and sends a StartFrame through the public process_frame path so that
|
||||
the processor is fully initialised (task manager set, sample rate configured,
|
||||
__started flag set) without needing a full pipeline.
|
||||
"""
|
||||
processor = AudioBufferProcessor(sample_rate=16000, num_channels=2, buffer_size=buffer_size)
|
||||
processor._input_resampler = _PassthroughResampler()
|
||||
processor._output_resampler = _PassthroughResampler()
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
task_manager = TaskManager()
|
||||
task_manager.setup(TaskManagerParams(loop=loop))
|
||||
await processor.setup(FrameProcessorSetup(clock=SystemClock(), task_manager=task_manager))
|
||||
|
||||
await processor.process_frame(
|
||||
StartFrame(audio_out_sample_rate=16000), FrameDirection.DOWNSTREAM
|
||||
)
|
||||
await processor.start_recording()
|
||||
return processor
|
||||
|
||||
|
||||
async def _capture_track_audio(processor: AudioBufferProcessor) -> tuple[bytes, bytes]:
|
||||
"""Flush the processor and return (user_track, bot_track) from on_track_audio_data."""
|
||||
captured = {}
|
||||
event = asyncio.Event()
|
||||
|
||||
async def on_track_audio_data(_, user, bot, sample_rate, num_channels):
|
||||
captured["user"] = user
|
||||
captured["bot"] = bot
|
||||
event.set()
|
||||
|
||||
processor.add_event_handler("on_track_audio_data", on_track_audio_data)
|
||||
await processor.stop_recording()
|
||||
await asyncio.wait_for(event.wait(), timeout=1)
|
||||
return captured["user"], captured["bot"]
|
||||
|
||||
|
||||
class TestAudioBufferProcessor(unittest.IsolatedAsyncioTestCase):
|
||||
async def asyncSetUp(self):
|
||||
self.processor = AudioBufferProcessor(sample_rate=16000, num_channels=2, buffer_size=4)
|
||||
self.processor._input_resampler = _PassthroughResampler()
|
||||
self.processor._output_resampler = _PassthroughResampler()
|
||||
self.processor._update_sample_rate(StartFrame(audio_out_sample_rate=16000))
|
||||
await self.processor.start_recording()
|
||||
self.processor = await _make_processor(buffer_size=4)
|
||||
|
||||
async def asyncTearDown(self):
|
||||
if getattr(self.processor, "_recording", False):
|
||||
@@ -60,7 +98,7 @@ class TestAudioBufferProcessor(unittest.IsolatedAsyncioTestCase):
|
||||
self.processor.add_event_handler("on_track_audio_data", on_track_audio_data)
|
||||
|
||||
frame = InputAudioRawFrame(audio=user_audio, sample_rate=16000, num_channels=1)
|
||||
await self.processor._process_recording(frame)
|
||||
await self.processor.process_frame(frame, FrameDirection.DOWNSTREAM)
|
||||
|
||||
await asyncio.wait_for(audio_event.wait(), timeout=1)
|
||||
await asyncio.wait_for(track_event.wait(), timeout=1)
|
||||
@@ -102,7 +140,7 @@ class TestAudioBufferProcessor(unittest.IsolatedAsyncioTestCase):
|
||||
self.processor.add_event_handler("on_track_audio_data", on_track_audio_data)
|
||||
|
||||
frame = OutputAudioRawFrame(audio=bot_audio, sample_rate=16000, num_channels=1)
|
||||
await self.processor._process_recording(frame)
|
||||
await self.processor.process_frame(frame, FrameDirection.DOWNSTREAM)
|
||||
|
||||
await asyncio.wait_for(audio_event.wait(), timeout=1)
|
||||
await asyncio.wait_for(track_event.wait(), timeout=1)
|
||||
@@ -126,113 +164,165 @@ class TestAudioBufferProcessor(unittest.IsolatedAsyncioTestCase):
|
||||
|
||||
|
||||
class TestSilenceInjectionGuards(unittest.IsolatedAsyncioTestCase):
|
||||
"""Tests that silence is not injected mid-utterance (fix for crackling artifacts)."""
|
||||
"""Tests that silence is not injected mid-utterance (fix for crackling artifacts).
|
||||
|
||||
async def _make_processor(self) -> AudioBufferProcessor:
|
||||
"""Return a processor with no auto-flush and a passthrough resampler."""
|
||||
processor = AudioBufferProcessor(sample_rate=16000, num_channels=2, buffer_size=0)
|
||||
processor._input_resampler = _PassthroughResampler()
|
||||
processor._output_resampler = _PassthroughResampler()
|
||||
processor._update_sample_rate(StartFrame(audio_out_sample_rate=16000))
|
||||
await processor.start_recording()
|
||||
return processor
|
||||
|
||||
async def asyncTearDown(self):
|
||||
# Processors created inside each test are cleaned up there; nothing shared.
|
||||
pass
|
||||
Each test verifies the audio alignment in the flushed tracks to confirm that
|
||||
silence is only added by _align_track_buffers at flush time (end of the buffer),
|
||||
never injected mid-stream while the affected track is actively producing audio.
|
||||
"""
|
||||
|
||||
async def test_no_silence_injected_into_bot_buffer_while_bot_speaking(self):
|
||||
"""Bot buffer must not receive silence padding while the bot is actively speaking."""
|
||||
p = await self._make_processor()
|
||||
"""Bot audio must appear at the start of the bot track, not after mid-stream silence.
|
||||
|
||||
# Give user buffer a head-start so a sync *would* pad the bot buffer normally.
|
||||
p._user_audio_buffer = bytearray(b"\x01\x02\x03\x04")
|
||||
Timeline:
|
||||
1. User sends 4 bytes (bot not speaking → normal sync, no-op since bot is at 0)
|
||||
2. Bot starts speaking
|
||||
3. User sends 4 more bytes (bot speaking → sync skipped; bot stays at 0)
|
||||
4. Bot sends 4 bytes of known audio
|
||||
|
||||
# Bot starts speaking.
|
||||
await p._process_recording(BotStartedSpeakingFrame())
|
||||
Expected final bot track (8 bytes total after _align_track_buffers at flush):
|
||||
[bot_audio][silence_padding] ← audio first, silence only at the end
|
||||
|
||||
# User audio arrives — without the fix the bot buffer would be padded to 4 bytes.
|
||||
user_audio = b"\x05\x06\x07\x08"
|
||||
await p._process_recording(
|
||||
InputAudioRawFrame(audio=user_audio, sample_rate=16000, num_channels=1)
|
||||
With the bug the bot track would be:
|
||||
[silence_injected_mid_stream][bot_audio] ← silence inserted before the audio
|
||||
"""
|
||||
p = await _make_processor()
|
||||
|
||||
bot_audio = b"\xaa\xbb\xcc\xdd"
|
||||
|
||||
await p.process_frame(
|
||||
InputAudioRawFrame(audio=b"\x01\x02\x03\x04", sample_rate=16000, num_channels=1),
|
||||
FrameDirection.DOWNSTREAM,
|
||||
)
|
||||
await p.process_frame(BotStartedSpeakingFrame(), FrameDirection.DOWNSTREAM)
|
||||
await p.process_frame(
|
||||
InputAudioRawFrame(audio=b"\x05\x06\x07\x08", sample_rate=16000, num_channels=1),
|
||||
FrameDirection.DOWNSTREAM,
|
||||
)
|
||||
await p.process_frame(
|
||||
OutputAudioRawFrame(audio=bot_audio, sample_rate=16000, num_channels=1),
|
||||
FrameDirection.DOWNSTREAM,
|
||||
)
|
||||
|
||||
self.assertEqual(len(p._bot_audio_buffer), 0, "Bot buffer must not be padded while bot is speaking")
|
||||
self.assertEqual(bytes(p._user_audio_buffer), b"\x01\x02\x03\x04" + user_audio)
|
||||
|
||||
await p.stop_recording()
|
||||
_, bot_track = await _capture_track_audio(p)
|
||||
await p.cleanup()
|
||||
|
||||
# Audio must appear at the beginning of the bot track (not after injected silence).
|
||||
self.assertEqual(bot_track[:4], bot_audio)
|
||||
self.assertEqual(bot_track[4:], b"\x00" * 4)
|
||||
|
||||
async def test_no_silence_injected_into_user_buffer_while_user_speaking(self):
|
||||
"""User buffer must not receive silence padding while the user is actively speaking."""
|
||||
p = await self._make_processor()
|
||||
"""User audio must appear at the start of the user track, not after mid-stream silence.
|
||||
|
||||
# Give bot buffer a head-start so a sync *would* pad the user buffer normally.
|
||||
p._bot_audio_buffer = bytearray(b"\x01\x02\x03\x04")
|
||||
Timeline:
|
||||
1. Bot sends 4 bytes (user not speaking → normal sync, no-op since user is at 0)
|
||||
2. User starts speaking
|
||||
3. Bot sends 4 more bytes (user speaking → sync skipped; user stays at 0)
|
||||
4. User sends 4 bytes of known audio
|
||||
|
||||
# User starts speaking.
|
||||
await p._process_recording(UserStartedSpeakingFrame())
|
||||
Expected final user track (8 bytes total after _align_track_buffers at flush):
|
||||
[user_audio][silence_padding] ← audio first, silence only at the end
|
||||
|
||||
# Bot audio arrives — without the fix the user buffer would be padded to 4 bytes.
|
||||
bot_audio = b"\x05\x06\x07\x08"
|
||||
await p._process_recording(
|
||||
OutputAudioRawFrame(audio=bot_audio, sample_rate=16000, num_channels=1)
|
||||
With the bug the user track would be:
|
||||
[silence_injected_mid_stream][user_audio]
|
||||
"""
|
||||
p = await _make_processor()
|
||||
|
||||
user_audio = b"\xaa\xbb\xcc\xdd"
|
||||
|
||||
await p.process_frame(
|
||||
OutputAudioRawFrame(audio=b"\x01\x02\x03\x04", sample_rate=16000, num_channels=1),
|
||||
FrameDirection.DOWNSTREAM,
|
||||
)
|
||||
await p.process_frame(UserStartedSpeakingFrame(), FrameDirection.DOWNSTREAM)
|
||||
await p.process_frame(
|
||||
OutputAudioRawFrame(audio=b"\x05\x06\x07\x08", sample_rate=16000, num_channels=1),
|
||||
FrameDirection.DOWNSTREAM,
|
||||
)
|
||||
await p.process_frame(
|
||||
InputAudioRawFrame(audio=user_audio, sample_rate=16000, num_channels=1),
|
||||
FrameDirection.DOWNSTREAM,
|
||||
)
|
||||
|
||||
self.assertEqual(len(p._user_audio_buffer), 0, "User buffer must not be padded while user is speaking")
|
||||
self.assertEqual(bytes(p._bot_audio_buffer), b"\x01\x02\x03\x04" + bot_audio)
|
||||
|
||||
await p.stop_recording()
|
||||
user_track, _ = await _capture_track_audio(p)
|
||||
await p.cleanup()
|
||||
|
||||
self.assertEqual(user_track[:4], user_audio)
|
||||
self.assertEqual(user_track[4:], b"\x00" * 4)
|
||||
|
||||
async def test_silence_resumes_into_bot_buffer_after_bot_stops_speaking(self):
|
||||
"""After bot stops speaking, silence injection into the bot buffer resumes."""
|
||||
p = await self._make_processor()
|
||||
"""After bot stops speaking, the bot buffer is synced again on user audio arrival.
|
||||
|
||||
# Give user buffer a head-start.
|
||||
p._user_audio_buffer = bytearray(b"\x01\x02\x03\x04")
|
||||
Timeline:
|
||||
1. User sends 4 bytes (user=4, bot=0)
|
||||
2. Bot starts speaking
|
||||
3. User sends 4 more bytes (sync skipped; user=8, bot=0)
|
||||
4. Bot stops speaking
|
||||
5. User sends 4 more bytes (sync resumes; bot gets 8 bytes silence, user=12)
|
||||
|
||||
# Bot speaks and stops.
|
||||
await p._process_recording(BotStartedSpeakingFrame())
|
||||
await p._process_recording(BotStoppedSpeakingFrame())
|
||||
Expected final bot track (12 bytes): 8 bytes silence then no more audio (bot never
|
||||
sent audio, _align_track_buffers pads bot to 12).
|
||||
The key assertion: bot has 8 bytes of silence at positions 0-7, confirming that
|
||||
the sync at step 5 did inject 8 bytes (positions 0-7 of the bot buffer).
|
||||
"""
|
||||
p = await _make_processor()
|
||||
|
||||
# User audio arrives now — bot is no longer speaking so sync should run.
|
||||
user_audio = b"\x05\x06\x07\x08"
|
||||
await p._process_recording(
|
||||
InputAudioRawFrame(audio=user_audio, sample_rate=16000, num_channels=1)
|
||||
await p.process_frame(
|
||||
InputAudioRawFrame(audio=b"\x01\x02\x03\x04", sample_rate=16000, num_channels=1),
|
||||
FrameDirection.DOWNSTREAM,
|
||||
)
|
||||
await p.process_frame(BotStartedSpeakingFrame(), FrameDirection.DOWNSTREAM)
|
||||
await p.process_frame(
|
||||
InputAudioRawFrame(audio=b"\x05\x06\x07\x08", sample_rate=16000, num_channels=1),
|
||||
FrameDirection.DOWNSTREAM,
|
||||
)
|
||||
await p.process_frame(BotStoppedSpeakingFrame(), FrameDirection.DOWNSTREAM)
|
||||
await p.process_frame(
|
||||
InputAudioRawFrame(audio=b"\x09\x0a\x0b\x0c", sample_rate=16000, num_channels=1),
|
||||
FrameDirection.DOWNSTREAM,
|
||||
)
|
||||
|
||||
# Bot buffer should have been padded with silence to match the 4-byte head-start.
|
||||
self.assertEqual(len(p._bot_audio_buffer), 4, "Bot buffer should be padded after bot stops speaking")
|
||||
self.assertEqual(bytes(p._bot_audio_buffer), b"\x00\x00\x00\x00")
|
||||
|
||||
await p.stop_recording()
|
||||
_, bot_track = await _capture_track_audio(p)
|
||||
await p.cleanup()
|
||||
|
||||
# The sync at step 5 targets len(user)=8, so bot must have 8 bytes of silence
|
||||
# written before user's third chunk was added.
|
||||
self.assertEqual(bot_track[:8], b"\x00" * 8)
|
||||
|
||||
async def test_silence_resumes_into_user_buffer_after_user_stops_speaking(self):
|
||||
"""After user stops speaking, silence injection into the user buffer resumes."""
|
||||
p = await self._make_processor()
|
||||
"""After user stops speaking, the user buffer is synced again on bot audio arrival.
|
||||
|
||||
# Give bot buffer a head-start.
|
||||
p._bot_audio_buffer = bytearray(b"\x01\x02\x03\x04")
|
||||
Timeline:
|
||||
1. Bot sends 4 bytes (user=0, bot=4)
|
||||
2. User starts speaking
|
||||
3. Bot sends 4 more bytes (sync skipped; user=0, bot=8)
|
||||
4. User stops speaking
|
||||
5. Bot sends 4 more bytes (sync resumes; user gets 8 bytes silence, bot=12)
|
||||
|
||||
# User speaks and stops.
|
||||
await p._process_recording(UserStartedSpeakingFrame())
|
||||
await p._process_recording(UserStoppedSpeakingFrame())
|
||||
Expected: user track has 8 bytes of silence at positions 0-7.
|
||||
"""
|
||||
p = await _make_processor()
|
||||
|
||||
# Bot audio arrives now — user is no longer speaking so sync should run.
|
||||
bot_audio = b"\x05\x06\x07\x08"
|
||||
await p._process_recording(
|
||||
OutputAudioRawFrame(audio=bot_audio, sample_rate=16000, num_channels=1)
|
||||
await p.process_frame(
|
||||
OutputAudioRawFrame(audio=b"\x01\x02\x03\x04", sample_rate=16000, num_channels=1),
|
||||
FrameDirection.DOWNSTREAM,
|
||||
)
|
||||
await p.process_frame(UserStartedSpeakingFrame(), FrameDirection.DOWNSTREAM)
|
||||
await p.process_frame(
|
||||
OutputAudioRawFrame(audio=b"\x05\x06\x07\x08", sample_rate=16000, num_channels=1),
|
||||
FrameDirection.DOWNSTREAM,
|
||||
)
|
||||
await p.process_frame(UserStoppedSpeakingFrame(), FrameDirection.DOWNSTREAM)
|
||||
await p.process_frame(
|
||||
OutputAudioRawFrame(audio=b"\x09\x0a\x0b\x0c", sample_rate=16000, num_channels=1),
|
||||
FrameDirection.DOWNSTREAM,
|
||||
)
|
||||
|
||||
# User buffer should have been padded with silence to match the 4-byte head-start.
|
||||
self.assertEqual(len(p._user_audio_buffer), 4, "User buffer should be padded after user stops speaking")
|
||||
self.assertEqual(bytes(p._user_audio_buffer), b"\x00\x00\x00\x00")
|
||||
|
||||
await p.stop_recording()
|
||||
user_track, _ = await _capture_track_audio(p)
|
||||
await p.cleanup()
|
||||
|
||||
self.assertEqual(user_track[:8], b"\x00" * 8)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user