Merge pull request #2091 from pipecat-ai/filipi/sample_rate

Creating a new stream resampler which avoids clicks.
This commit is contained in:
Filipi da Silva Fuchter
2025-07-02 16:22:46 -03:00
committed by GitHub
16 changed files with 201 additions and 43 deletions

View File

@@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added a new `SOXRStreamAudioResampler` for processing audio in chunks or streams.
- Added new `DailyParams.audio_in_user_tracks` to allow receiving one track per
user (default) or a single track from the room (all participants mixed).
@@ -56,6 +58,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- Updated all the services to use the new `SOXRStreamAudioResampler`, ensuring smooth
transitions and eliminating clicks.
- Upgraded `daily-python` to 0.19.4.
- Updated `google` optional dependency to use `google-genai` version `1.24.0`.

View File

@@ -2,4 +2,4 @@ ruff format src
ruff format examples
ruff format tests
ruff format scripts
ruff check --select I --fix
ruff check --select I,D --fix

View File

@@ -7,7 +7,12 @@
"""SoX-based audio resampler implementation.
This module provides an audio resampler that uses the SoX resampler library
for very high quality audio sample rate conversion.
for very high-quality audio sample rate conversion.
When to use the SOXRAudioResampler:
1. For batch processing of complete audio files
2. When you have all the audio data available at once
"""
import numpy as np

View File

@@ -0,0 +1,101 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""SoX-based audio resampler stream implementation.
This module provides an audio resampler that uses the SoX ResampleStream library
for very high quality audio sample rate conversion.
When to use the SOXRStreamAudioResampler:
1. For real-time processing scenarios
2. When dealing with very long audio signals
3. When processing audio in chunks or streams
4. When you need to reuse the same resampler configuration multiple times, as it saves initialization overhead
"""
import time
import numpy as np
import soxr
from pipecat.audio.resamplers.base_audio_resampler import BaseAudioResampler
CLEAR_STREAM_AFTER_SECS = 0.2
class SOXRStreamAudioResampler(BaseAudioResampler):
"""Audio resampler implementation using the SoX ResampleStream library.
This resampler uses the SoX ResampleStream library configured for very high
quality (VHQ) resampling, providing excellent audio quality at the cost
of additional computational overhead.
It keeps an internal history which avoids clicks at chunk boundaries.
Notes:
- Only supports mono audio (1 channel).
- Input must be 16-bit signed PCM audio as raw bytes.
"""
def __init__(self, **kwargs):
"""Initialize the resampler.
Args:
**kwargs: Additional keyword arguments (currently unused).
"""
self._in_rate: float | None = None
self._out_rate: float | None = None
self._last_resample_time: float = 0
self._soxr_stream: soxr.ResampleStream | None = None
def _initialize(self, in_rate: float, out_rate: float):
self._in_rate = in_rate
self._out_rate = out_rate
self._last_resample_time = time.time()
self._soxr_stream = soxr.ResampleStream(
in_rate=in_rate, out_rate=out_rate, num_channels=1, quality="VHQ", dtype="int16"
)
def _maybe_clear_internal_state(self):
current_time = time.time()
time_since_last_resample = current_time - self._last_resample_time
# If more than CLEAR_STREAM_AFTER_SECS seconds have passed, clear the resampler state
if time_since_last_resample > CLEAR_STREAM_AFTER_SECS:
if self._soxr_stream:
self._soxr_stream.clear()
self._last_resample_time = current_time
def _maybe_initialize_sox_stream(self, in_rate: int, out_rate: int):
if self._soxr_stream is None:
self._initialize(in_rate, out_rate)
else:
self._maybe_clear_internal_state()
if self._in_rate != in_rate or self._out_rate != out_rate:
raise ValueError(
f"SOXRStreamAudioResampler cannot be reused with different sample rates: "
f"expected {self._in_rate}->{self._out_rate}, got {in_rate}->{out_rate}"
)
async def resample(self, audio: bytes, in_rate: int, out_rate: int) -> bytes:
"""Resample audio data using soxr.ResampleStream resampler library.
Args:
audio: Input audio data as raw bytes (16-bit signed integers).
in_rate: Original sample rate in Hz.
out_rate: Target sample rate in Hz.
Returns:
Resampled audio data as raw bytes (16-bit signed integers).
"""
if in_rate == out_rate:
return audio
self._maybe_initialize_sox_stream(in_rate, out_rate)
audio_data = np.frombuffer(audio, dtype=np.int16)
resampled_audio = self._soxr_stream.resample_chunk(audio_data)
result = resampled_audio.astype(np.int16).tobytes()
return result

View File

@@ -15,15 +15,41 @@ import audioop
import numpy as np
import pyloudnorm as pyln
import soxr
from pipecat.audio.resamplers.base_audio_resampler import BaseAudioResampler
from pipecat.audio.resamplers.soxr_resampler import SOXRAudioResampler
from pipecat.audio.resamplers.soxr_stream_resampler import SOXRStreamAudioResampler
def create_default_resampler(**kwargs) -> BaseAudioResampler:
"""Create a default audio resampler instance.
. deprecated:: 0.0.74
This function is deprecated and will be removed in a future version.
Use `create_stream_resampler` for real-time processing scenarios or
`create_file_resampler` for batch processing of complete audio files.
Args:
**kwargs: Additional keyword arguments passed to the resampler constructor.
Returns:
A configured SOXRAudioResampler instance.
"""
import warnings
warnings.warn(
"`create_default_resampler` is deprecated. "
"Use `create_stream_resampler` for real-time processing scenarios or "
"`create_file_resampler` for batch processing of complete audio files.",
DeprecationWarning,
stacklevel=2,
)
return SOXRAudioResampler(**kwargs)
def create_file_resampler(**kwargs) -> BaseAudioResampler:
"""Create an audio resampler instance for batch processing of complete audio files.
Args:
**kwargs: Additional keyword arguments passed to the resampler constructor.
@@ -33,6 +59,18 @@ def create_default_resampler(**kwargs) -> BaseAudioResampler:
return SOXRAudioResampler(**kwargs)
def create_stream_resampler(**kwargs) -> BaseAudioResampler:
"""Create a stream audio resampler instance.
Args:
**kwargs: Additional keyword arguments passed to the resampler constructor.
Returns:
A configured SOXRStreamAudioResampler instance.
"""
return SOXRStreamAudioResampler(**kwargs)
def mix_audio(audio1: bytes, audio2: bytes) -> bytes:
"""Mix two audio streams together by adding their samples.

View File

@@ -14,9 +14,8 @@ configurations and event-driven processing.
import time
from typing import Optional
from pipecat.audio.utils import create_default_resampler, interleave_stereo_audio, mix_audio
from pipecat.audio.utils import create_stream_resampler, interleave_stereo_audio, mix_audio
from pipecat.frames.frames import (
AudioRawFrame,
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
CancelFrame,
@@ -106,7 +105,8 @@ class AudioBufferProcessor(FrameProcessor):
self._recording = False
self._resampler = create_default_resampler()
self._input_resampler = create_stream_resampler()
self._output_resampler = create_stream_resampler()
self._register_event_handler("on_audio_data")
self._register_event_handler("on_track_audio_data")
@@ -210,7 +210,7 @@ class AudioBufferProcessor(FrameProcessor):
silence = self._compute_silence(self._last_user_frame_at)
self._user_audio_buffer.extend(silence)
# Add user audio.
resampled = await self._resample_audio(frame)
resampled = await self._resample_input_audio(frame)
self._user_audio_buffer.extend(resampled)
# Save time of frame so we can compute silence.
self._last_user_frame_at = time.time()
@@ -219,7 +219,7 @@ class AudioBufferProcessor(FrameProcessor):
silence = self._compute_silence(self._last_bot_frame_at)
self._bot_audio_buffer.extend(silence)
# Add bot audio.
resampled = await self._resample_audio(frame)
resampled = await self._resample_output_audio(frame)
self._bot_audio_buffer.extend(resampled)
# Save time of frame so we can compute silence.
self._last_bot_frame_at = time.time()
@@ -247,7 +247,7 @@ class AudioBufferProcessor(FrameProcessor):
self._bot_turn_audio_buffer = bytearray()
if isinstance(frame, InputAudioRawFrame):
resampled = await self._resample_audio(frame)
resampled = await self._resample_input_audio(frame)
self._user_turn_audio_buffer += resampled
# In the case of the user, we need to keep a short buffer of audio
# since VAD notification of when the user starts speaking comes
@@ -259,7 +259,7 @@ class AudioBufferProcessor(FrameProcessor):
discarded = len(self._user_turn_audio_buffer) - self._audio_buffer_size_1s
self._user_turn_audio_buffer = self._user_turn_audio_buffer[discarded:]
elif self._bot_speaking and isinstance(frame, OutputAudioRawFrame):
resampled = await self._resample_audio(frame)
resampled = await self._resample_output_audio(frame)
self._bot_turn_audio_buffer += resampled
async def _call_on_audio_data_handler(self):
@@ -301,9 +301,17 @@ class AudioBufferProcessor(FrameProcessor):
self._user_turn_audio_buffer = bytearray()
self._bot_turn_audio_buffer = bytearray()
async def _resample_audio(self, frame: AudioRawFrame) -> bytes:
async def _resample_input_audio(self, frame: InputAudioRawFrame) -> bytes:
"""Resample audio frame to the target sample rate."""
return await self._resampler.resample(frame.audio, frame.sample_rate, self._sample_rate)
return await self._input_resampler.resample(
frame.audio, frame.sample_rate, self._sample_rate
)
async def _resample_output_audio(self, frame: OutputAudioRawFrame) -> bytes:
"""Resample audio frame to the target sample rate."""
return await self._output_resampler.resample(
frame.audio, frame.sample_rate, self._sample_rate
)
def _compute_silence(self, from_time: float) -> bytes:
"""Compute silence to insert based on time gap."""

View File

@@ -13,7 +13,7 @@ from typing import Optional
from loguru import logger
from pydantic import BaseModel
from pipecat.audio.utils import create_default_resampler
from pipecat.audio.utils import create_stream_resampler
from pipecat.frames.frames import (
AudioRawFrame,
Frame,
@@ -67,7 +67,8 @@ class ExotelFrameSerializer(FrameSerializer):
self._exotel_sample_rate = self._params.exotel_sample_rate
self._sample_rate = 0 # Pipeline input rate
self._resampler = create_default_resampler()
self._input_resampler = create_stream_resampler()
self._output_resampler = create_stream_resampler()
@property
def type(self) -> FrameSerializerType:
@@ -104,7 +105,7 @@ class ExotelFrameSerializer(FrameSerializer):
data = frame.audio
# Output: Exotel outputs PCM audio, but we need to resample to match requested sample_rate
serialized_data = await self._resampler.resample(
serialized_data = await self._output_resampler.resample(
data, frame.sample_rate, self._exotel_sample_rate
)
payload = base64.b64encode(serialized_data).decode("ascii")
@@ -138,7 +139,7 @@ class ExotelFrameSerializer(FrameSerializer):
payload_base64 = message["media"]["payload"]
payload = base64.b64decode(payload_base64)
deserialized_data = await self._resampler.resample(
deserialized_data = await self._input_resampler.resample(
payload,
self._exotel_sample_rate,
self._sample_rate,

View File

@@ -13,7 +13,7 @@ from typing import Optional
from loguru import logger
from pydantic import BaseModel
from pipecat.audio.utils import create_default_resampler, pcm_to_ulaw, ulaw_to_pcm
from pipecat.audio.utils import create_stream_resampler, pcm_to_ulaw, ulaw_to_pcm
from pipecat.frames.frames import (
AudioRawFrame,
CancelFrame,
@@ -81,7 +81,8 @@ class PlivoFrameSerializer(FrameSerializer):
self._plivo_sample_rate = self._params.plivo_sample_rate
self._sample_rate = 0 # Pipeline input rate
self._resampler = create_default_resampler()
self._input_resampler = create_stream_resampler()
self._output_resampler = create_stream_resampler()
self._hangup_attempted = False
@property
@@ -129,7 +130,7 @@ class PlivoFrameSerializer(FrameSerializer):
# Output: Convert PCM at frame's rate to 8kHz μ-law for Plivo
serialized_data = await pcm_to_ulaw(
data, frame.sample_rate, self._plivo_sample_rate, self._resampler
data, frame.sample_rate, self._plivo_sample_rate, self._output_resampler
)
payload = base64.b64encode(serialized_data).decode("utf-8")
answer = {
@@ -224,7 +225,7 @@ class PlivoFrameSerializer(FrameSerializer):
# Input: Convert Plivo's 8kHz μ-law to PCM at pipeline input rate
deserialized_data = await ulaw_to_pcm(
payload, self._plivo_sample_rate, self._sample_rate, self._resampler
payload, self._plivo_sample_rate, self._sample_rate, self._input_resampler
)
audio_frame = InputAudioRawFrame(
audio=deserialized_data, num_channels=1, sample_rate=self._sample_rate

View File

@@ -16,7 +16,7 @@ from pydantic import BaseModel
from pipecat.audio.utils import (
alaw_to_pcm,
create_default_resampler,
create_stream_resampler,
pcm_to_alaw,
pcm_to_ulaw,
ulaw_to_pcm,
@@ -93,7 +93,8 @@ class TelnyxFrameSerializer(FrameSerializer):
self._telnyx_sample_rate = self._params.telnyx_sample_rate
self._sample_rate = 0 # Pipeline input rate
self._resampler = create_default_resampler()
self._input_resampler = create_stream_resampler()
self._output_resampler = create_stream_resampler()
self._hangup_attempted = False
@property
@@ -145,11 +146,11 @@ class TelnyxFrameSerializer(FrameSerializer):
# Output: Convert PCM at frame's rate to 8kHz encoded for Telnyx
if self._params.inbound_encoding == "PCMU":
serialized_data = await pcm_to_ulaw(
data, frame.sample_rate, self._telnyx_sample_rate, self._resampler
data, frame.sample_rate, self._telnyx_sample_rate, self._output_resampler
)
elif self._params.inbound_encoding == "PCMA":
serialized_data = await pcm_to_alaw(
data, frame.sample_rate, self._telnyx_sample_rate, self._resampler
data, frame.sample_rate, self._telnyx_sample_rate, self._output_resampler
)
else:
raise ValueError(f"Unsupported encoding: {self._params.inbound_encoding}")
@@ -249,14 +250,14 @@ class TelnyxFrameSerializer(FrameSerializer):
payload,
self._telnyx_sample_rate,
self._sample_rate,
self._resampler,
self._input_resampler,
)
elif self._params.outbound_encoding == "PCMA":
deserialized_data = await alaw_to_pcm(
payload,
self._telnyx_sample_rate,
self._sample_rate,
self._resampler,
self._input_resampler,
)
else:
raise ValueError(f"Unsupported encoding: {self._params.outbound_encoding}")

View File

@@ -13,7 +13,7 @@ from typing import Optional
from loguru import logger
from pydantic import BaseModel
from pipecat.audio.utils import create_default_resampler, pcm_to_ulaw, ulaw_to_pcm
from pipecat.audio.utils import create_stream_resampler, pcm_to_ulaw, ulaw_to_pcm
from pipecat.frames.frames import (
AudioRawFrame,
CancelFrame,
@@ -81,7 +81,8 @@ class TwilioFrameSerializer(FrameSerializer):
self._twilio_sample_rate = self._params.twilio_sample_rate
self._sample_rate = 0 # Pipeline input rate
self._resampler = create_default_resampler()
self._input_resampler = create_stream_resampler()
self._output_resampler = create_stream_resampler()
self._hangup_attempted = False
@property
@@ -129,7 +130,7 @@ class TwilioFrameSerializer(FrameSerializer):
# Output: Convert PCM at frame's rate to 8kHz μ-law for Twilio
serialized_data = await pcm_to_ulaw(
data, frame.sample_rate, self._twilio_sample_rate, self._resampler
data, frame.sample_rate, self._twilio_sample_rate, self._output_resampler
)
payload = base64.b64encode(serialized_data).decode("utf-8")
answer = {
@@ -214,7 +215,7 @@ class TwilioFrameSerializer(FrameSerializer):
# Input: Convert Twilio's 8kHz μ-law to PCM at pipeline input rate
deserialized_data = await ulaw_to_pcm(
payload, self._twilio_sample_rate, self._sample_rate, self._resampler
payload, self._twilio_sample_rate, self._sample_rate, self._input_resampler
)
audio_frame = InputAudioRawFrame(
audio=deserialized_data, num_channels=1, sample_rate=self._sample_rate

View File

@@ -17,7 +17,7 @@ from typing import AsyncGenerator, List, Optional
from loguru import logger
from pydantic import BaseModel
from pipecat.audio.utils import create_default_resampler
from pipecat.audio.utils import create_stream_resampler
from pipecat.frames.frames import (
ErrorFrame,
Frame,
@@ -195,7 +195,7 @@ class AWSPollyTTSService(TTSService):
"lexicon_names": params.lexicon_names,
}
self._resampler = create_default_resampler()
self._resampler = create_stream_resampler()
self.set_voice(voice_id)

View File

@@ -17,7 +17,7 @@ import aiohttp
from daily.daily import AudioData, VideoFrame
from loguru import logger
from pipecat.audio.utils import create_default_resampler
from pipecat.audio.utils import create_stream_resampler
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
@@ -75,7 +75,7 @@ class TavusVideoService(AIService):
self._client: Optional[TavusTransportClient] = None
self._conversation_id: str
self._resampler = create_default_resampler()
self._resampler = create_stream_resampler()
self._audio_buffer = bytearray()
self._send_task: Optional[asyncio.Task] = None

View File

@@ -15,7 +15,7 @@ from typing import Any, AsyncGenerator, Dict, Optional
import aiohttp
from loguru import logger
from pipecat.audio.utils import create_default_resampler
from pipecat.audio.utils import create_stream_resampler
from pipecat.frames.frames import (
ErrorFrame,
Frame,
@@ -121,7 +121,7 @@ class XTTSService(TTSService):
self._studio_speakers: Optional[Dict[str, Any]] = None
self._aiohttp_session = aiohttp_session
self._resampler = create_default_resampler()
self._resampler = create_stream_resampler()
def can_generate_metrics(self) -> bool:
"""Check if this service can generate processing metrics.

View File

@@ -21,7 +21,7 @@ from loguru import logger
from PIL import Image
from pipecat.audio.mixers.base_audio_mixer import BaseAudioMixer
from pipecat.audio.utils import create_default_resampler
from pipecat.audio.utils import create_stream_resampler
from pipecat.frames.frames import (
BotSpeakingFrame,
BotStartedSpeakingFrame,
@@ -358,7 +358,7 @@ class BaseOutputTransport(FrameProcessor):
self._audio_buffer = bytearray()
# This will be used to resample incoming audio to the output sample rate.
self._resampler = create_default_resampler()
self._resampler = create_stream_resampler()
# The user can provide a single mixer, to be used by the default
# destination, or a destination/mixer mapping.

View File

@@ -18,7 +18,7 @@ from typing import Any, Awaitable, Callable, List, Optional
from loguru import logger
from pydantic import BaseModel
from pipecat.audio.utils import create_default_resampler
from pipecat.audio.utils import create_stream_resampler
from pipecat.audio.vad.vad_analyzer import VADAnalyzer
from pipecat.frames.frames import (
AudioRawFrame,
@@ -513,7 +513,7 @@ class LiveKitInputTransport(BaseInputTransport):
self._audio_in_task = None
self._vad_analyzer: Optional[VADAnalyzer] = params.vad_analyzer
self._resampler = create_default_resampler()
self._resampler = create_stream_resampler()
# Whether we have seen a StartFrame already.
self._initialized = False

View File

@@ -20,7 +20,6 @@ from daily.daily import AudioData
from loguru import logger
from pydantic import BaseModel
from pipecat.audio.utils import create_default_resampler
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
@@ -437,8 +436,6 @@ class TavusInputTransport(BaseInputTransport):
super().__init__(params, **kwargs)
self._client = client
self._params = params
self._resampler = create_default_resampler()
# Whether we have seen a StartFrame already.
self._initialized = False