Compare commits

...

34 Commits

Author SHA1 Message Date
Aleix Conchillo Flaqué
99d3227ff5 Merge pull request #1126 from pipecat-ai/aleix/prepare-0.0.55
update CHANGELOG for 0.0.55
2025-02-05 11:32:39 -08:00
Aleix Conchillo Flaqué
7730f59635 update CHANGELOG for 0.0.55 2025-02-05 11:30:40 -08:00
Aleix Conchillo Flaqué
ba31546c32 Merge pull request #1139 from pipecat-ai/aleix/task-start-metadata
pipeline task start metadata and unit test improvements
2025-02-05 10:51:51 -08:00
Aleix Conchillo Flaqué
a363d12d1f dev-requirements: fix conflicts because of nvidia-riva-client 2025-02-05 10:34:46 -08:00
Aleix Conchillo Flaqué
feab9c8fa2 tests: run_test() now uses PipelineTask 2025-02-05 10:34:38 -08:00
Aleix Conchillo Flaqué
61f6669926 task: allow passing StartFrame metadata via start_metadata param 2025-02-05 10:34:38 -08:00
Aleix Conchillo Flaqué
3be69908d2 Merge pull request #1131 from pipecat-ai/aleix/global-audio-sample-rates
introduce PipelineParams audio input/output sample rates
2025-02-05 08:11:25 -08:00
Aleix Conchillo Flaqué
fcb80ec330 playht: don't set sample_rate in _settings 2025-02-05 07:46:24 -08:00
Mark Backman
c9f5684e2f OpenAITTSService: Add warning about changing sample_rate 2025-02-05 10:13:46 -05:00
Mark Backman
c257fa1573 AzureTTSService, AzureHttpTTSService: add start() method 2025-02-05 10:05:19 -05:00
Mark Backman
97c55da29f PlayHTHttpTTSService: add start() method to set sample_rate 2025-02-05 09:54:41 -05:00
Aleix Conchillo Flaqué
49426aa9a1 transport(websocket): improve exception logging 2025-02-04 23:50:45 -08:00
Aleix Conchillo Flaqué
0a333c26da services(elevenlabs): warn if sample rate not supported 2025-02-04 23:50:21 -08:00
Aleix Conchillo Flaqué
75a29424ff examples(telnyx-chatbot): use cartesia so we can use 8khz 2025-02-04 23:49:50 -08:00
Filipi da Silva Fuchter
cd1b429308 Merge pull request #1133 from pipecat-ai/fixing_krisp_issue
Fixing the issue in Krisp when trying to create more than one
2025-02-04 20:44:29 -03:00
Filipi Fuchter
7f1ae4b8cc Fixing the issue in Krisp when trying to create more than one filter in the same process. 2025-02-04 20:10:56 -03:00
Aleix Conchillo Flaqué
af9fd811cd examples(moondream-chatbot): fix UserImageRequester 2025-02-04 14:37:53 -08:00
Aleix Conchillo Flaqué
69f5c9b9d3 update anthropic and openpipe versions 2025-02-04 14:37:36 -08:00
Aleix Conchillo Flaqué
ab45e481be introduce PipelineParams audio input/output sample rates 2025-02-04 14:12:56 -08:00
Aleix Conchillo Flaqué
cc54255c41 Merge pull request #1125 from pipecat-ai/aleix/twilio-chatbot-improvements 2025-02-03 11:10:33 -08:00
Aleix Conchillo Flaqué
1cdb66f889 examples(twilio-chatbot): create sample rate variable 2025-02-03 10:58:06 -08:00
Aleix Conchillo Flaqué
51a86a509c examples: multiple twilio-chatbot improvements 2025-02-03 10:36:24 -08:00
Aleix Conchillo Flaqué
824898f7b7 Merge pull request #1121 from pipecat-ai/aleix/audio-resamplers
introduce audio resamplers
2025-02-03 10:32:55 -08:00
Aleix Conchillo Flaqué
57dadb6359 audio(utils): some variable renames 2025-02-03 09:33:04 -08:00
Aleix Conchillo Flaqué
5dcdc68ef5 examples: fix 22 series initial gate state 2025-02-03 09:16:58 -08:00
Aleix Conchillo Flaqué
aafb2db620 GatedOpenAILLMContextAggregator: use keyword argument and add start_open 2025-02-03 09:16:44 -08:00
Aleix Conchillo Flaqué
f3f22cf61c AudioBufferProcessor: add start_recording()/stop_recording() 2025-02-01 11:06:58 -08:00
Aleix Conchillo Flaqué
371c2f3704 canonical: do not reset audio buffers 2025-02-01 11:06:58 -08:00
Aleix Conchillo Flaqué
1f14f62696 AudioBufferProcessor: fix audio buffer silence computation 2025-02-01 11:06:58 -08:00
Aleix Conchillo Flaqué
06449eff2c BaseAudioResampler: make resample() async 2025-02-01 11:06:58 -08:00
Aleix Conchillo Flaqué
dcfb86583d serializers: serialize()/deserialize() are now async 2025-02-01 11:06:58 -08:00
Aleix Conchillo Flaqué
cda34a1320 AudioBufferProcessor: fix user/bot audio buffers silence padding 2025-02-01 11:06:58 -08:00
Aleix Conchillo Flaqué
13611fd8e1 AudioBufferProcessor: call callback on CancelFrame 2025-02-01 11:06:58 -08:00
Aleix Conchillo Flaqué
fc89aad469 introduce audio resamplers 2025-02-01 11:06:55 -08:00
90 changed files with 1407 additions and 585 deletions

View File

@@ -5,10 +5,25 @@ All notable changes to **Pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
## [0.0.55] - 2025-02-05
### Added
- Added a new `start_metadata` field to `PipelineParams`. The provided metadata
will be set to the initial `StartFrame` being pushed from the `PipelineTask`.
- Added new fields to `PipelineParams` to control audio input and output sample
rates for the whole pipeline. This allows controlling sample rates from a
single place instead of having to specify sample rates in each
service. Setting a sample rate to a service is still possible and will
override the value from `PipelineParams`.
- Introduce audio resamplers (`BaseAudioResampler`). This is just a base class
to implement audio resamplers. Currently, two implementations are provided
`SOXRAudioResampler` and `ResampyResampler`. A new
`create_default_resampler()` has been added (replacing the now deprecated
`resample_audio()`).
- It is now possible to specify the asyncio event loop that a `PipelineTask` and
all the processors should run on by passing it as a new argument to the
`PipelineRunner`. This could allow running pipelines in multiple threads each
@@ -41,6 +56,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- `GatedOpenAILLMContextAggregator` now require keyword arguments. Also, a new
`start_open` argument has been added to set the initial state of the gate.
- Added `organization` and `project` level authentication to
`OpenAILLMService`.
@@ -56,8 +74,27 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `InputDTMFFrame` is now based on `DTMFFrame`. There's also a new
`OutputDTMFFrame` frame.
### Deprecated
- `resample_audio()` is now deprecated, use `create_default_resampler()`
instead.
### Removed
- `AudioBufferProcessor.reset_audio_buffers()` has been removed, use
`AudioBufferProcessor.start_recording()` and
``AudioBufferProcessor.stop_recording()` instead.
### Fixed
- Fixed a `AudioBufferProcessor` that would cause crackling in some recordings.
- Fixed an issue in `AudioBufferProcessor` where user callback would not be
called on task cancellation.
- Fixed an issue in `AudioBufferProcessor` that would cause wrong silence
padding in some cases.
- Fixed an issue where `ElevenLabsTTSService` messages would return a 1009
websocket error by increasing the max message size limit to 16MB.
@@ -73,6 +110,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Other
- Improved Unit Test `run_test()` to use `PipelineTask` and
`PipelineRunner`. There's now also some control around `StartFrame` and
`EndFrame`. The `EndTaskFrame` has been removed since it doesn't seem
necessary with this new approach.
- Updated `twilio-chatbot` with a few new features: use 8000 sample rate and
avoid resampling, a new client useful for stress testing and testing locally
without the need to make phone calls. Also, added audio recording on both the
client and the server to make sure the audio sounds good.
- Updated examples to use `task.cancel()` to immediately exit the example when a
participant leaves or disconnects, instead of pushing an `EndFrame`. Pushing
an `EndFrame` causes the bot to run through everything that is internally
@@ -1527,6 +1574,9 @@ async def on_connected(processor):
### Changed
- `FrameSerializer.serialize()` and `FrameSerializer.deserialize()` are now
`async`.
- `Filter` has been renamed to `FrameFilter` and it's now under
`processors/filters`.

View File

@@ -1,11 +1,11 @@
build~=1.2.2
grpcio-tools~=1.69.0
grpcio-tools~=1.67.1
pip-tools~=7.4.1
pre-commit~=4.0.1
pyright~=1.1.392
pytest~=8.3.4
pytest-asyncio~=0.25.2
ruff~=0.9.1
setuptools~=75.8.0
setuptools~=70.0.0
setuptools_scm~=8.1.0
python-dotenv~=1.0.1

View File

@@ -17,7 +17,7 @@ from runner import configure
from pipecat.frames.frames import AudioRawFrame, EndFrame, OutputAudioRawFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -31,16 +31,15 @@ logger.add(sys.stderr, level="DEBUG")
class SilenceFrame(OutputAudioRawFrame):
def __init__(
self,
audio: bytes = None,
sample_rate: int = 16000,
num_channels: int = 1,
duration: float = 0.1,
*,
sample_rate: int,
duration: float,
):
# Initialize the parent class with the silent frame's data
super().__init__(
audio=self.create_silent_audio_frame(sample_rate, num_channels, duration).audio,
audio=self.create_silent_audio_frame(sample_rate, 1, duration).audio,
sample_rate=sample_rate,
num_channels=num_channels,
num_channels=1,
)
@staticmethod
@@ -80,7 +79,10 @@ async def main():
return
await task.queue_frames(
[
SilenceFrame(duration=0.5),
SilenceFrame(
sample_rate=task.params.audio_out_sample_rate,
duration=0.5,
),
TTSSpeakFrame(f"Hello there, how are you doing today ?"),
EndFrame(),
]

View File

@@ -124,6 +124,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await audio_buffer_processor.start_recording()
await transport.capture_participant_transcription(participant["id"])
await task.queue_frames([context_aggregator.user().get_context_frame()])

View File

@@ -109,8 +109,9 @@ async def main():
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
# Save audio every 10 seconds.
audiobuffer = AudioBufferProcessor(buffer_size=480000)
# NOTE: Watch out! This will save all the conversation in memory. You
# can pass `buffer_size` to get periodic callbacks.
audiobuffer = AudioBufferProcessor()
pipeline = Pipeline(
[
@@ -132,6 +133,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await audiobuffer.start_recording()
await transport.capture_participant_transcription(participant["id"])
await task.queue_frames([context_aggregator.user().get_context_frame()])

View File

@@ -37,7 +37,6 @@ async def main():
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=24000,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),

View File

@@ -38,7 +38,6 @@ async def main():
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=24000,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),

View File

@@ -40,7 +40,6 @@ async def main():
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=24000,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,

View File

@@ -21,7 +21,7 @@ from pipecat.frames.frames import (
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -61,7 +61,6 @@ async def main():
"Test",
DailyParams(
audio_in_enabled=True,
audio_in_sample_rate=24000,
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_is_live=True,
@@ -78,7 +77,9 @@ async def main():
runner = PipelineRunner()
task = PipelineTask(pipeline)
task = PipelineTask(
pipeline, PipelineParams(audio_in_sample_rate=24000, audio_out_sample_rate=24000)
)
await runner.run(task)

View File

@@ -22,7 +22,7 @@ from pipecat.frames.frames import (
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.local.tk import TkLocalTransport
@@ -62,7 +62,7 @@ async def main():
tk_root.title("Local Mirror")
daily_transport = DailyTransport(
room_url, token, "Test", DailyParams(audio_in_enabled=True, audio_in_sample_rate=24000)
room_url, token, "Test", DailyParams(audio_in_enabled=True)
)
tk_transport = TkLocalTransport(
@@ -82,7 +82,9 @@ async def main():
pipeline = Pipeline([daily_transport.input(), MirrorProcessor(), tk_transport.output()])
task = PipelineTask(pipeline)
task = PipelineTask(
pipeline, PipelineParams(audio_in_sample_rate=24000, audio_out_sample_rate=24000)
)
async def run_tk():
while not task.has_finished():

View File

@@ -51,8 +51,6 @@ async def main():
out_params=GStreamerPipelineSource.OutputParams(
video_width=1280,
video_height=720,
audio_sample_rate=24000,
audio_channels=1,
),
)

View File

@@ -80,9 +80,7 @@ async def main():
"Respond bot",
DailyParams(
audio_in_enabled=True,
audio_in_sample_rate=24000,
audio_out_enabled=True,
audio_out_sample_rate=24000,
transcription_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.8)),

View File

@@ -177,9 +177,7 @@ async def main():
"Respond bot",
DailyParams(
audio_in_enabled=True,
audio_in_sample_rate=24000,
audio_out_enabled=True,
audio_out_sample_rate=24000,
transcription_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.8)),

View File

@@ -88,6 +88,10 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
# We just use 16000 because that's what Tavus is expecting and
# we avoid resampling.
audio_in_sample_rate=16000,
audio_out_sample_rate=16000,
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -104,8 +104,11 @@ async def main():
)
# This processor keeps the last context and will let it through once the
# notifier is woken up.
gated_context_aggregator = GatedOpenAILLMContextAggregator(notifier)
# notifier is woken up. We start with the gate open because we send an
# initial context frame to start the conversation.
gated_context_aggregator = GatedOpenAILLMContextAggregator(
notifier=notifier, start_open=True
)
# Notify if the user hasn't said anything.
async def user_idle_notifier(frame):

View File

@@ -129,9 +129,9 @@ class CompletenessCheck(FrameProcessor):
class OutputGate(FrameProcessor):
def __init__(self, notifier: BaseNotifier, **kwargs):
def __init__(self, *, notifier: BaseNotifier, start_open: bool = False, **kwargs):
super().__init__(**kwargs)
self._gate_open = False
self._gate_open = start_open
self._frames_buffer = []
self._notifier = notifier
@@ -252,7 +252,9 @@ async def main():
# sentence, this will wake up the notifier if that happens.
user_idle = UserIdleProcessor(callback=user_idle_notifier, timeout=5.0)
bot_output_gate = OutputGate(notifier=notifier)
# We start with the gate open because we send an initial context frame
# to start the conversation.
bot_output_gate = OutputGate(notifier=notifier, start_open=True)
async def block_user_stopped_speaking(frame):
return not isinstance(frame, UserStoppedSpeakingFrame)

View File

@@ -333,9 +333,9 @@ class CompletenessCheck(FrameProcessor):
class OutputGate(FrameProcessor):
def __init__(self, notifier: BaseNotifier, **kwargs):
def __init__(self, *, notifier: BaseNotifier, start_open: bool = False, **kwargs):
super().__init__(**kwargs)
self._gate_open = False
self._gate_open = start_open
self._frames_buffer = []
self._notifier = notifier
@@ -461,7 +461,9 @@ async def main():
# sentence, this will wake up the notifier if that happens.
user_idle = UserIdleProcessor(callback=user_idle_notifier, timeout=5.0)
bot_output_gate = OutputGate(notifier=notifier)
# We start with the gate open because we send an initial context frame
# to start the conversation.
bot_output_gate = OutputGate(notifier=notifier, start_open=True)
async def block_user_stopped_speaking(frame):
return not isinstance(frame, UserStoppedSpeakingFrame)

View File

@@ -639,7 +639,6 @@ async def main():
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
audio_in_sample_rate=16000,
),
)

View File

@@ -37,8 +37,6 @@ async def main():
token,
"Respond bot",
DailyParams(
audio_in_sample_rate=16000,
audio_out_sample_rate=24000,
audio_out_enabled=True,
vad_enabled=True,
vad_audio_passthrough=True,

View File

@@ -37,8 +37,6 @@ async def main():
token,
"Respond bot",
DailyParams(
audio_in_sample_rate=16000,
audio_out_sample_rate=24000,
audio_out_enabled=True,
vad_enabled=True,
vad_audio_passthrough=True,

View File

@@ -84,8 +84,6 @@ async def main():
token,
"Respond bot",
DailyParams(
audio_in_sample_rate=16000,
audio_out_sample_rate=24000,
audio_out_enabled=True,
vad_enabled=True,
vad_audio_passthrough=True,

View File

@@ -37,8 +37,6 @@ async def main():
token,
"Respond bot",
DailyParams(
audio_in_sample_rate=16000,
audio_out_sample_rate=24000,
audio_out_enabled=True,
vad_enabled=True,
vad_audio_passthrough=True,
@@ -47,8 +45,6 @@ async def main():
# matter because we can only use the Multimodal Live API's phrase
# endpointing, for now.
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)),
start_audio_paused=True,
start_video_paused=True,
),
)

View File

@@ -52,8 +52,6 @@ async def main():
token,
"Respond bot",
DailyParams(
audio_in_sample_rate=16000,
audio_out_sample_rate=24000,
audio_out_enabled=True,
vad_enabled=True,
vad_audio_passthrough=True,

View File

@@ -38,8 +38,6 @@ load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
DESIRED_SAMPLE_RATE = 16000
def generate_token(room_name: str, participant_name: str, api_key: str, api_secret: str) -> str:
token = api.AccessToken(api_key, api_secret)
@@ -114,11 +112,8 @@ async def main():
token=token,
room_name=room_name,
params=LiveKitParams(
audio_in_channels=1,
audio_in_enabled=True,
audio_out_enabled=True,
audio_in_sample_rate=DESIRED_SAMPLE_RATE,
audio_out_sample_rate=DESIRED_SAMPLE_RATE,
vad_analyzer=SileroVADAnalyzer(),
vad_enabled=True,
vad_audio_passthrough=True,
@@ -128,7 +123,6 @@ async def main():
stt = DeepgramSTTService(
api_key=os.getenv("DEEPGRAM_API_KEY"),
live_options=LiveOptions(
sample_rate=DESIRED_SAMPLE_RATE,
vad_events=True,
),
)
@@ -138,7 +132,6 @@ async def main():
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
sample_rate=DESIRED_SAMPLE_RATE,
)
messages = [

View File

@@ -106,8 +106,8 @@ class UserImageRequester(FrameProcessor):
UserImageRequestFrame(self.participant_id), FrameDirection.UPSTREAM
)
await self.push_frame(TextFrame("Describe the image in a short sentence."))
elif isinstance(frame, UserImageRawFrame):
await self.push_frame(frame)
else:
await self.push_frame(frame, direction)
class TextFilterProcessor(FrameProcessor):

View File

@@ -121,8 +121,6 @@ async def main():
token,
"Chatbot",
DailyParams(
audio_in_sample_rate=16000,
audio_out_sample_rate=24000,
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_width=1024,

View File

@@ -1,5 +1,5 @@
beautifulsoup4==4.12.3
pypdf==4.3.1
tiktoken==0.7.0
pipecat-ai[daily,cartesia,openai,silero]==0.0.40
pipecat-ai[daily,cartesia,openai,silero]
python-dotenv==1.0.1

View File

@@ -112,7 +112,6 @@ async def main():
token,
"studypal",
DailyParams(
audio_out_sample_rate=44100,
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
@@ -124,7 +123,6 @@ async def main():
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id=os.getenv("CARTESIA_VOICE_ID", "4d2fd738-3b3d-4368-957a-bb4805275bd9"),
# British Narration Lady: 4d2fd738-3b3d-4368-957a-bb4805275bd9
sample_rate=44100,
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o-mini")
@@ -155,7 +153,12 @@ Your task is to help the user understand and learn from this article in 2 senten
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
task = PipelineTask(
pipeline,
PipelineParams(
audio_out_sample_rate=44100, allow_interruptions=True, enable_metrics=True
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -11,14 +11,13 @@ from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.serializers.telnyx import TelnyxFrameSerializer
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.elevenlabs import ElevenLabsTTSService, Language
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.network.fastapi_websocket import (
FastAPIWebsocketParams,
@@ -31,7 +30,12 @@ logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def run_bot(websocket_client, stream_id, outbound_encoding, inbound_encoding):
async def run_bot(
websocket_client,
stream_id: str,
outbound_encoding: str,
inbound_encoding: str,
):
transport = FastAPIWebsocketTransport(
websocket=websocket_client,
params=FastAPIWebsocketParams(
@@ -48,11 +52,9 @@ async def run_bot(websocket_client, stream_id, outbound_encoding, inbound_encodi
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id="CwhRBWXzGAHq8TQ4Fs17",
output_format="pcm_24000",
params=ElevenLabsTTSService.InputParams(language=Language.EN),
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
messages = [
@@ -77,7 +79,14 @@ async def run_bot(websocket_client, stream_id, outbound_encoding, inbound_encodi
]
)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
task = PipelineTask(
pipeline,
params=PipelineParams(
audio_in_sample_rate=8000,
audio_out_sample_rate=8000,
allow_interruptions=True,
),
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
@@ -87,7 +96,7 @@ async def run_bot(websocket_client, stream_id, outbound_encoding, inbound_encodi
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
await task.queue_frames([EndFrame()])
await task.cancel()
runner = PipelineRunner(handle_sigint=False)

View File

@@ -107,3 +107,34 @@ The server will start on port 8765. Keep this running while you test with Twilio
## Usage
To start a call, simply make a call to your configured Twilio phone number. The webhook URL will direct the call to your FastAPI application, which will handle it accordingly.
## Testing
It is also possible to automatically test the server without making phone calls by using a software client.
First, update `templates/streams.xml` to point to your server's websocket endpoint. For example:
```
<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Connect>
<Stream url="ws://localhost:8765/ws"></Stream>
</Connect>
<Pause length="40"/>
</Response>
```
Then, start the server with `-t` to indicate we are testing:
```sh
# Make sure youre in the project directory and your virtual environment is activated
python server.py -t
```
Finally, just point the client to the server's URL:
```sh
python client.py -u http://localhost:8765 -c 2
```
where `-c` allows you to create multiple concurrent clients.

View File

@@ -4,10 +4,15 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import datetime
import io
import os
import sys
import wave
import aiofiles
from dotenv import load_dotenv
from fastapi import WebSocket
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
@@ -15,6 +20,7 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
from pipecat.serializers.twilio import TwilioFrameSerializer
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.deepgram import DeepgramSTTService
@@ -30,10 +36,29 @@ logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def run_bot(websocket_client, stream_sid):
async def save_audio(server_name: str, audio: bytes, sample_rate: int, num_channels: int):
if len(audio) > 0:
filename = (
f"{server_name}_recording_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.wav"
)
with io.BytesIO() as buffer:
with wave.open(buffer, "wb") as wf:
wf.setsampwidth(2)
wf.setnchannels(num_channels)
wf.setframerate(sample_rate)
wf.writeframes(audio)
async with aiofiles.open(filename, "wb") as file:
await file.write(buffer.getvalue())
logger.info(f"Merged audio saved to {filename}")
else:
logger.info("No audio data to save")
async def run_bot(websocket_client: WebSocket, stream_sid: str, testing: bool):
transport = FastAPIWebsocketTransport(
websocket=websocket_client,
params=FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
add_wav_header=False,
vad_enabled=True,
@@ -45,23 +70,28 @@ async def run_bot(websocket_client, stream_sid):
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"), audio_passthrough=True)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
push_silence_after_stop=testing,
)
messages = [
{
"role": "system",
"content": "You are a helpful LLM in an audio call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
"content": "You are an elementary teacher in an audio call. Your output will be converted to audio so don't include special characters in your answers. Respond to what the student said in a short short sentence.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
# NOTE: Watch out! This will save all the conversation in memory. You can
# pass `buffer_size` to get periodic callbacks.
audiobuffer = AudioBufferProcessor()
pipeline = Pipeline(
[
transport.input(), # Websocket input from client
@@ -70,14 +100,22 @@ async def run_bot(websocket_client, stream_sid):
llm, # LLM
tts, # Text-To-Speech
transport.output(), # Websocket output to client
audiobuffer, # Used to buffer the audio in the pipeline
context_aggregator.assistant(),
]
)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
task = PipelineTask(
pipeline,
params=PipelineParams(
audio_in_sample_rate=8000, audio_out_sample_rate=8000, allow_interruptions=True
),
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
# Start recording.
await audiobuffer.start_recording()
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
@@ -86,6 +124,15 @@ async def run_bot(websocket_client, stream_sid):
async def on_client_disconnected(transport, client):
await task.cancel()
runner = PipelineRunner(handle_sigint=False)
@audiobuffer.event_handler("on_audio_data")
async def on_audio_data(buffer, audio, sample_rate, num_channels):
server_name = f"server_{websocket_client.client.port}"
await save_audio(server_name, audio, sample_rate, num_channels)
# We use `handle_sigint=False` because `uvicorn` is controlling keyboard
# interruptions. We use `force_gc=True` to force garbage collection after
# the runner finishes running a task which could be useful for long running
# applications with multiple clients connecting.
runner = PipelineRunner(handle_sigint=False, force_gc=True)
await runner.run(task)

View File

@@ -0,0 +1,199 @@
#
# Copyright (c) 2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import asyncio
import datetime
import io
import os
import sys
import wave
import xml.etree.ElementTree as ET
from uuid import uuid4
import aiofiles
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import EndFrame, TransportMessageUrgentFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
from pipecat.serializers.twilio import TwilioFrameSerializer
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.network.websocket_client import (
WebsocketClientParams,
WebsocketClientTransport,
)
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
DEFAULT_CLIENT_DURATION = 30
async def download_twiml(server_url: str) -> str:
# TODO(aleix): add error checking.
async with aiohttp.ClientSession() as session:
async with session.post(server_url) as response:
return await response.text()
def get_stream_url_from_twiml(twiml: str) -> str:
root = ET.fromstring(twiml)
# TODO(aleix): add error checking.
stream_element = root.find(".//Stream") # Finds the first <Stream> element
url = stream_element.get("url")
return url
async def save_audio(client_name: str, audio: bytes, sample_rate: int, num_channels: int):
if len(audio) > 0:
filename = (
f"{client_name}_recording_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.wav"
)
with io.BytesIO() as buffer:
with wave.open(buffer, "wb") as wf:
wf.setsampwidth(2)
wf.setnchannels(num_channels)
wf.setframerate(sample_rate)
wf.writeframes(audio)
async with aiofiles.open(filename, "wb") as file:
await file.write(buffer.getvalue())
logger.info(f"Merged audio saved to {filename}")
else:
logger.info("No audio data to save")
async def run_client(client_name: str, server_url: str, duration_secs: int):
twiml = await download_twiml(server_url)
stream_url = get_stream_url_from_twiml(twiml)
stream_sid = str(uuid4())
transport = WebsocketClientTransport(
uri=stream_url,
params=WebsocketClientParams(
audio_in_enabled=True,
audio_out_enabled=True,
add_wav_header=False,
serializer=TwilioFrameSerializer(stream_sid),
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=1.5)),
vad_audio_passthrough=True,
),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
# We let the audio passthrough so we can record the conversation.
stt = DeepgramSTTService(
api_key=os.getenv("DEEPGRAM_API_KEY"),
audio_passthrough=True,
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="e13cae5c-ec59-4f71-b0a6-266df3c9bb8e", # Madame Mischief
push_silence_after_stop=True,
)
messages = [
{
"role": "system",
"content": "You are an 8 year old child. A teacher will explain you new concepts you want to know about. Feel free to change topics whnever you want. Once you are taught something you need to keep asking for clarifications if you think someone your age would not understand what you are being taught.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
# NOTE: Watch out! This will save all the conversation in memory. You can
# pass `buffer_size` to get periodic callbacks.
audiobuffer = AudioBufferProcessor()
pipeline = Pipeline(
[
transport.input(), # Websocket input from server
stt, # Speech-To-Text
context_aggregator.user(),
llm, # LLM
tts, # Text-To-Speech
transport.output(), # Websocket output to server
audiobuffer, # Used to buffer the audio in the pipeline
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
audio_in_sample_rate=8000, audio_out_sample_rate=8000, allow_interruptions=True
),
)
@transport.event_handler("on_connected")
async def on_connected(transport: WebsocketClientTransport, client):
# Start recording.
await audiobuffer.start_recording()
message = TransportMessageUrgentFrame(
message={"event": "connected", "protocol": "Call", "version": "1.0.0"}
)
await transport.output().send_message(message)
message = TransportMessageUrgentFrame(
message={"event": "start", "streamSid": stream_sid, "start": {"streamSid": stream_sid}}
)
await transport.output().send_message(message)
@audiobuffer.event_handler("on_audio_data")
async def on_audio_data(buffer, audio, sample_rate, num_channels):
await save_audio(client_name, audio, sample_rate, num_channels)
async def end_call():
await asyncio.sleep(duration_secs)
await task.queue_frame(EndFrame())
runner = PipelineRunner()
await asyncio.gather(runner.run(task), end_call())
async def main():
parser = argparse.ArgumentParser(description="Pipecat Twilio Chatbot Client")
parser.add_argument("-u", "--url", type=str, required=True, help="specify the server URL")
parser.add_argument(
"-c", "--clients", type=int, required=True, help="number of concurrent clients"
)
parser.add_argument(
"-d",
"--duration",
type=int,
default=DEFAULT_CLIENT_DURATION,
help=f"duration of each client in seconds (default: {DEFAULT_CLIENT_DURATION})",
)
args, _ = parser.parse_known_args()
clients = []
for i in range(args.clients):
clients.append(asyncio.create_task(run_client(f"client_{i}", args.url, args.duration)))
await asyncio.gather(*clients)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -4,6 +4,7 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import json
import uvicorn
@@ -38,8 +39,16 @@ async def websocket_endpoint(websocket: WebSocket):
print(call_data, flush=True)
stream_sid = call_data["start"]["streamSid"]
print("WebSocket connection accepted")
await run_bot(websocket, stream_sid)
await run_bot(websocket, stream_sid, app.state.testing)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Pipecat Twilio Chatbot Server")
parser.add_argument(
"-t", "--test", action="store_true", default=False, help="set the server in testing mode"
)
args, _ = parser.parse_known_args()
app.state.testing = args.test
uvicorn.run(app, host="0.0.0.0", port=8765)

View File

@@ -17,6 +17,7 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.serializers.protobuf import ProtobufFrameSerializer
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.openai import OpenAILLMService
@@ -80,7 +81,7 @@ class SessionTimeoutHandler:
async def main():
transport = WebsocketServerTransport(
params=WebsocketServerParams(
audio_out_sample_rate=16000,
serializer=ProtobufFrameSerializer(),
audio_out_enabled=True,
add_wav_header=True,
vad_enabled=True,
@@ -97,7 +98,6 @@ async def main():
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
sample_rate=16000,
)
messages = [
@@ -122,7 +122,12 @@ async def main():
]
)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
task = PipelineTask(
pipeline,
params=PipelineParams(
audio_in_sample_rate=16000, audio_out_sample_rate=16000, allow_interruptions=True
),
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):

View File

@@ -32,6 +32,7 @@ dependencies = [
"protobuf~=5.29.3",
"pydantic~=2.10.5",
"pyloudnorm~=0.1.1",
"resampy~=0.4.3",
"soxr~=0.5.0"
]
@@ -40,7 +41,7 @@ Source = "https://github.com/pipecat-ai/pipecat"
Website = "https://pipecat.ai"
[project.optional-dependencies]
anthropic = [ "anthropic~=0.39.0" ]
anthropic = [ "anthropic~=0.45.2" ]
assemblyai = [ "assemblyai~=0.36.0" ]
aws = [ "boto3~=1.35.99" ]
azure = [ "azure-cognitiveservices-speech~=1.42.0", "openai~=1.59.6" ]
@@ -69,7 +70,7 @@ moondream = [ "einops~=0.8.0", "timm~=1.0.13", "transformers~=4.48.0" ]
nim = [ "openai~=1.59.6" ]
noisereduce = [ "noisereduce~=3.0.3" ]
openai = [ "openai~=1.59.6", "websockets~=13.1", "python-deepcompare~=2.1.0" ]
openpipe = [ "openpipe~=4.43.0" ]
openpipe = [ "openpipe~=4.45.0" ]
playht = [ "pyht~=0.1.6", "websockets~=13.1" ]
riva = [ "nvidia-riva-client~=2.18.0" ]
silero = [ "onnxruntime~=1.20.1" ]

View File

@@ -20,6 +20,22 @@ except ModuleNotFoundError as e:
raise Exception(f"Missing module: {e}")
class KrispProcessorManager:
"""
Ensures that only one KrispAudioProcessor instance exists for the entire program.
"""
_krisp_instance = None
@classmethod
def get_processor(cls, sample_rate: int, sample_type: str, channels: int, model_path: str):
if cls._krisp_instance is None:
cls._krisp_instance = KrispAudioProcessor(
sample_rate, sample_type, channels, model_path
)
return cls._krisp_instance
class KrispFilter(BaseAudioFilter):
def __init__(
self, sample_type: str = "PCM_16", channels: int = 1, model_path: str = None
@@ -48,7 +64,7 @@ class KrispFilter(BaseAudioFilter):
async def start(self, sample_rate: int):
self._sample_rate = sample_rate
self._krisp_processor = KrispAudioProcessor(
self._krisp_processor = KrispProcessorManager.get_processor(
self._sample_rate, self._sample_type, self._channels, self._model_path
)

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,30 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from abc import ABC, abstractmethod
class BaseAudioResampler(ABC):
"""Abstract base class for audio resampling. This class defines an
interface for audio resampling implementations.
"""
@abstractmethod
async def resample(self, audio: bytes, in_rate: int, out_rate: int) -> bytes:
"""
Resamples the given audio data to a different sample rate.
This is an abstract method that must be implemented in subclasses.
Parameters:
audio (bytes): The audio data to be resampled, represented as a byte string.
in_rate (int): The original sample rate of the audio data (in Hz).
out_rate (int): The desired sample rate for the resampled audio data (in Hz).
Returns:
bytes: The resampled audio data as a byte string.
"""
pass

View File

@@ -0,0 +1,25 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import numpy as np
import resampy
from pipecat.audio.resamplers.base_audio_resampler import BaseAudioResampler
class ResampyResampler(BaseAudioResampler):
"""Audio resampler implementation using the resampy library."""
def __init__(self, **kwargs):
pass
async def resample(self, audio: bytes, in_rate: int, out_rate: int) -> bytes:
if in_rate == out_rate:
return audio
audio_data = np.frombuffer(audio, dtype=np.int16)
resampled_audio = resampy.resample(audio_data, in_rate, out_rate, filter="kaiser_fast")
result = resampled_audio.astype(np.int16).tobytes()
return result

View File

@@ -0,0 +1,25 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import numpy as np
import soxr
from pipecat.audio.resamplers.base_audio_resampler import BaseAudioResampler
class SOXRAudioResampler(BaseAudioResampler):
"""Audio resampler implementation using the SoX resampler library."""
def __init__(self, **kwargs):
pass
async def resample(self, audio: bytes, in_rate: int, out_rate: int) -> bytes:
if in_rate == out_rate:
return audio
audio_data = np.frombuffer(audio, dtype=np.int16)
resampled_audio = soxr.resample(audio_data, in_rate, out_rate, quality="VHQ")
result = resampled_audio.astype(np.int16).tobytes()
return result

View File

@@ -10,8 +10,24 @@ import numpy as np
import pyloudnorm as pyln
import soxr
from pipecat.audio.resamplers.base_audio_resampler import BaseAudioResampler
from pipecat.audio.resamplers.soxr_resampler import SOXRAudioResampler
def create_default_resampler(**kwargs) -> BaseAudioResampler:
return SOXRAudioResampler(**kwargs)
def resample_audio(audio: bytes, original_rate: int, target_rate: int) -> bytes:
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"'resample_audio()' is deprecated, use 'create_default_resampler()' instead.",
DeprecationWarning,
)
if original_rate == target_rate:
return audio
audio_data = np.frombuffer(audio, dtype=np.int16)
@@ -75,41 +91,45 @@ def exp_smoothing(value: float, prev_value: float, factor: float) -> float:
return prev_value + factor * (value - prev_value)
def ulaw_to_pcm(ulaw_bytes: bytes, in_sample_rate: int, out_sample_rate: int):
async def ulaw_to_pcm(
ulaw_bytes: bytes, in_rate: int, out_rate: int, resampler: BaseAudioResampler
):
# Convert μ-law to PCM
in_pcm_bytes = audioop.ulaw2lin(ulaw_bytes, 2)
# Resample
out_pcm_bytes = resample_audio(in_pcm_bytes, in_sample_rate, out_sample_rate)
out_pcm_bytes = await resampler.resample(in_pcm_bytes, in_rate, out_rate)
return out_pcm_bytes
def pcm_to_ulaw(pcm_bytes: bytes, in_sample_rate: int, out_sample_rate: int):
async def pcm_to_ulaw(pcm_bytes: bytes, in_rate: int, out_rate: int, resampler: BaseAudioResampler):
# Resample
in_pcm_bytes = resample_audio(pcm_bytes, in_sample_rate, out_sample_rate)
in_pcm_bytes = await resampler.resample(pcm_bytes, in_rate, out_rate)
# Convert PCM to μ-law
ulaw_bytes = audioop.lin2ulaw(in_pcm_bytes, 2)
out_ulaw_bytes = audioop.lin2ulaw(in_pcm_bytes, 2)
return ulaw_bytes
return out_ulaw_bytes
def alaw_to_pcm(alaw_bytes: bytes, in_sample_rate: int, out_sample_rate: int) -> bytes:
async def alaw_to_pcm(
alaw_bytes: bytes, in_rate: int, out_rate: int, resampler: BaseAudioResampler
) -> bytes:
# Convert a-law to PCM
in_pcm_bytes = audioop.alaw2lin(alaw_bytes, 2)
# Resample
out_pcm_bytes = resample_audio(in_pcm_bytes, in_sample_rate, out_sample_rate)
out_pcm_bytes = await resampler.resample(in_pcm_bytes, in_rate, out_rate)
return out_pcm_bytes
def pcm_to_alaw(pcm_bytes: bytes, in_sample_rate: int, out_sample_rate: int):
async def pcm_to_alaw(pcm_bytes: bytes, in_rate: int, out_rate: int, resampler: BaseAudioResampler):
# Resample
in_pcm_bytes = resample_audio(pcm_bytes, in_sample_rate, out_sample_rate)
in_pcm_bytes = await resampler.resample(pcm_bytes, in_rate, out_rate)
# Convert PCM to μ-law
alaw_bytes = audioop.lin2alaw(in_pcm_bytes, 2)
out_alaw_bytes = audioop.lin2alaw(in_pcm_bytes, 2)
return alaw_bytes
return out_alaw_bytes

View File

@@ -5,6 +5,7 @@
#
import time
from typing import Optional
import numpy as np
from loguru import logger
@@ -104,11 +105,8 @@ class SileroOnnxModel:
class SileroVADAnalyzer(VADAnalyzer):
def __init__(self, *, sample_rate: int = 16000, params: VADParams = VADParams()):
super().__init__(sample_rate=sample_rate, num_channels=1, params=params)
if sample_rate != 16000 and sample_rate != 8000:
raise ValueError("Silero VAD sample rate needs to be 16000 or 8000")
def __init__(self, *, sample_rate: Optional[int] = None, params: VADParams = VADParams()):
super().__init__(sample_rate=sample_rate, params=params)
logger.debug("Loading Silero VAD model...")
@@ -138,6 +136,12 @@ class SileroVADAnalyzer(VADAnalyzer):
# VADAnalyzer
#
def set_sample_rate(self, sample_rate: int):
if sample_rate != 16000 and sample_rate != 8000:
raise ValueError("Silero VAD sample rate needs to be 16000 or 8000")
super().set_sample_rate(sample_rate)
def num_frames_required(self) -> int:
return 512 if self.sample_rate == 16000 else 256

View File

@@ -6,6 +6,7 @@
from abc import abstractmethod
from enum import Enum
from typing import Optional
from loguru import logger
from pydantic import BaseModel
@@ -33,11 +34,11 @@ class VADParams(BaseModel):
class VADAnalyzer:
def __init__(self, *, sample_rate: int, num_channels: int, params: VADParams):
self._sample_rate = sample_rate
self._num_channels = num_channels
self.set_params(params)
def __init__(self, *, sample_rate: Optional[int] = None, params: VADParams):
self._init_sample_rate = sample_rate
self._sample_rate = 0
self._params = params
self._num_channels = 1
self._vad_buffer = b""
@@ -65,13 +66,17 @@ class VADAnalyzer:
def voice_confidence(self, buffer) -> float:
pass
def set_sample_rate(self, sample_rate: int):
self._sample_rate = self._init_sample_rate or sample_rate
self.set_params(self._params)
def set_params(self, params: VADParams):
logger.info(f"Setting VAD params to: {params}")
self._params = params
self._vad_frames = self.num_frames_required()
self._vad_frames_num_bytes = self._vad_frames * self._num_channels * 2
vad_frames_per_sec = self._vad_frames / self._sample_rate
vad_frames_per_sec = self._vad_frames / self.sample_rate
self._vad_start_frames = round(self._params.start_secs / vad_frames_per_sec)
self._vad_stop_frames = round(self._params.stop_secs / vad_frames_per_sec)
@@ -80,7 +85,7 @@ class VADAnalyzer:
self._vad_state: VADState = VADState.QUIET
def _get_smoothed_volume(self, audio: bytes) -> float:
volume = calculate_audio_volume(audio, self._sample_rate)
volume = calculate_audio_volume(audio, self.sample_rate)
return exp_smoothing(volume, self._prev_volume, self._smoothing_factor)
def analyze_audio(self, buffer) -> VADState:

View File

@@ -6,7 +6,18 @@
from dataclasses import dataclass, field
from enum import Enum
from typing import TYPE_CHECKING, Any, Awaitable, Callable, List, Literal, Mapping, Optional, Tuple
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
Dict,
List,
Literal,
Mapping,
Optional,
Tuple,
)
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.clocks.base_clock import BaseClock
@@ -48,13 +59,13 @@ class Frame:
id: int = field(init=False)
name: str = field(init=False)
pts: Optional[int] = field(init=False)
metadata: dict = field(init=False)
metadata: Dict[str, Any] = field(init=False)
def __post_init__(self):
self.id: int = obj_id()
self.name: str = f"{self.__class__.__name__}#{obj_count(self)}"
self.pts: Optional[int] = None
self.metadata: dict = {}
self.metadata: Dict[str, Any] = {}
def __str__(self):
return self.name
@@ -428,11 +439,13 @@ class StartFrame(SystemFrame):
clock: BaseClock
task_manager: TaskManager
audio_in_sample_rate: int = 16000
audio_out_sample_rate: int = 24000
allow_interruptions: bool = False
enable_metrics: bool = False
enable_usage_metrics: bool = False
report_only_initial_ttfb: bool = False
observer: Optional["BaseObserver"] = None
report_only_initial_ttfb: bool = False
@dataclass

View File

@@ -5,7 +5,7 @@
#
import asyncio
from typing import AsyncIterable, Iterable, List
from typing import Any, AsyncIterable, Dict, Iterable, List
from loguru import logger
from pydantic import BaseModel, ConfigDict
@@ -41,13 +41,16 @@ class PipelineParams(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)
allow_interruptions: bool = False
audio_in_sample_rate: int = 16000
audio_out_sample_rate: int = 24000
enable_heartbeats: bool = False
enable_metrics: bool = False
enable_usage_metrics: bool = False
send_initial_empty_metrics: bool = True
report_only_initial_ttfb: bool = False
observers: List[BaseObserver] = []
heartbeats_period_secs: float = HEARTBEAT_SECONDS
observers: List[BaseObserver] = []
report_only_initial_ttfb: bool = False
send_initial_empty_metrics: bool = True
start_metadata: Dict[str, Any] = {}
class PipelineTaskSource(FrameProcessor):
@@ -136,6 +139,11 @@ class PipelineTask(BaseTask):
"""Returns the name of this task."""
return self._name
@property
def params(self) -> PipelineParams:
"""Returns the pipeline parameters of this task."""
return self._params
def set_event_loop(self, loop: asyncio.AbstractEventLoop):
self._task_manager.set_event_loop(loop)
@@ -271,11 +279,14 @@ class PipelineTask(BaseTask):
clock=self._clock,
task_manager=self._task_manager,
allow_interruptions=self._params.allow_interruptions,
audio_in_sample_rate=self._params.audio_in_sample_rate,
audio_out_sample_rate=self._params.audio_out_sample_rate,
enable_metrics=self._params.enable_metrics,
enable_usage_metrics=self._params.enable_usage_metrics,
report_only_initial_ttfb=self._params.report_only_initial_ttfb,
observer=self._observer,
report_only_initial_ttfb=self._params.report_only_initial_ttfb,
)
start_frame.metadata = self._params.start_metadata
await self._source.queue_frame(start_frame, FrameDirection.DOWNSTREAM)
if self._params.enable_metrics and self._params.send_initial_empty_metrics:

View File

@@ -16,9 +16,10 @@ class GatedOpenAILLMContextAggregator(FrameProcessor):
"""
def __init__(self, notifier: BaseNotifier, **kwargs):
def __init__(self, *, notifier: BaseNotifier, start_open: bool = False, **kwargs):
super().__init__(**kwargs)
self._notifier = notifier
self._start_open = start_open
self._last_context_frame = None
async def process_frame(self, frame: Frame, direction: FrameDirection):
@@ -31,7 +32,11 @@ class GatedOpenAILLMContextAggregator(FrameProcessor):
await self._stop()
await self.push_frame(frame)
elif isinstance(frame, OpenAILLMContextFrame):
self._last_context_frame = frame
if self._start_open:
self._start_open = False
await self.push_frame(frame, direction)
else:
self._last_context_frame = frame
else:
await self.push_frame(frame, direction)

View File

@@ -30,7 +30,7 @@ class AsyncGeneratorProcessor(FrameProcessor):
if isinstance(frame, (CancelFrame, EndFrame)):
await self._data_queue.put(None)
else:
data = self._serializer.serialize(frame)
data = await self._serializer.serialize(frame)
if data:
await self._data_queue.put(data)

View File

@@ -4,12 +4,18 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
from pipecat.audio.utils import interleave_stereo_audio, mix_audio, resample_audio
import time
from typing import Optional
from pipecat.audio.utils import create_default_resampler, interleave_stereo_audio, mix_audio
from pipecat.frames.frames import (
AudioRawFrame,
CancelFrame,
EndFrame,
Frame,
InputAudioRawFrame,
OutputAudioRawFrame,
StartFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
@@ -29,16 +35,29 @@ class AudioBufferProcessor(FrameProcessor):
"""
def __init__(
self, *, sample_rate: int = 24000, num_channels: int = 1, buffer_size: int = 0, **kwargs
self,
*,
sample_rate: Optional[int] = None,
num_channels: int = 1,
buffer_size: int = 0,
**kwargs,
):
super().__init__(**kwargs)
self._sample_rate = sample_rate
self._init_sample_rate = sample_rate
self._sample_rate = 0
self._num_channels = num_channels
self._buffer_size = buffer_size
self._user_audio_buffer = bytearray()
self._bot_audio_buffer = bytearray()
self._last_user_frame_at = 0
self._last_bot_frame_at = 0
self._recording = False
self._resampler = create_default_resampler()
self._register_event_handler("on_audio_data")
@property
@@ -64,43 +83,83 @@ class AudioBufferProcessor(FrameProcessor):
else:
return b""
def reset_audio_buffers(self):
self._user_audio_buffer = bytearray()
self._bot_audio_buffer = bytearray()
async def start_recording(self):
self._recording = True
self._reset_recording()
async def stop_recording(self):
await self._call_on_audio_data_handler()
self._recording = False
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# Include all audio from the user.
if isinstance(frame, InputAudioRawFrame):
resampled = resample_audio(frame.audio, frame.sample_rate, self._sample_rate)
# Update output sample rate if necessary.
if isinstance(frame, StartFrame):
self._update_sample_rate(frame)
if self._recording and isinstance(frame, InputAudioRawFrame):
# Add silence if we need to.
silence = self._compute_silence(self._last_user_frame_at)
self._user_audio_buffer.extend(silence)
# Add user audio.
resampled = await self._resample_audio(frame)
self._user_audio_buffer.extend(resampled)
# Sync the bot's buffer to the user's buffer by adding silence if needed
if len(self._user_audio_buffer) > len(self._bot_audio_buffer):
silence = b"\x00" * len(resampled)
self._bot_audio_buffer.extend(silence)
# If the bot is speaking, include all audio from the bot.
elif isinstance(frame, OutputAudioRawFrame):
resampled = resample_audio(frame.audio, frame.sample_rate, self._sample_rate)
# Save time of frame so we can compute silence.
self._last_user_frame_at = time.time()
elif self._recording and isinstance(frame, OutputAudioRawFrame):
# Add silence if we need to.
silence = self._compute_silence(self._last_bot_frame_at)
self._bot_audio_buffer.extend(silence)
# Add bot audio.
resampled = await self._resample_audio(frame)
self._bot_audio_buffer.extend(resampled)
# Save time of frame so we can compute silence.
self._last_bot_frame_at = time.time()
if self._buffer_size > 0 and len(self._user_audio_buffer) > self._buffer_size:
await self._call_on_audio_data_handler()
if isinstance(frame, EndFrame):
await self._call_on_audio_data_handler()
if isinstance(frame, (CancelFrame, EndFrame)):
await self.stop_recording()
await self.push_frame(frame, direction)
def _update_sample_rate(self, frame: StartFrame):
self._sample_rate = self._init_sample_rate or frame.audio_out_sample_rate
async def _call_on_audio_data_handler(self):
if not self.has_audio():
if not self.has_audio() or not self._recording:
return
merged_audio = self.merge_audio_buffers()
await self._call_event_handler(
"on_audio_data", merged_audio, self._sample_rate, self._num_channels
)
self.reset_audio_buffers()
self._reset_audio_buffers()
def _buffer_has_audio(self, buffer: bytearray) -> bool:
return buffer is not None and len(buffer) > 0
def _reset_recording(self):
self._reset_audio_buffers()
self._last_user_frame_at = time.time()
self._last_bot_frame_at = time.time()
def _reset_audio_buffers(self):
self._user_audio_buffer = bytearray()
self._bot_audio_buffer = bytearray()
async def _resample_audio(self, frame: AudioRawFrame) -> bytes:
return await self._resampler.resample(frame.audio, frame.sample_rate, self._sample_rate)
def _compute_silence(self, from_time: float) -> bytes:
quiet_time = time.time() - from_time
# We should get audio frames very frequently. We pick 100ms because
# that's big enough, but it could be even a bit slower since we usually
# do 20ms audio frames.
if from_time == 0 or quiet_time < 0.1:
return b""
num_bytes = int(quiet_time * self._sample_rate) * 2
silence = b"\x00" * num_bytes
return silence

View File

@@ -4,6 +4,8 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
from typing import Optional
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
@@ -11,6 +13,7 @@ from pipecat.audio.vad.vad_analyzer import VADParams, VADState
from pipecat.frames.frames import (
AudioRawFrame,
Frame,
StartFrame,
StartInterruptionFrame,
StopInterruptionFrame,
UserStartedSpeakingFrame,
@@ -23,7 +26,7 @@ class SileroVAD(FrameProcessor):
def __init__(
self,
*,
sample_rate: int = 16000,
sample_rate: Optional[int] = None,
vad_params: VADParams = VADParams(),
audio_passthrough: bool = False,
):
@@ -41,6 +44,9 @@ class SileroVAD(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, StartFrame):
self._vad_analyzer.set_sample_rate(frame.audio_in_sample_rate)
if isinstance(frame, AudioRawFrame):
await self._analyze_audio(frame)
if self._audio_passthrough:

View File

@@ -5,6 +5,7 @@
#
import asyncio
from typing import Optional
from loguru import logger
from pydantic import BaseModel
@@ -38,7 +39,7 @@ class GStreamerPipelineSource(FrameProcessor):
class OutputParams(BaseModel):
video_width: int = 1280
video_height: int = 720
audio_sample_rate: int = 24000
audio_sample_rate: Optional[int] = None
audio_channels: int = 1
clock_sync: bool = True
@@ -46,6 +47,7 @@ class GStreamerPipelineSource(FrameProcessor):
super().__init__(**kwargs)
self._out_params = out_params
self._sample_rate = 0
Gst.init()
@@ -90,6 +92,7 @@ class GStreamerPipelineSource(FrameProcessor):
await self.push_frame(frame, direction)
async def _start(self, frame: StartFrame):
self._sample_rate = self._out_params.audio_sample_rate or frame.audio_out_sample_rate
self._player.set_state(Gst.State.PLAYING)
async def _stop(self, frame: EndFrame):
@@ -122,7 +125,7 @@ class GStreamerPipelineSource(FrameProcessor):
audioresample = Gst.ElementFactory.make("audioresample", None)
audiocapsfilter = Gst.ElementFactory.make("capsfilter", None)
audiocaps = Gst.Caps.from_string(
f"audio/x-raw,format=S16LE,rate={self._out_params.audio_sample_rate},channels={self._out_params.audio_channels},layout=interleaved"
f"audio/x-raw,format=S16LE,rate={self._sample_rate},channels={self._out_params.audio_channels},layout=interleaved"
)
audiocapsfilter.set_property("caps", audiocaps)
appsink_audio = Gst.ElementFactory.make("appsink", None)
@@ -188,7 +191,7 @@ class GStreamerPipelineSource(FrameProcessor):
(_, info) = buffer.map(Gst.MapFlags.READ)
frame = OutputAudioRawFrame(
audio=info.data,
sample_rate=self._out_params.audio_sample_rate,
sample_rate=self._sample_rate,
num_channels=self._out_params.audio_channels,
)
asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop())

View File

@@ -7,7 +7,7 @@
from abc import ABC, abstractmethod
from enum import Enum
from pipecat.frames.frames import Frame
from pipecat.frames.frames import Frame, StartFrame
class FrameSerializerType(Enum):
@@ -21,10 +21,13 @@ class FrameSerializer(ABC):
def type(self) -> FrameSerializerType:
pass
@abstractmethod
def serialize(self, frame: Frame) -> str | bytes | None:
async def setup(self, frame: StartFrame):
pass
@abstractmethod
def deserialize(self, data: str | bytes) -> Frame | None:
async def serialize(self, frame: Frame) -> str | bytes | None:
pass
@abstractmethod
async def deserialize(self, data: str | bytes) -> Frame | None:
pass

View File

@@ -25,7 +25,7 @@ class LivekitFrameSerializer(FrameSerializer):
def type(self) -> FrameSerializerType:
return FrameSerializerType.BINARY
def serialize(self, frame: Frame) -> str | bytes | None:
async def serialize(self, frame: Frame) -> str | bytes | None:
if not isinstance(frame, OutputAudioRawFrame):
return None
audio_frame = AudioFrame(
@@ -36,7 +36,7 @@ class LivekitFrameSerializer(FrameSerializer):
)
return pickle.dumps(audio_frame)
def deserialize(self, data: str | bytes) -> Frame | None:
async def deserialize(self, data: str | bytes) -> Frame | None:
audio_frame: AudioFrame = pickle.loads(data)["frame"]
return InputAudioRawFrame(
audio=bytes(audio_frame.data),

View File

@@ -41,7 +41,7 @@ class ProtobufFrameSerializer(FrameSerializer):
def type(self) -> FrameSerializerType:
return FrameSerializerType.BINARY
def serialize(self, frame: Frame) -> str | bytes | None:
async def serialize(self, frame: Frame) -> str | bytes | None:
proto_frame = frame_protos.Frame()
if type(frame) not in self.SERIALIZABLE_TYPES:
logger.warning(f"Frame type {type(frame)} is not serializable")
@@ -57,26 +57,7 @@ class ProtobufFrameSerializer(FrameSerializer):
return proto_frame.SerializeToString()
def deserialize(self, data: str | bytes) -> Frame | None:
"""Returns a Frame object from a Frame protobuf.
Used to convert frames
passed over the wire as protobufs to Frame objects used in pipelines
and frame processors.
>>> serializer = ProtobufFrameSerializer()
>>> serializer.deserialize(
... serializer.serialize(OutputAudioFrame(data=b'1234567890')))
InputAudioFrame(data=b'1234567890')
>>> serializer.deserialize(
... serializer.serialize(TextFrame(text='hello world')))
TextFrame(text='hello world')
>>> serializer.deserialize(serializer.serialize(TranscriptionFrame(
... text="Hello there!", participantId="123", timestamp="2021-01-01")))
TranscriptionFrame(text='Hello there!', participantId='123', timestamp='2021-01-01')
"""
async def deserialize(self, data: str | bytes) -> Frame | None:
proto = frame_protos.Frame.FromString(data)
which = proto.WhichOneof("frame")
if which not in self.DESERIALIZABLE_FIELDS:

View File

@@ -6,16 +6,24 @@
import base64
import json
from typing import Optional
from pydantic import BaseModel
from pipecat.audio.utils import alaw_to_pcm, pcm_to_alaw, pcm_to_ulaw, ulaw_to_pcm
from pipecat.audio.utils import (
alaw_to_pcm,
create_default_resampler,
pcm_to_alaw,
pcm_to_ulaw,
ulaw_to_pcm,
)
from pipecat.frames.frames import (
AudioRawFrame,
Frame,
InputAudioRawFrame,
InputDTMFFrame,
KeypadEntry,
StartFrame,
StartInterruptionFrame,
)
from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializerType
@@ -23,8 +31,8 @@ from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializer
class TelnyxFrameSerializer(FrameSerializer):
class InputParams(BaseModel):
telnyx_sample_rate: int = 8000
sample_rate: int = 16000
telnyx_sample_rate: Optional[int] = None
sample_rate: Optional[int] = None
inbound_encoding: str = "PCMU"
outbound_encoding: str = "PCMU"
@@ -40,21 +48,27 @@ class TelnyxFrameSerializer(FrameSerializer):
params.inbound_encoding = inbound_encoding
self._params = params
self._resampler = create_default_resampler()
@property
def type(self) -> FrameSerializerType:
return FrameSerializerType.TEXT
def serialize(self, frame: Frame) -> str | bytes | None:
async def setup(self, frame: StartFrame):
self._telnyx_sample_rate = self._params.telnyx_sample_rate or frame.audio_in_sample_rate
self._sample_rate = self._params.sample_rate or frame.audio_out_sample_rate
async def serialize(self, frame: Frame) -> str | bytes | None:
if isinstance(frame, AudioRawFrame):
data = frame.audio
if self._params.inbound_encoding == "PCMU":
serialized_data = pcm_to_ulaw(
data, frame.sample_rate, self._params.telnyx_sample_rate
serialized_data = await pcm_to_ulaw(
data, frame.sample_rate, self._telnyx_sample_rate, self._resampler
)
elif self._params.inbound_encoding == "PCMA":
serialized_data = pcm_to_alaw(
data, frame.sample_rate, self._params.telnyx_sample_rate
serialized_data = await pcm_to_alaw(
data, frame.sample_rate, self._telnyx_sample_rate, self._resampler
)
else:
raise ValueError(f"Unsupported encoding: {self._params.inbound_encoding}")
@@ -71,7 +85,7 @@ class TelnyxFrameSerializer(FrameSerializer):
answer = {"event": "clear"}
return json.dumps(answer)
def deserialize(self, data: str | bytes) -> Frame | None:
async def deserialize(self, data: str | bytes) -> Frame | None:
message = json.loads(data)
if message["event"] == "media":
@@ -79,18 +93,24 @@ class TelnyxFrameSerializer(FrameSerializer):
payload = base64.b64decode(payload_base64)
if self._params.outbound_encoding == "PCMU":
deserialized_data = ulaw_to_pcm(
payload, self._params.telnyx_sample_rate, self._params.sample_rate
deserialized_data = await ulaw_to_pcm(
payload,
self._telnyx_sample_rate,
self._sample_rate,
self._resampler,
)
elif self._params.outbound_encoding == "PCMA":
deserialized_data = alaw_to_pcm(
payload, self._params.telnyx_sample_rate, self._params.sample_rate
deserialized_data = await alaw_to_pcm(
payload,
self._telnyx_sample_rate,
self._sample_rate,
self._resampler,
)
else:
raise ValueError(f"Unsupported encoding: {self._params.outbound_encoding}")
audio_frame = InputAudioRawFrame(
audio=deserialized_data, num_channels=1, sample_rate=self._params.sample_rate
audio=deserialized_data, num_channels=1, sample_rate=self._sample_rate
)
return audio_frame
elif message["event"] == "dtmf":

View File

@@ -6,16 +6,18 @@
import base64
import json
from typing import Optional
from pydantic import BaseModel
from pipecat.audio.utils import pcm_to_ulaw, ulaw_to_pcm
from pipecat.audio.utils import create_default_resampler, pcm_to_ulaw, ulaw_to_pcm
from pipecat.frames.frames import (
AudioRawFrame,
Frame,
InputAudioRawFrame,
InputDTMFFrame,
KeypadEntry,
StartFrame,
StartInterruptionFrame,
TransportMessageFrame,
TransportMessageUrgentFrame,
@@ -25,25 +27,36 @@ from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializer
class TwilioFrameSerializer(FrameSerializer):
class InputParams(BaseModel):
twilio_sample_rate: int = 8000
sample_rate: int = 16000
twilio_sample_rate: Optional[int] = None
sample_rate: Optional[int] = None
def __init__(self, stream_sid: str, params: InputParams = InputParams()):
self._stream_sid = stream_sid
self._params = params
self._twilio_sample_rate = 0
self._sample_rate = 0
self._resampler = create_default_resampler()
@property
def type(self) -> FrameSerializerType:
return FrameSerializerType.TEXT
def serialize(self, frame: Frame) -> str | bytes | None:
async def setup(self, frame: StartFrame):
self._twilio_sample_rate = self._params.twilio_sample_rate or frame.audio_in_sample_rate
self._sample_rate = self._params.sample_rate or frame.audio_out_sample_rate
async def serialize(self, frame: Frame) -> str | bytes | None:
if isinstance(frame, StartInterruptionFrame):
answer = {"event": "clear", "streamSid": self._stream_sid}
return json.dumps(answer)
elif isinstance(frame, AudioRawFrame):
data = frame.audio
serialized_data = pcm_to_ulaw(data, frame.sample_rate, self._params.twilio_sample_rate)
serialized_data = await pcm_to_ulaw(
data, frame.sample_rate, self._twilio_sample_rate, self._resampler
)
payload = base64.b64encode(serialized_data).decode("utf-8")
answer = {
"event": "media",
@@ -55,18 +68,18 @@ class TwilioFrameSerializer(FrameSerializer):
elif isinstance(frame, (TransportMessageFrame, TransportMessageUrgentFrame)):
return json.dumps(frame.message)
def deserialize(self, data: str | bytes) -> Frame | None:
async def deserialize(self, data: str | bytes) -> Frame | None:
message = json.loads(data)
if message["event"] == "media":
payload_base64 = message["media"]["payload"]
payload = base64.b64decode(payload_base64)
deserialized_data = ulaw_to_pcm(
payload, self._params.twilio_sample_rate, self._params.sample_rate
deserialized_data = await ulaw_to_pcm(
payload, self._twilio_sample_rate, self._sample_rate, self._resampler
)
audio_frame = InputAudioRawFrame(
audio=deserialized_data, num_channels=1, sample_rate=self._params.sample_rate
audio=deserialized_data, num_channels=1, sample_rate=self._sample_rate
)
return audio_frame
elif message["event"] == "dtmf":

View File

@@ -213,7 +213,7 @@ class TTSService(AIService):
# if push_silence_after_stop is True, send this amount of audio silence
silence_time_s: float = 2.0,
# TTS output sample rate
sample_rate: int = 24000,
sample_rate: Optional[int] = None,
text_filter: Optional[BaseTextFilter] = None,
**kwargs,
):
@@ -224,7 +224,8 @@ class TTSService(AIService):
self._stop_frame_timeout_s: float = stop_frame_timeout_s
self._push_silence_after_stop: bool = push_silence_after_stop
self._silence_time_s: float = silence_time_s
self._sample_rate: int = sample_rate
self._init_sample_rate = sample_rate
self._sample_rate = 0
self._voice_id: str = ""
self._settings: Dict[str, Any] = {}
self._text_filter: Optional[BaseTextFilter] = text_filter
@@ -248,16 +249,20 @@ class TTSService(AIService):
async def flush_audio(self):
pass
def language_to_service_language(self, language: Language) -> str | None:
return Language(language)
# Converts the text to audio.
@abstractmethod
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
pass
def language_to_service_language(self, language: Language) -> str | None:
return Language(language)
async def update_setting(self, key: str, value: Any):
pass
async def start(self, frame: StartFrame):
await super().start(frame)
self._sample_rate = self._init_sample_rate or frame.audio_out_sample_rate
if self._push_stop_frames:
self._stop_frame_task = self.create_task(self._stop_frame_handler())
@@ -467,9 +472,17 @@ class WordTTSService(TTSService):
class STTService(AIService):
"""STTService is a base class for speech-to-text services."""
def __init__(self, audio_passthrough=False, **kwargs):
def __init__(
self,
audio_passthrough=False,
# STT input sample rate
sample_rate: Optional[int] = None,
**kwargs,
):
super().__init__(**kwargs)
self._audio_passthrough = audio_passthrough
self._init_sample_rate = sample_rate
self._sample_rate = 0
self._settings: Dict[str, Any] = {}
self._muted: bool = False
@@ -478,6 +491,10 @@ class STTService(AIService):
"""Returns whether the STT service is currently muted."""
return self._muted
@property
def sample_rate(self) -> int:
return self._sample_rate
@abstractmethod
async def set_model(self, model: str):
self.set_model_name(model)
@@ -491,6 +508,10 @@ class STTService(AIService):
"""Returns transcript as a string"""
pass
async def start(self, frame: StartFrame):
await super().start(frame)
self._sample_rate = self._init_sample_rate or frame.audio_in_sample_rate
async def _update_settings(self, settings: Mapping[str, Any]):
logger.info(f"Updating STT settings: {self._settings}")
for key, value in settings.items():
@@ -540,17 +561,15 @@ class SegmentedSTTService(STTService):
min_volume: float = 0.6,
max_silence_secs: float = 0.3,
max_buffer_secs: float = 1.5,
sample_rate: int = 24000,
num_channels: int = 1,
sample_rate: Optional[int] = None,
**kwargs,
):
super().__init__(**kwargs)
super().__init__(sample_rate=sample_rate, **kwargs)
self._min_volume = min_volume
self._max_silence_secs = max_silence_secs
self._max_buffer_secs = max_buffer_secs
self._sample_rate = sample_rate
self._num_channels = num_channels
(self._content, self._wave) = self._new_wave()
self._content = None
self._wave = None
self._silence_num_frames = 0
# Volume exponential smoothing
self._smoothing_factor = 0.2
@@ -569,8 +588,8 @@ class SegmentedSTTService(STTService):
# If buffer is not empty and we have enough data or there's been a long
# silence, transcribe the audio gathered so far.
silence_secs = self._silence_num_frames / self._sample_rate
buffer_secs = self._wave.getnframes() / self._sample_rate
silence_secs = self._silence_num_frames / self.sample_rate
buffer_secs = self._wave.getnframes() / self.sample_rate
if self._content.tell() > 0 and (
buffer_secs > self._max_buffer_secs or silence_secs > self._max_silence_secs
):
@@ -580,18 +599,24 @@ class SegmentedSTTService(STTService):
await self.process_generator(self.run_stt(self._content.read()))
(self._content, self._wave) = self._new_wave()
async def start(self, frame: StartFrame):
await super().start(frame)
(self._content, self._wave) = self._new_wave()
async def stop(self, frame: EndFrame):
await super().stop(frame)
self._wave.close()
async def cancel(self, frame: CancelFrame):
await super().cancel(frame)
self._wave.close()
def _new_wave(self):
content = io.BytesIO()
ww = wave.open(content, "wb")
ww.setsampwidth(2)
ww.setnchannels(self._num_channels)
ww.setframerate(self._sample_rate)
ww.setnchannels(1)
ww.setframerate(self.sample_rate)
return (content, ww)
def _get_smoothed_volume(self, frame: AudioRawFrame) -> float:

View File

@@ -5,7 +5,7 @@
#
import asyncio
from typing import AsyncGenerator
from typing import AsyncGenerator, Optional
from loguru import logger
@@ -38,20 +38,17 @@ class AssemblyAISTTService(STTService):
self,
*,
api_key: str,
sample_rate: int = 16000,
sample_rate: Optional[int] = None,
encoding: AudioEncoding = AudioEncoding("pcm_s16le"),
language=Language.EN, # Only English is supported for Realtime
**kwargs,
):
super().__init__(**kwargs)
super().__init__(sample_rate=sample_rate, **kwargs)
aai.settings.api_key = api_key
self._transcriber: aai.RealtimeTranscriber | None = None
# Store reference to the main event loop for use in callback functions
self._loop = asyncio.get_event_loop()
self._settings = {
"sample_rate": sample_rate,
"encoding": encoding,
"language": language,
}
@@ -121,7 +118,7 @@ class AssemblyAISTTService(STTService):
# Schedule the coroutine to run in the main event loop
# This is necessary because this callback runs in a different thread
asyncio.run_coroutine_threadsafe(self.push_frame(frame), self._loop)
asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop())
def on_error(error: aai.RealtimeError):
"""Callback for handling errors from AssemblyAI.
@@ -131,14 +128,16 @@ class AssemblyAISTTService(STTService):
"""
logger.error(f"{self}: An error occurred: {error}")
# Schedule the coroutine to run in the main event loop
asyncio.run_coroutine_threadsafe(self.push_frame(ErrorFrame(str(error))), self._loop)
asyncio.run_coroutine_threadsafe(
self.push_frame(ErrorFrame(str(error))), self.get_event_loop()
)
def on_close():
"""Callback for when the connection to AssemblyAI is closed."""
logger.info(f"{self}: Disconnected from AssemblyAI")
self._transcriber = aai.RealtimeTranscriber(
sample_rate=self._settings["sample_rate"],
sample_rate=self.sample_rate,
encoding=self._settings["encoding"],
on_data=on_data,
on_error=on_error,

View File

@@ -10,7 +10,7 @@ from typing import AsyncGenerator, Optional
from loguru import logger
from pydantic import BaseModel
from pipecat.audio.utils import resample_audio
from pipecat.audio.utils import create_default_resampler
from pipecat.frames.frames import (
ErrorFrame,
Frame,
@@ -124,7 +124,7 @@ class PollyTTSService(TTSService):
aws_session_token: Optional[str] = None,
region: Optional[str] = None,
voice_id: str = "Joanna",
sample_rate: int = 24000,
sample_rate: Optional[int] = None,
params: InputParams = InputParams(),
**kwargs,
):
@@ -138,7 +138,6 @@ class PollyTTSService(TTSService):
region_name=region,
)
self._settings = {
"sample_rate": sample_rate,
"engine": params.engine,
"language": self.language_to_service_language(params.language)
if params.language
@@ -148,6 +147,8 @@ class PollyTTSService(TTSService):
"volume": params.volume,
}
self._resampler = create_default_resampler()
self.set_voice(voice_id)
def can_generate_metrics(self) -> bool:
@@ -193,8 +194,7 @@ class PollyTTSService(TTSService):
response = self._polly_client.synthesize_speech(**args)
if "AudioStream" in response:
audio_data = response["AudioStream"].read()
resampled = resample_audio(audio_data, 16000, self._settings["sample_rate"])
return resampled
return audio_data
return None
logger.debug(f"Generating TTS: [{text}]")
@@ -225,6 +225,8 @@ class PollyTTSService(TTSService):
yield None
return
audio_data = await self._resampler.resample(audio_data, 16000, self.sample_rate)
await self.start_tts_usage_metrics(text)
yield TTSStartedFrame()
@@ -234,7 +236,7 @@ class PollyTTSService(TTSService):
chunk = audio_data[i : i + chunk_size]
if len(chunk) > 0:
await self.stop_ttfb_metrics()
frame = TTSAudioRawFrame(chunk, self._settings["sample_rate"], 1)
frame = TTSAudioRawFrame(chunk, self.sample_rate, 1)
yield frame
yield TTSStoppedFrame()

View File

@@ -450,14 +450,13 @@ class AzureBaseTTSService(TTSService):
api_key: str,
region: str,
voice="en-US-SaraNeural",
sample_rate: int = 24000,
sample_rate: Optional[int] = None,
params: InputParams = InputParams(),
**kwargs,
):
super().__init__(sample_rate=sample_rate, **kwargs)
self._settings = {
"sample_rate": sample_rate,
"emphasis": params.emphasis,
"language": self.language_to_service_language(params.language)
if params.language
@@ -530,25 +529,32 @@ class AzureBaseTTSService(TTSService):
class AzureTTSService(AzureBaseTTSService):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._speech_config = None
self._speech_synthesizer = None
self._audio_queue = asyncio.Queue()
speech_config = SpeechConfig(
async def start(self, frame: StartFrame):
await super().start(frame)
# Now self.sample_rate is properly initialized
self._speech_config = SpeechConfig(
subscription=self._api_key,
region=self._region,
speech_recognition_language=self._settings["language"],
)
speech_config.set_speech_synthesis_output_format(
sample_rate_to_output_format(self._settings["sample_rate"])
self._speech_config.set_speech_synthesis_output_format(
sample_rate_to_output_format(self.sample_rate)
)
speech_config.set_service_property(
self._speech_config.set_service_property(
"synthesizer.synthesis.connection.synthesisConnectionImpl",
"websocket",
ServicePropertyChannel.UriQueryParameter,
)
self._speech_synthesizer = SpeechSynthesizer(speech_config=speech_config, audio_config=None)
self._speech_synthesizer = SpeechSynthesizer(
speech_config=self._speech_config, audio_config=None
)
# Set up event handlers
self._audio_queue = asyncio.Queue()
self._speech_synthesizer.synthesizing.connect(self._handle_synthesizing)
self._speech_synthesizer.synthesis_completed.connect(self._handle_completed)
self._speech_synthesizer.synthesis_canceled.connect(self._handle_canceled)
@@ -591,7 +597,7 @@ class AzureTTSService(AzureBaseTTSService):
yield TTSAudioRawFrame(
audio=chunk,
sample_rate=self._settings["sample_rate"],
sample_rate=self.sample_rate,
num_channels=1,
)
@@ -605,17 +611,22 @@ class AzureTTSService(AzureBaseTTSService):
class AzureHttpTTSService(AzureBaseTTSService):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._speech_config = None
self._speech_synthesizer = None
speech_config = SpeechConfig(
async def start(self, frame: StartFrame):
await super().start(frame)
self._speech_config = SpeechConfig(
subscription=self._api_key,
region=self._region,
speech_recognition_language=self._settings["language"],
)
speech_config.set_speech_synthesis_output_format(
sample_rate_to_output_format(self._settings["sample_rate"])
self._speech_config.set_speech_synthesis_output_format(
sample_rate_to_output_format(self.sample_rate)
)
self._speech_synthesizer = SpeechSynthesizer(
speech_config=self._speech_config, audio_config=None
)
self._speech_synthesizer = SpeechSynthesizer(speech_config=speech_config, audio_config=None)
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: [{text}]")
@@ -633,7 +644,7 @@ class AzureHttpTTSService(AzureBaseTTSService):
# Azure always sends a 44-byte header. Strip it off.
yield TTSAudioRawFrame(
audio=result.audio_data[44:],
sample_rate=self._settings["sample_rate"],
sample_rate=self.sample_rate,
num_channels=1,
)
yield TTSStoppedFrame()
@@ -650,24 +661,14 @@ class AzureSTTService(STTService):
*,
api_key: str,
region: str,
language=Language.EN_US,
sample_rate=24000,
channels=1,
language: Language = Language.EN_US,
sample_rate: Optional[int] = None,
**kwargs,
):
super().__init__(**kwargs)
super().__init__(sample_rate=sample_rate, **kwargs)
speech_config = SpeechConfig(subscription=api_key, region=region)
speech_config.speech_recognition_language = language
stream_format = AudioStreamFormat(samples_per_second=sample_rate, channels=channels)
self._audio_stream = PushAudioInputStream(stream_format)
audio_config = AudioConfig(stream=self._audio_stream)
self._speech_recognizer = SpeechRecognizer(
speech_config=speech_config, audio_config=audio_config
)
self._speech_recognizer.recognized.connect(self._on_handle_recognized)
self._speech_config = SpeechConfig(subscription=api_key, region=region)
self._speech_config.speech_recognition_language = language
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
await self.start_processing_metrics()
@@ -677,6 +678,16 @@ class AzureSTTService(STTService):
async def start(self, frame: StartFrame):
await super().start(frame)
stream_format = AudioStreamFormat(samples_per_second=self.sample_rate, channels=1)
self._audio_stream = PushAudioInputStream(stream_format)
audio_config = AudioConfig(stream=self._audio_stream)
self._speech_recognizer = SpeechRecognizer(
speech_config=self._speech_config, audio_config=audio_config
)
self._speech_recognizer.recognized.connect(self._on_handle_recognized)
self._speech_recognizer.start_continuous_recognition_async()
async def stop(self, frame: EndFrame):

View File

@@ -117,7 +117,6 @@ class CanonicalMetricsService(AIService):
try:
await self._multipart_upload(filename)
await aiofiles.os.remove(filename)
audio_buffer_processor.reset_audio_buffers()
except FileNotFoundError:
pass
except Exception as e:

View File

@@ -89,7 +89,7 @@ class CartesiaTTSService(WordTTSService, WebsocketService):
cartesia_version: str = "2024-06-10",
url: str = "wss://api.cartesia.ai/tts/websocket",
model: str = "sonic",
sample_rate: int = 24000,
sample_rate: Optional[int] = None,
encoding: str = "pcm_s16le",
container: str = "raw",
params: InputParams = InputParams(),
@@ -121,7 +121,7 @@ class CartesiaTTSService(WordTTSService, WebsocketService):
"output_format": {
"container": container,
"encoding": encoding,
"sample_rate": sample_rate,
"sample_rate": 0,
},
"language": self.language_to_service_language(params.language)
if params.language
@@ -174,6 +174,7 @@ class CartesiaTTSService(WordTTSService, WebsocketService):
async def start(self, frame: StartFrame):
await super().start(frame)
self._settings["output_format"]["sample_rate"] = self.sample_rate
await self._connect()
async def stop(self, frame: EndFrame):
@@ -262,7 +263,7 @@ class CartesiaTTSService(WordTTSService, WebsocketService):
self.start_word_timestamps()
frame = TTSAudioRawFrame(
audio=base64.b64decode(msg["data"]),
sample_rate=self._settings["output_format"]["sample_rate"],
sample_rate=self.sample_rate,
num_channels=1,
)
await self.push_frame(frame)
@@ -328,7 +329,7 @@ class CartesiaHttpTTSService(TTSService):
voice_id: str,
model: str = "sonic",
base_url: str = "https://api.cartesia.ai",
sample_rate: int = 24000,
sample_rate: Optional[int] = None,
encoding: str = "pcm_s16le",
container: str = "raw",
params: InputParams = InputParams(),
@@ -341,7 +342,7 @@ class CartesiaHttpTTSService(TTSService):
"output_format": {
"container": container,
"encoding": encoding,
"sample_rate": sample_rate,
"sample_rate": 0,
},
"language": self.language_to_service_language(params.language)
if params.language
@@ -360,6 +361,10 @@ class CartesiaHttpTTSService(TTSService):
def language_to_service_language(self, language: Language) -> str | None:
return language_to_cartesia_language(language)
async def start(self, frame: StartFrame):
await super().start(frame)
self._settings["output_format"]["sample_rate"] = self.sample_rate
async def stop(self, frame: EndFrame):
await super().stop(frame)
await self._client.close()
@@ -394,9 +399,7 @@ class CartesiaHttpTTSService(TTSService):
)
frame = TTSAudioRawFrame(
audio=output["audio"],
sample_rate=self._settings["output_format"]["sample_rate"],
num_channels=1,
audio=output["audio"], sample_rate=self.sample_rate, num_channels=1
)
yield frame
except Exception as e:

View File

@@ -5,7 +5,7 @@
#
import asyncio
from typing import AsyncGenerator
from typing import AsyncGenerator, Optional
from loguru import logger
@@ -53,14 +53,13 @@ class DeepgramTTSService(TTSService):
*,
api_key: str,
voice: str = "aura-helios-en",
sample_rate: int = 24000,
sample_rate: Optional[int] = None,
encoding: str = "linear16",
**kwargs,
):
super().__init__(sample_rate=sample_rate, **kwargs)
self._settings = {
"sample_rate": sample_rate,
"encoding": encoding,
}
self.set_voice(voice)
@@ -75,7 +74,7 @@ class DeepgramTTSService(TTSService):
options = SpeakOptions(
model=self._voice_id,
encoding=self._settings["encoding"],
sample_rate=self._settings["sample_rate"],
sample_rate=self.sample_rate,
container="none",
)
@@ -103,9 +102,7 @@ class DeepgramTTSService(TTSService):
chunk = audio_buffer.read(chunk_size)
if not chunk:
break
frame = TTSAudioRawFrame(
audio=chunk, sample_rate=self._settings["sample_rate"], num_channels=1
)
frame = TTSAudioRawFrame(audio=chunk, sample_rate=self.sample_rate, num_channels=1)
yield frame
yield TTSStoppedFrame()
@@ -121,15 +118,16 @@ class DeepgramSTTService(STTService):
*,
api_key: str,
url: str = "",
live_options: LiveOptions = None,
sample_rate: Optional[int] = None,
live_options: Optional[LiveOptions] = None,
**kwargs,
):
super().__init__(**kwargs)
super().__init__(sample_rate=sample_rate, **kwargs)
default_options = LiveOptions(
encoding="linear16",
language=Language.EN,
model="nova-2-general",
sample_rate=16000,
channels=1,
interim_results=True,
smart_format=True,
@@ -187,6 +185,7 @@ class DeepgramSTTService(STTService):
async def start(self, frame: StartFrame):
await super().start(frame)
self._settings["sample_rate"] = self.sample_rate
await self._connect()
async def stop(self, frame: EndFrame):

View File

@@ -104,17 +104,20 @@ def language_to_elevenlabs_language(language: Language) -> str | None:
return result
def sample_rate_from_output_format(output_format: str) -> int:
match output_format:
case "pcm_16000":
return 16000
case "pcm_22050":
return 22050
case "pcm_24000":
return 24000
case "pcm_44100":
return 44100
return 16000
def output_format_from_sample_rate(sample_rate: int) -> str:
match sample_rate:
case 16000:
return "pcm_16000"
case 22050:
return "pcm_22050"
case 24000:
return "pcm_24000"
case 44100:
return "pcm_44100"
logger.warning(
f"ElevenLabsTTSService: No output format available for {sample_rate} sample rate"
)
return "pcm_16000"
def calculate_word_times(
@@ -165,7 +168,7 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
voice_id: str,
model: str = "eleven_flash_v2_5",
url: str = "wss://api.elevenlabs.io",
output_format: ElevenLabsOutputFormat = "pcm_24000",
sample_rate: Optional[int] = None,
params: InputParams = InputParams(),
**kwargs,
):
@@ -189,7 +192,7 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
push_text_frames=False,
push_stop_frames=True,
stop_frame_timeout_s=2.0,
sample_rate=sample_rate_from_output_format(output_format),
sample_rate=sample_rate,
**kwargs,
)
WebsocketService.__init__(self)
@@ -197,11 +200,9 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
self._api_key = api_key
self._url = url
self._settings = {
"sample_rate": sample_rate_from_output_format(output_format),
"language": self.language_to_service_language(params.language)
if params.language
else None,
"output_format": output_format,
"optimize_streaming_latency": params.optimize_streaming_latency,
"stability": params.stability,
"similarity_boost": params.similarity_boost,
@@ -211,6 +212,7 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
}
self.set_model_name(model)
self.set_voice(voice_id)
self._output_format = "" # initialized in start()
self._voice_settings = self._set_voice_settings()
# Indicates if we have sent TTSStartedFrame. It will reset to False when
@@ -254,7 +256,7 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
await self._disconnect()
await self._connect()
async def _update_settings(self, settings: Dict[str, Any]):
async def _update_settings(self, settings: Mapping[str, Any]):
prev_voice = self._voice_id
await super()._update_settings(settings)
if not prev_voice == self._voice_id:
@@ -264,6 +266,7 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
async def start(self, frame: StartFrame):
await super().start(frame)
self._output_format = output_format_from_sample_rate(self.sample_rate)
await self._connect()
async def stop(self, frame: EndFrame):
@@ -322,7 +325,7 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
voice_id = self._voice_id
model = self.model_name
output_format = self._settings["output_format"]
output_format = self._output_format
url = f"{self._url}/v1/text-to-speech/{voice_id}/stream-input?model_id={model}&output_format={output_format}&auto_mode={self._settings['auto_mode']}"
if self._settings["optimize_streaming_latency"]:
@@ -375,7 +378,7 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
self.start_word_timestamps()
audio = base64.b64decode(msg["audio"])
frame = TTSAudioRawFrame(audio, self._settings["sample_rate"], 1)
frame = TTSAudioRawFrame(audio, self.sample_rate, 1)
await self.push_frame(frame)
if msg.get("alignment"):
word_times = calculate_word_times(msg["alignment"], self._cumulative_time)
@@ -428,7 +431,7 @@ class ElevenLabsHttpTTSService(TTSService):
aiohttp_session: aiohttp ClientSession
model: Model ID (default: "eleven_flash_v2_5" for low latency)
base_url: API base URL
output_format: Audio output format (PCM)
sample_rate: Output sample rate
params: Additional parameters for voice configuration
"""
@@ -448,24 +451,21 @@ class ElevenLabsHttpTTSService(TTSService):
aiohttp_session: aiohttp.ClientSession,
model: str = "eleven_flash_v2_5",
base_url: str = "https://api.elevenlabs.io",
output_format: ElevenLabsOutputFormat = "pcm_24000",
sample_rate: Optional[int] = None,
params: InputParams = InputParams(),
**kwargs,
):
super().__init__(sample_rate=sample_rate_from_output_format(output_format), **kwargs)
super().__init__(sample_rate=sample_rate, **kwargs)
self._api_key = api_key
self._base_url = base_url
self._output_format = output_format
self._params = params
self._session = aiohttp_session
self._settings = {
"sample_rate": sample_rate_from_output_format(output_format),
"language": self.language_to_service_language(params.language)
if params.language
else None,
"output_format": output_format,
"optimize_streaming_latency": params.optimize_streaming_latency,
"stability": params.stability,
"similarity_boost": params.similarity_boost,
@@ -474,6 +474,7 @@ class ElevenLabsHttpTTSService(TTSService):
}
self.set_model_name(model)
self.set_voice(voice_id)
self._output_format = "" # initialized in start()
self._voice_settings = self._set_voice_settings()
def can_generate_metrics(self) -> bool:
@@ -508,6 +509,10 @@ class ElevenLabsHttpTTSService(TTSService):
return voice_settings or None
async def start(self, frame: StartFrame):
await super().start(frame)
self._output_format = output_format_from_sample_rate(self.sample_rate)
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
"""Generate speech from text using ElevenLabs streaming API.
@@ -570,7 +575,7 @@ class ElevenLabsHttpTTSService(TTSService):
async for chunk in response.content:
if chunk:
await self.stop_ttfb_metrics()
yield TTSAudioRawFrame(chunk, self._settings["sample_rate"], 1)
yield TTSAudioRawFrame(chunk, self.sample_rate, 1)
yield TTSStoppedFrame()

View File

@@ -56,7 +56,7 @@ class FishAudioTTSService(TTSService, WebsocketService):
api_key: str,
model: str, # This is the reference_id
output_format: FishAudioOutputFormat = "pcm",
sample_rate: int = 24000,
sample_rate: Optional[int] = None,
params: InputParams = InputParams(),
**kwargs,
):
@@ -70,7 +70,7 @@ class FishAudioTTSService(TTSService, WebsocketService):
self._started = False
self._settings = {
"sample_rate": sample_rate,
"sample_rate": 0,
"latency": params.latency,
"format": output_format,
"prosody": {
@@ -92,6 +92,7 @@ class FishAudioTTSService(TTSService, WebsocketService):
async def start(self, frame: StartFrame):
await super().start(frame)
self._settings["sample_rate"] = self.sample_rate
await self._connect()
async def stop(self, frame: EndFrame):
@@ -157,9 +158,7 @@ class FishAudioTTSService(TTSService, WebsocketService):
audio_data = msg.get("audio")
# Only process larger chunks to remove msgpack overhead
if audio_data and len(audio_data) > 1024:
frame = TTSAudioRawFrame(
audio_data, self._settings["sample_rate"], 1
)
frame = TTSAudioRawFrame(audio_data, self.sample_rate, 1)
await self.push_frame(frame)
await self.stop_ttfb_metrics()
continue

View File

@@ -48,7 +48,7 @@ class AudioInputMessage(BaseModel):
realtimeInput: RealtimeInput
@classmethod
def from_raw_audio(cls, raw_audio: bytes, sample_rate=16000) -> "AudioInputMessage":
def from_raw_audio(cls, raw_audio: bytes, sample_rate: int) -> "AudioInputMessage":
data = base64.b64encode(raw_audio).decode("utf-8")
return cls(
realtimeInput=RealtimeInput(

View File

@@ -203,6 +203,8 @@ class GeminiMultimodalLiveLLMService(LLMService):
self._bot_audio_buffer = bytearray()
self._bot_text_buffer = ""
self._sample_rate = 24000
self._settings = {
"frequency_penalty": params.frequency_penalty,
"max_tokens": params.max_tokens,
@@ -521,7 +523,7 @@ class GeminiMultimodalLiveLLMService(LLMService):
if self._audio_input_paused:
return
# Send all audio to Gemini
evt = events.AudioInputMessage.from_raw_audio(frame.audio)
evt = events.AudioInputMessage.from_raw_audio(frame.audio, frame.sample_rate)
await self.send_client_event(evt)
# Manage a buffer of audio to use for transcription
audio = frame.audio
@@ -650,7 +652,7 @@ class GeminiMultimodalLiveLLMService(LLMService):
inline_data = part.inlineData
if not inline_data:
return
if inline_data.mimeType != "audio/pcm;rate=24000":
if inline_data.mimeType != f"audio/pcm;rate={self._sample_rate}":
logger.warning(f"Unrecognized server_content format {inline_data.mimeType}")
return
@@ -665,7 +667,7 @@ class GeminiMultimodalLiveLLMService(LLMService):
self._bot_audio_buffer.extend(audio)
frame = TTSAudioRawFrame(
audio=audio,
sample_rate=24000,
sample_rate=self._sample_rate,
num_channels=1,
)
await self.push_frame(frame)

View File

@@ -131,7 +131,6 @@ def language_to_gladia_language(language: Language) -> str | None:
class GladiaSTTService(STTService):
class InputParams(BaseModel):
sample_rate: Optional[int] = 16000
language: Optional[Language] = Language.EN
endpointing: Optional[float] = 0.2
maximum_duration_without_endpointing: Optional[int] = 10
@@ -144,17 +143,18 @@ class GladiaSTTService(STTService):
api_key: str,
url: str = "https://api.gladia.io/v2/live",
confidence: float = 0.5,
sample_rate: Optional[int] = None,
params: InputParams = InputParams(),
**kwargs,
):
super().__init__(**kwargs)
super().__init__(sample_rate=sample_rate, **kwargs)
self._api_key = api_key
self._url = url
self._settings = {
"encoding": "wav/pcm",
"bit_depth": 16,
"sample_rate": params.sample_rate,
"sample_rate": 0,
"channels": 1,
"language_config": {
"languages": [self.language_to_service_language(params.language)]
@@ -178,6 +178,7 @@ class GladiaSTTService(STTService):
async def start(self, frame: StartFrame):
await super().start(frame)
self._settings["sample_rate"] = self.sample_rate
response = await self._setup_gladia()
self._websocket = await websockets.connect(response["url"])
self._receive_task = self.create_task(self._receive_task_handler())

View File

@@ -883,14 +883,13 @@ class GoogleTTSService(TTSService):
credentials: Optional[str] = None,
credentials_path: Optional[str] = None,
voice_id: str = "en-US-Neural2-A",
sample_rate: int = 24000,
sample_rate: Optional[int] = None,
params: InputParams = InputParams(),
**kwargs,
):
super().__init__(sample_rate=sample_rate, **kwargs)
self._settings = {
"sample_rate": sample_rate,
"pitch": params.pitch,
"rate": params.rate,
"volume": params.volume,
@@ -996,7 +995,7 @@ class GoogleTTSService(TTSService):
)
audio_config = texttospeech_v1.AudioConfig(
audio_encoding=texttospeech_v1.AudioEncoding.LINEAR16,
sample_rate_hertz=self._settings["sample_rate"],
sample_rate_hertz=self.sample_rate,
)
request = texttospeech_v1.SynthesizeSpeechRequest(
@@ -1019,7 +1018,7 @@ class GoogleTTSService(TTSService):
if not chunk:
break
await self.stop_ttfb_metrics()
frame = TTSAudioRawFrame(chunk, self._settings["sample_rate"], 1)
frame = TTSAudioRawFrame(chunk, self.sample_rate, 1)
yield frame
await asyncio.sleep(0) # Allow other tasks to run

View File

@@ -5,7 +5,7 @@
#
import json
from typing import AsyncGenerator
from typing import AsyncGenerator, Optional
from loguru import logger
@@ -66,7 +66,7 @@ class LmntTTSService(TTSService, WebsocketService):
*,
api_key: str,
voice_id: str,
sample_rate: int = 24000,
sample_rate: Optional[int] = None,
language: Language = Language.EN,
**kwargs,
):
@@ -81,7 +81,6 @@ class LmntTTSService(TTSService, WebsocketService):
self._api_key = api_key
self._voice_id = voice_id
self._settings = {
"sample_rate": sample_rate,
"language": self.language_to_service_language(language),
"format": "raw", # Use raw format for direct PCM data
}
@@ -132,7 +131,7 @@ class LmntTTSService(TTSService, WebsocketService):
"X-API-Key": self._api_key,
"voice": self._voice_id,
"format": self._settings["format"],
"sample_rate": self._settings["sample_rate"],
"sample_rate": self.sample_rate,
"language": self._settings["language"],
}
@@ -175,7 +174,7 @@ class LmntTTSService(TTSService, WebsocketService):
await self.stop_ttfb_metrics()
frame = TTSAudioRawFrame(
audio=message,
sample_rate=self._settings["sample_rate"],
sample_rate=self.sample_rate,
num_channels=1,
)
await self.push_frame(frame)

View File

@@ -28,6 +28,7 @@ from pipecat.frames.frames import (
LLMTextFrame,
LLMUpdateSettingsFrame,
OpenAILLMContextAssistantTimestampFrame,
StartFrame,
StartInterruptionFrame,
TTSAudioRawFrame,
TTSStartedFrame,
@@ -412,20 +413,24 @@ class OpenAITTSService(TTSService):
The service returns PCM-encoded audio at the specified sample rate.
"""
OPENAI_SAMPLE_RATE = 24000 # OpenAI TTS always outputs at 24kHz
def __init__(
self,
*,
api_key: str | None = None,
api_key: Optional[str] = None,
voice: str = "alloy",
model: Literal["tts-1", "tts-1-hd"] = "tts-1",
sample_rate: int = 24000,
sample_rate: Optional[int] = None,
**kwargs,
):
if sample_rate and sample_rate != self.OPENAI_SAMPLE_RATE:
logger.warning(
f"OpenAI TTS only supports {self.OPENAI_SAMPLE_RATE}Hz sample rate. "
f"Current rate of {self.sample_rate}Hz may cause issues."
)
super().__init__(sample_rate=sample_rate, **kwargs)
self._settings = {
"sample_rate": sample_rate,
}
self.set_model_name(model)
self.set_voice(voice)
@@ -438,6 +443,14 @@ class OpenAITTSService(TTSService):
logger.info(f"Switching TTS model to: [{model}]")
self.set_model_name(model)
async def start(self, frame: StartFrame):
await super().start(frame)
if self.sample_rate != self.OPENAI_SAMPLE_RATE:
logger.warning(
f"OpenAI TTS requires {self.OPENAI_SAMPLE_RATE}Hz sample rate. "
f"Current rate of {self.sample_rate}Hz may cause issues."
)
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: [{text}]")
try:
@@ -465,7 +478,7 @@ class OpenAITTSService(TTSService):
async for chunk in r.iter_bytes(8192):
if len(chunk) > 0:
await self.stop_ttfb_metrics()
frame = TTSAudioRawFrame(chunk, self._settings["sample_rate"], 1)
frame = TTSAudioRawFrame(chunk, self.sample_rate, 1)
yield frame
yield TTSStoppedFrame()
except BadRequestError as e:

View File

@@ -113,7 +113,7 @@ class PlayHTTTSService(TTSService, WebsocketService):
user_id: str,
voice_url: str,
voice_engine: str = "Play3.0-mini",
sample_rate: int = 24000,
sample_rate: Optional[int] = None,
output_format: str = "wav",
params: InputParams = InputParams(),
**kwargs,
@@ -132,7 +132,6 @@ class PlayHTTTSService(TTSService, WebsocketService):
self._request_id = None
self._settings = {
"sample_rate": sample_rate,
"language": self.language_to_service_language(params.language)
if params.language
else "english",
@@ -250,7 +249,7 @@ class PlayHTTTSService(TTSService, WebsocketService):
if message.startswith(b"RIFF"):
continue
await self.stop_ttfb_metrics()
frame = TTSAudioRawFrame(message, self._settings["sample_rate"], 1)
frame = TTSAudioRawFrame(message, self.sample_rate, 1)
await self.push_frame(frame)
else:
logger.debug(f"Received text message: {message}")
@@ -301,7 +300,7 @@ class PlayHTTTSService(TTSService, WebsocketService):
"voice": self._voice_id,
"voice_engine": self._settings["voice_engine"],
"output_format": self._settings["output_format"],
"sample_rate": self._settings["sample_rate"],
"sample_rate": self.sample_rate,
"language": self._settings["language"],
"speed": self._settings["speed"],
"seed": self._settings["seed"],
@@ -339,7 +338,7 @@ class PlayHTHttpTTSService(TTSService):
user_id: str,
voice_url: str,
voice_engine: str = "Play3.0-mini-http", # Options: Play3.0-mini-http, Play3.0-mini-ws
sample_rate: int = 24000,
sample_rate: Optional[int] = None,
params: InputParams = InputParams(),
**kwargs,
):
@@ -353,7 +352,6 @@ class PlayHTHttpTTSService(TTSService):
api_key=self._api_key,
)
self._settings = {
"sample_rate": sample_rate,
"language": self.language_to_service_language(params.language)
if params.language
else "english",
@@ -365,6 +363,11 @@ class PlayHTHttpTTSService(TTSService):
self.set_model_name(voice_engine)
self.set_voice(voice_url)
async def start(self, frame: StartFrame):
await super().start(frame)
self._settings["sample_rate"] = self.sample_rate
def _create_options(self) -> TTSOptions:
language_str = self._settings["language"]
playht_language = None
if language_str:
@@ -374,10 +377,10 @@ class PlayHTHttpTTSService(TTSService):
playht_language = lang
break
self._options = TTSOptions(
return TTSOptions(
voice=self._voice_id,
language=playht_language,
sample_rate=self._settings["sample_rate"],
sample_rate=self.sample_rate,
format=self._settings["format"],
speed=self._settings["speed"],
seed=self._settings["seed"],
@@ -393,13 +396,14 @@ class PlayHTHttpTTSService(TTSService):
logger.debug(f"Generating TTS: [{text}]")
try:
options = self._create_options()
b = bytearray()
in_header = True
await self.start_ttfb_metrics()
playht_gen = self._client.tts(
text, voice_engine=self._settings["voice_engine"], options=self._options
text, voice_engine=self._settings["voice_engine"], options=options
)
await self.start_tts_usage_metrics(text)
@@ -422,7 +426,7 @@ class PlayHTHttpTTSService(TTSService):
else:
if len(chunk):
await self.stop_ttfb_metrics()
frame = TTSAudioRawFrame(chunk, self._settings["sample_rate"], 1)
frame = TTSAudioRawFrame(chunk, self.sample_rate, 1)
yield frame
yield TTSStoppedFrame()
except Exception as e:

View File

@@ -34,7 +34,7 @@ class RimeHttpTTSService(TTSService):
api_key: str,
voice_id: str = "eva",
model: str = "mist",
sample_rate: int = 24000,
sample_rate: Optional[int] = None,
params: InputParams = InputParams(),
**kwargs,
):
@@ -43,7 +43,6 @@ class RimeHttpTTSService(TTSService):
self._api_key = api_key
self._base_url = "https://users.rime.ai/v1/rime-tts"
self._settings = {
"samplingRate": sample_rate,
"speedAlpha": params.speed_alpha,
"reduceLatency": params.reduce_latency,
"pauseBetweenBrackets": params.pause_between_brackets,
@@ -71,6 +70,7 @@ class RimeHttpTTSService(TTSService):
payload["text"] = text
payload["speaker"] = self._voice_id
payload["modelId"] = self._model_name
payload["samplingRate"] = self.sample_rate
try:
await self.start_ttfb_metrics()
@@ -96,7 +96,7 @@ class RimeHttpTTSService(TTSService):
first_chunk = False
if chunk:
frame = TTSAudioRawFrame(chunk, self._settings["samplingRate"], 1)
frame = TTSAudioRawFrame(chunk, self.sample_rate, 1)
yield frame
yield TTSStoppedFrame()

View File

@@ -49,7 +49,7 @@ class FastPitchTTSService(TTSService):
api_key: str,
server: str = "grpc.nvcf.nvidia.com:443",
voice_id: str = "English-US.Female-1",
sample_rate: int = 24000,
sample_rate: Optional[int] = None,
function_id: str = "0149dedb-2be8-4195-b9a0-e57e0e14f972",
params: InputParams = InputParams(),
**kwargs,
@@ -57,7 +57,6 @@ class FastPitchTTSService(TTSService):
super().__init__(sample_rate=sample_rate, **kwargs)
self._api_key = api_key
self._voice_id = voice_id
self._sample_rate = sample_rate
self._language_code = params.language
self._quality = params.quality
@@ -87,7 +86,7 @@ class FastPitchTTSService(TTSService):
text,
self._voice_id,
self._language_code,
sample_rate_hz=self._sample_rate,
sample_rate_hz=self.sample_rate,
audio_prompt_file=None,
quality=self._quality,
custom_dictionary={},
@@ -114,7 +113,7 @@ class FastPitchTTSService(TTSService):
await self.stop_ttfb_metrics()
frame = TTSAudioRawFrame(
audio=resp.audio,
sample_rate=self._sample_rate,
sample_rate=self.sample_rate,
num_channels=1,
)
yield frame
@@ -136,10 +135,11 @@ class ParakeetSTTService(STTService):
api_key: str,
server: str = "grpc.nvcf.nvidia.com:443",
function_id: str = "1598d209-5e27-4d3c-8079-4751568b1081",
sample_rate: Optional[int] = None,
params: InputParams = InputParams(),
**kwargs,
):
super().__init__(**kwargs)
super().__init__(sample_rate=sample_rate, **kwargs)
self._api_key = api_key
self._profanity_filter = False
self._automatic_punctuation = False
@@ -154,7 +154,6 @@ class ParakeetSTTService(STTService):
self._stop_history_eou = -1
self._stop_threshold_eou = -1.0
self._custom_configuration = ""
self._sample_rate: int = 16000
self.set_model_name("parakeet-ctc-1.1b-asr")
@@ -166,6 +165,14 @@ class ParakeetSTTService(STTService):
self._asr_service = riva.client.ASRService(auth)
self._queue = asyncio.Queue()
def can_generate_metrics(self) -> bool:
return False
async def start(self, frame: StartFrame):
await super().start(frame)
config = riva.client.StreamingRecognitionConfig(
config=riva.client.RecognitionConfig(
encoding=riva.client.AudioEncoding.LINEAR_PCM,
@@ -175,14 +182,16 @@ class ParakeetSTTService(STTService):
profanity_filter=self._profanity_filter,
enable_automatic_punctuation=self._automatic_punctuation,
verbatim_transcripts=not self._no_verbatim_transcripts,
sample_rate_hertz=self._sample_rate,
sample_rate_hertz=self.sample_rate,
audio_channel_count=1,
),
interim_results=True,
)
riva.client.add_word_boosting_to_config(
config, self._boosted_lm_words, self._boosted_lm_score
)
riva.client.add_endpoint_parameters_to_config(
config,
self._start_history,
@@ -193,15 +202,9 @@ class ParakeetSTTService(STTService):
self._stop_threshold_eou,
)
riva.client.add_custom_configuration_to_config(config, self._custom_configuration)
self._config = config
self._queue = asyncio.Queue()
def can_generate_metrics(self) -> bool:
return False
async def start(self, frame: StartFrame):
await super().start(frame)
self._thread_task = self.create_task(self._thread_task_handler())
self._response_task = self.create_task(self._response_task_handler())
self._response_queue = asyncio.Queue()

View File

@@ -12,7 +12,7 @@ import base64
import aiohttp
from loguru import logger
from pipecat.audio.utils import resample_audio
from pipecat.audio.utils import create_default_resampler
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
@@ -47,6 +47,8 @@ class TavusVideoService(AIService):
self._conversation_id: str
self._resampler = create_default_resampler()
async def initialize(self) -> str:
url = "https://tavusapi.com/v2/conversations"
headers = {"Content-Type": "application/json", "x-api-key": self._api_key}
@@ -89,12 +91,10 @@ class TavusVideoService(AIService):
async with self._session.post(url, headers=headers) as r:
r.raise_for_status()
async def _encode_audio_and_send(
self, audio: bytes, original_sample_rate: int, done: bool
) -> None:
async def _encode_audio_and_send(self, audio: bytes, in_rate: int, done: bool) -> None:
"""Encodes audio to base64 and sends it to Tavus"""
if not done:
audio = resample_audio(audio, original_sample_rate, 16000)
audio = await self._resampler.resample(audio, in_rate, 16000)
audio_base64 = base64.b64encode(audio).decode("utf-8")
logger.trace(f"{self}: sending {len(audio)} bytes")
await self._send_audio_message(audio_base64, done=done)

View File

@@ -4,12 +4,12 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
from typing import Any, AsyncGenerator, Dict
from typing import Any, AsyncGenerator, Dict, Optional
import aiohttp
from loguru import logger
from pipecat.audio.utils import resample_audio
from pipecat.audio.utils import create_default_resampler
from pipecat.frames.frames import (
ErrorFrame,
Frame,
@@ -76,7 +76,7 @@ class XTTSService(TTSService):
base_url: str,
aiohttp_session: aiohttp.ClientSession,
language: Language = Language.EN,
sample_rate: int = 24000,
sample_rate: Optional[int] = None,
**kwargs,
):
super().__init__(sample_rate=sample_rate, **kwargs)
@@ -89,6 +89,8 @@ class XTTSService(TTSService):
self._studio_speakers: Dict[str, Any] | None = None
self._aiohttp_session = aiohttp_session
self._resampler = create_default_resampler()
def can_generate_metrics(self) -> bool:
return True
@@ -161,17 +163,19 @@ class XTTSService(TTSService):
buffer = buffer[48000:]
# XTTS uses 24000 so we need to resample to our desired rate.
resampled_audio = resample_audio(
bytes(process_data), 24000, self._sample_rate
resampled_audio = await self._resampler.resample(
bytes(process_data), 24000, self.sample_rate
)
# Create the frame with the resampled audio
frame = TTSAudioRawFrame(resampled_audio, self._sample_rate, 1)
frame = TTSAudioRawFrame(resampled_audio, self.sample_rate, 1)
yield frame
# Process any remaining data in the buffer.
if len(buffer) > 0:
resampled_audio = resample_audio(bytes(buffer), 24000, self._sample_rate)
frame = TTSAudioRawFrame(resampled_audio, self._sample_rate, 1)
resampled_audio = await self._resampler.resample(
bytes(buffer), 24000, self.sample_rate
)
frame = TTSAudioRawFrame(resampled_audio, self.sample_rate, 1)
yield frame
yield TTSStoppedFrame()

View File

@@ -5,24 +5,25 @@
#
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Sequence, Tuple
import sys
from typing import Any, Awaitable, Callable, Dict, Sequence, Tuple
from loguru import logger
from pipecat.clocks.system_clock import SystemClock
from pipecat.frames.frames import (
ControlFrame,
EndFrame,
Frame,
HeartbeatFrame,
StartFrame,
)
from pipecat.observers.base_observer import BaseObserver
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.asyncio import TaskManager
@dataclass
class EndTestFrame(ControlFrame):
pass
logger.remove(0)
logger.add(sys.stderr, level="TRACE")
class HeartbeatsObserver(BaseObserver):
@@ -48,54 +49,58 @@ class HeartbeatsObserver(BaseObserver):
class QueuedFrameProcessor(FrameProcessor):
def __init__(self, queue: asyncio.Queue, ignore_start: bool = True):
def __init__(
self, queue: asyncio.Queue, queue_direction: FrameDirection, ignore_start: bool = True
):
super().__init__()
self._queue = queue
self._queue_direction = queue_direction
self._ignore_start = ignore_start
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if self._ignore_start and isinstance(frame, StartFrame):
await self.push_frame(frame, direction)
else:
await self._queue.put(frame)
await self.push_frame(frame, direction)
if direction == self._queue_direction:
if not isinstance(frame, StartFrame) or not self._ignore_start:
await self._queue.put(frame)
await self.push_frame(frame, direction)
async def run_test(
processor: FrameProcessor,
*,
frames_to_send: Sequence[Frame],
expected_down_frames: Sequence[type],
expected_up_frames: Sequence[type] = [],
ignore_start: bool = True,
start_metadata: Dict[str, Any] = {},
send_end_frame: bool = True,
) -> Tuple[Sequence[Frame], Sequence[Frame]]:
received_up = asyncio.Queue()
received_down = asyncio.Queue()
source = QueuedFrameProcessor(received_up)
sink = QueuedFrameProcessor(received_down)
source = QueuedFrameProcessor(received_up, FrameDirection.UPSTREAM, ignore_start)
sink = QueuedFrameProcessor(received_down, FrameDirection.DOWNSTREAM, ignore_start)
source.link(processor)
processor.link(sink)
pipeline = Pipeline([source, processor, sink])
task_manager = TaskManager()
task_manager.set_event_loop(asyncio.get_event_loop())
await source.queue_frame(StartFrame(clock=SystemClock(), task_manager=task_manager))
task = PipelineTask(pipeline, params=PipelineParams(start_metadata=start_metadata))
for frame in frames_to_send:
await processor.process_frame(frame, FrameDirection.DOWNSTREAM)
await task.queue_frame(frame)
await processor.queue_frame(EndTestFrame())
await processor.queue_frame(EndTestFrame(), FrameDirection.UPSTREAM)
if send_end_frame:
await task.queue_frame(EndFrame())
runner = PipelineRunner()
await runner.run(task)
#
# Down frames
#
received_down_frames: Sequence[Frame] = []
running = True
while running:
while not received_down.empty():
frame = await received_down.get()
running = not isinstance(frame, EndTestFrame)
if running:
if not isinstance(frame, EndFrame) or not send_end_frame:
received_down_frames.append(frame)
print("received DOWN frames =", received_down_frames)
@@ -109,12 +114,9 @@ async def run_test(
# Up frames
#
received_up_frames: Sequence[Frame] = []
running = True
while running:
while not received_up.empty():
frame = await received_up.get()
running = not isinstance(frame, EndTestFrame)
if running:
received_up_frames.append(frame)
received_up_frames.append(frame)
print("received UP frames =", received_up_frames)

View File

@@ -35,6 +35,9 @@ class BaseInputTransport(FrameProcessor):
self._params = params
# Input sample rate. It will be initialized on StartFrame.
self._sample_rate = 0
# We read audio from a single queue one at a time and we then run VAD in
# a thread. Therefore, only one thread should be necessary.
self._executor = ThreadPoolExecutor(max_workers=1)
@@ -43,10 +46,23 @@ class BaseInputTransport(FrameProcessor):
# if passthrough is enabled.
self._audio_task = None
@property
def sample_rate(self) -> int:
return self._sample_rate
@property
def vad_analyzer(self) -> VADAnalyzer | None:
return self._params.vad_analyzer
async def start(self, frame: StartFrame):
self._sample_rate = self._params.audio_in_sample_rate or frame.audio_in_sample_rate
# Configure VAD analyzer.
if self._params.vad_enabled and self._params.vad_analyzer:
self._params.vad_analyzer.set_sample_rate(self._sample_rate)
# Start audio filter.
if self._params.audio_in_filter:
await self._params.audio_in_filter.start(self._params.audio_in_sample_rate)
await self._params.audio_in_filter.start(self._sample_rate)
# Create audio input queue and task if needed.
if self._params.audio_in_enabled or self._params.vad_enabled:
self._audio_in_queue = asyncio.Queue()
@@ -67,9 +83,6 @@ class BaseInputTransport(FrameProcessor):
await self.cancel_task(self._audio_task)
self._audio_task = None
def vad_analyzer(self) -> VADAnalyzer | None:
return self._params.vad_analyzer
async def push_audio_frame(self, frame: InputAudioRawFrame):
if self._params.audio_in_enabled or self._params.vad_enabled:
await self._audio_in_queue.put(frame)
@@ -104,9 +117,8 @@ class BaseInputTransport(FrameProcessor):
await self.push_frame(frame, direction)
await self.stop(frame)
elif isinstance(frame, VADParamsUpdateFrame):
vad_analyzer = self.vad_analyzer()
if vad_analyzer:
vad_analyzer.set_params(frame.params)
if self.vad_analyzer:
self.vad_analyzer.set_params(frame.params)
elif isinstance(frame, FilterUpdateSettingsFrame) and self._params.audio_in_filter:
await self._params.audio_in_filter.process_frame(frame)
# Other frames
@@ -140,11 +152,10 @@ class BaseInputTransport(FrameProcessor):
async def _vad_analyze(self, audio_frame: InputAudioRawFrame) -> VADState:
state = VADState.QUIET
vad_analyzer = self.vad_analyzer()
if vad_analyzer:
if self.vad_analyzer:
logger.trace(f"{self}: analyzing VAD on {audio_frame}")
state = await self.get_event_loop().run_in_executor(
self._executor, vad_analyzer.analyze_audio, audio_frame.audio
self._executor, self.vad_analyzer.analyze_audio, audio_frame.audio
)
logger.trace(f"{self}: done analyzing VAD on {audio_frame}")
return state

View File

@@ -57,12 +57,11 @@ class BaseOutputTransport(FrameProcessor):
# framerate.
self._camera_images = None
# We will write 20ms audio at a time. If we receive long audio frames we
# will chunk them. This will help with interruption handling.
audio_bytes_10ms = (
int(self._params.audio_out_sample_rate / 100) * self._params.audio_out_channels * 2
)
self._audio_chunk_size = audio_bytes_10ms * 2
# Output sample rate. It will be initialized on StartFrame.
self._sample_rate = 0
# Chunk size that will be written. It will be computed on StartFrame
self._audio_chunk_size = 0
self._audio_buffer = bytearray()
self._stopped_event = asyncio.Event()
@@ -70,10 +69,21 @@ class BaseOutputTransport(FrameProcessor):
# Indicates if the bot is currently speaking.
self._bot_speaking = False
@property
def sample_rate(self) -> int:
return self._sample_rate
async def start(self, frame: StartFrame):
self._sample_rate = self._params.audio_out_sample_rate or frame.audio_out_sample_rate
# We will write 20ms audio at a time. If we receive long audio frames we
# will chunk them. This will help with interruption handling.
audio_bytes_10ms = int(self._sample_rate / 100) * self._params.audio_out_channels * 2
self._audio_chunk_size = audio_bytes_10ms * 2
# Start audio mixer.
if self._params.audio_out_mixer:
await self._params.audio_out_mixer.start(self._params.audio_out_sample_rate)
await self._params.audio_out_mixer.start(self._sample_rate)
self._create_camera_task()
self._create_sink_tasks()
@@ -298,7 +308,7 @@ class BaseOutputTransport(FrameProcessor):
# Generate an audio frame with only the mixer's part.
frame = OutputAudioRawFrame(
audio=await self._params.audio_out_mixer.mix(silence),
sample_rate=self._params.audio_out_sample_rate,
sample_rate=self._sample_rate,
num_channels=self._params.audio_out_channels,
)
yield frame

View File

@@ -31,12 +31,12 @@ class TransportParams(BaseModel):
camera_out_color_format: str = "RGB"
audio_out_enabled: bool = False
audio_out_is_live: bool = False
audio_out_sample_rate: int = 24000
audio_out_sample_rate: Optional[int] = None
audio_out_channels: int = 1
audio_out_bitrate: int = 96000
audio_out_mixer: Optional[BaseAudioMixer] = None
audio_in_enabled: bool = False
audio_in_sample_rate: int = 16000
audio_in_sample_rate: Optional[int] = None
audio_in_channels: int = 1
audio_in_filter: Optional[BaseAudioFilter] = None
vad_enabled: bool = False

View File

@@ -28,35 +28,40 @@ except ModuleNotFoundError as e:
class LocalAudioInputTransport(BaseInputTransport):
def __init__(self, py_audio: pyaudio.PyAudio, params: TransportParams):
super().__init__(params)
self._py_audio = py_audio
self._in_stream = None
self._sample_rate = 0
sample_rate = self._params.audio_in_sample_rate
num_frames = int(sample_rate / 100) * 2 # 20ms of audio
async def start(self, frame: StartFrame):
await super().start(frame)
self._in_stream = py_audio.open(
format=py_audio.get_format_from_width(2),
channels=params.audio_in_channels,
rate=params.audio_in_sample_rate,
self._sample_rate = self._params.audio_in_sample_rate or frame.audio_in_sample_rate
num_frames = int(self._sample_rate / 100) * 2 # 20ms of audio
self._in_stream = self._py_audio.open(
format=self._py_audio.get_format_from_width(2),
channels=self._params.audio_in_channels,
rate=self._sample_rate,
frames_per_buffer=num_frames,
stream_callback=self._audio_in_callback,
input=True,
)
async def start(self, frame: StartFrame):
await super().start(frame)
self._in_stream.start_stream()
async def cleanup(self):
await super().cleanup()
self._in_stream.stop_stream()
# This is not very pretty (taken from PyAudio docs).
while self._in_stream.is_active():
await asyncio.sleep(0.1)
self._in_stream.close()
if self._in_stream:
self._in_stream.stop_stream()
# This is not very pretty (taken from PyAudio docs).
while self._in_stream.is_active():
await asyncio.sleep(0.1)
self._in_stream.close()
self._in_stream = None
def _audio_in_callback(self, in_data, frame_count, time_info, status):
frame = InputAudioRawFrame(
audio=in_data,
sample_rate=self._params.audio_in_sample_rate,
sample_rate=self._sample_rate,
num_channels=self._params.audio_in_channels,
)
@@ -68,32 +73,41 @@ class LocalAudioInputTransport(BaseInputTransport):
class LocalAudioOutputTransport(BaseOutputTransport):
def __init__(self, py_audio: pyaudio.PyAudio, params: TransportParams):
super().__init__(params)
self._py_audio = py_audio
self._out_stream = None
self._sample_rate = 0
# We only write audio frames from a single task, so only one thread
# should be necessary.
self._executor = ThreadPoolExecutor(max_workers=1)
self._out_stream = py_audio.open(
format=py_audio.get_format_from_width(2),
channels=params.audio_out_channels,
rate=params.audio_out_sample_rate,
output=True,
)
async def start(self, frame: StartFrame):
await super().start(frame)
self._sample_rate = self._params.audio_out_sample_rate or frame.audio_out_sample_rate
self._out_stream = self._py_audio.open(
format=self._py_audio.get_format_from_width(2),
channels=self._params.audio_out_channels,
rate=self._sample_rate,
output=True,
)
self._out_stream.start_stream()
async def cleanup(self):
await super().cleanup()
self._out_stream.stop_stream()
# This is not very pretty (taken from PyAudio docs).
while self._out_stream.is_active():
await asyncio.sleep(0.1)
self._out_stream.close()
if self._out_stream:
self._out_stream.stop_stream()
# This is not very pretty (taken from PyAudio docs).
while self._out_stream.is_active():
await asyncio.sleep(0.1)
self._out_stream.close()
async def write_raw_audio_frames(self, frames: bytes):
await self.get_event_loop().run_in_executor(self._executor, self._out_stream.write, frames)
if self._out_stream:
await self.get_event_loop().run_in_executor(
self._executor, self._out_stream.write, frames
)
class LocalAudioTransport(BaseTransport):

View File

@@ -36,35 +36,39 @@ except ModuleNotFoundError as e:
class TkInputTransport(BaseInputTransport):
def __init__(self, py_audio: pyaudio.PyAudio, params: TransportParams):
super().__init__(params)
self._py_audio = py_audio
self._in_stream = None
self._sample_rate = 0
sample_rate = self._params.audio_in_sample_rate
num_frames = int(sample_rate / 100) * 2 # 20ms of audio
async def start(self, frame: StartFrame):
await super().start(frame)
self._in_stream = py_audio.open(
format=py_audio.get_format_from_width(2),
channels=params.audio_in_channels,
rate=params.audio_in_sample_rate,
self._sample_rate = self._params.audio_in_sample_rate or frame.audio_in_sample_rate
num_frames = int(self._sample_rate / 100) * 2 # 20ms of audio
self._in_stream = self._py_audio.open(
format=self._py_audio.get_format_from_width(2),
channels=self._params.audio_in_channels,
rate=self._sample_rate,
frames_per_buffer=num_frames,
stream_callback=self._audio_in_callback,
input=True,
)
async def start(self, frame: StartFrame):
await super().start(frame)
self._in_stream.start_stream()
async def cleanup(self):
await super().cleanup()
self._in_stream.stop_stream()
# This is not very pretty (taken from PyAudio docs).
while self._in_stream.is_active():
await asyncio.sleep(0.1)
self._in_stream.close()
if self._in_stream:
self._in_stream.stop_stream()
# This is not very pretty (taken from PyAudio docs).
while self._in_stream.is_active():
await asyncio.sleep(0.1)
self._in_stream.close()
def _audio_in_callback(self, in_data, frame_count, time_info, status):
frame = InputAudioRawFrame(
audio=in_data,
sample_rate=self._params.audio_in_sample_rate,
sample_rate=self._sample_rate,
num_channels=self._params.audio_in_channels,
)
@@ -76,18 +80,14 @@ class TkInputTransport(BaseInputTransport):
class TkOutputTransport(BaseOutputTransport):
def __init__(self, tk_root: tk.Tk, py_audio: pyaudio.PyAudio, params: TransportParams):
super().__init__(params)
self._py_audio = py_audio
self._out_stream = None
self._sample_rate = 0
# We only write audio frames from a single task, so only one thread
# should be necessary.
self._executor = ThreadPoolExecutor(max_workers=1)
self._out_stream = py_audio.open(
format=py_audio.get_format_from_width(2),
channels=params.audio_out_channels,
rate=params.audio_out_sample_rate,
output=True,
)
# Start with a neutral gray background.
array = np.ones((1024, 1024, 3)) * 128
data = f"P5 {1024} {1024} 255 ".encode() + array.astype(np.uint8).tobytes()
@@ -97,18 +97,31 @@ class TkOutputTransport(BaseOutputTransport):
async def start(self, frame: StartFrame):
await super().start(frame)
self._sample_rate = self._params.audio_out_sample_rate or frame.audio_out_sample_rate
self._out_stream = self._py_audio.open(
format=self._py_audio.get_format_from_width(2),
channels=self._params.audio_out_channels,
rate=self._sample_rate,
output=True,
)
self._out_stream.start_stream()
async def cleanup(self):
await super().cleanup()
self._out_stream.stop_stream()
# This is not very pretty (taken from PyAudio docs).
while self._out_stream.is_active():
await asyncio.sleep(0.1)
self._out_stream.close()
if self._out_stream:
self._out_stream.stop_stream()
# This is not very pretty (taken from PyAudio docs).
while self._out_stream.is_active():
await asyncio.sleep(0.1)
self._out_stream.close()
async def write_raw_audio_frames(self, frames: bytes):
await self.get_event_loop().run_in_executor(self._executor, self._out_stream.write, frames)
if self._out_stream:
await self.get_event_loop().run_in_executor(
self._executor, self._out_stream.write, frames
)
async def write_frame_to_camera(self, frame: OutputImageRawFrame):
self.get_event_loop().call_soon(self._write_frame_to_tk, frame)

View File

@@ -69,6 +69,7 @@ class FastAPIWebsocketInputTransport(BaseInputTransport):
async def start(self, frame: StartFrame):
await super().start(frame)
await self._params.serializer.setup(frame)
if self._params.session_timeout:
self._monitor_websocket_task = self.create_task(self._monitor_websocket())
await self._callbacks.on_client_connected(self._websocket)
@@ -91,7 +92,7 @@ class FastAPIWebsocketInputTransport(BaseInputTransport):
async def _receive_messages(self):
try:
async for message in self._iter_data():
frame = self._params.serializer.deserialize(message)
frame = await self._params.serializer.deserialize(message)
if not frame:
continue
@@ -101,7 +102,7 @@ class FastAPIWebsocketInputTransport(BaseInputTransport):
else:
await self.push_frame(frame)
except Exception as e:
logger.error(f"{self} exception receiving data (class: {e.__class__.__name__})")
logger.error(f"{self} exception receiving data: {e.__class__.__name__} ({e})")
await self._callbacks.on_client_disconnected(self._websocket)
@@ -118,9 +119,19 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport):
self._websocket = websocket
self._params = params
self._send_interval = (self._audio_chunk_size / self._params.audio_out_sample_rate) / 2
# write_raw_audio_frames() is called quickly, as soon as we get audio
# (e.g. from the TTS), and since this is just a network connection we
# would be sending it to quickly. Instead, we want to block to emulate
# an audio device, this is what the send interval is. It will be
# computed on StartFrame.
self._send_interval = 0
self._next_send_time = 0
async def start(self, frame: StartFrame):
await super().start(frame)
await self._params.serializer.setup(frame)
self._send_interval = (self._audio_chunk_size / self.sample_rate) / 2
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
@@ -136,7 +147,7 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport):
frame = OutputAudioRawFrame(
audio=frames,
sample_rate=self._params.audio_out_sample_rate,
sample_rate=self.sample_rate,
num_channels=self._params.audio_out_channels,
)
@@ -163,11 +174,11 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport):
async def _write_frame(self, frame: Frame):
try:
payload = self._params.serializer.serialize(frame)
payload = await self._params.serializer.serialize(frame)
if payload and self._websocket.client_state == WebSocketState.CONNECTED:
await self._send_data(payload)
except Exception as e:
logger.error(f"{self} exception sending data (class: {e.__class__.__name__})")
logger.error(f"{self} exception sending data: {e.__class__.__name__} ({e})")
def _send_data(self, data: str | bytes):
if self._params.serializer.type == FrameSerializerType.BINARY:

View File

@@ -101,7 +101,7 @@ class WebsocketClientSession:
if self._websocket:
await self._websocket.send(message)
except Exception as e:
logger.error(f"{self} exception sending data (class: {e.__class__.__name__})")
logger.error(f"{self} exception sending data: {e.__class__.__name__} ({e})")
async def _client_task_handler(self):
try:
@@ -109,7 +109,7 @@ class WebsocketClientSession:
async for message in self._websocket:
await self._callbacks.on_message(self._websocket, message)
except Exception as e:
logger.error(f"{self} exception receiving data (class: {e.__class__.__name__})")
logger.error(f"{self} exception receiving data: {e.__class__.__name__} ({e})")
await self._callbacks.on_disconnected(self._websocket)
@@ -126,6 +126,7 @@ class WebsocketClientInputTransport(BaseInputTransport):
async def start(self, frame: StartFrame):
await super().start(frame)
await self._params.serializer.setup(frame)
await self._session.setup(frame)
await self._session.connect()
@@ -138,7 +139,7 @@ class WebsocketClientInputTransport(BaseInputTransport):
await self._session.disconnect()
async def on_message(self, websocket, message):
frame = self._params.serializer.deserialize(message)
frame = await self._params.serializer.deserialize(message)
if not frame:
return
if isinstance(frame, InputAudioRawFrame) and self._params.audio_in_enabled:
@@ -154,11 +155,18 @@ class WebsocketClientOutputTransport(BaseOutputTransport):
self._session = session
self._params = params
self._send_interval = (self._audio_chunk_size / self._params.audio_out_sample_rate) / 2
# write_raw_audio_frames() is called quickly, as soon as we get audio
# (e.g. from the TTS), and since this is just a network connection we
# would be sending it to quickly. Instead, we want to block to emulate
# an audio device, this is what the send interval is. It will be
# computed on StartFrame.
self._send_interval = 0
self._next_send_time = 0
async def start(self, frame: StartFrame):
await super().start(frame)
self._send_interval = (self._audio_chunk_size / self.sample_rate) / 2
await self._params.serializer.setup(frame)
await self._session.setup(frame)
await self._session.connect()
@@ -176,7 +184,7 @@ class WebsocketClientOutputTransport(BaseOutputTransport):
async def write_raw_audio_frames(self, frames: bytes):
frame = OutputAudioRawFrame(
audio=frames,
sample_rate=self._params.audio_out_sample_rate,
sample_rate=self.sample_rate,
num_channels=self._params.audio_out_channels,
)
@@ -200,7 +208,7 @@ class WebsocketClientOutputTransport(BaseOutputTransport):
await self._write_audio_sleep()
async def _write_frame(self, frame: Frame):
payload = self._params.serializer.serialize(frame)
payload = await self._params.serializer.serialize(frame)
if payload:
await self._session.send(payload)

View File

@@ -24,7 +24,6 @@ from pipecat.frames.frames import (
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.serializers.base_serializer import FrameSerializer
from pipecat.serializers.protobuf import ProtobufFrameSerializer
from pipecat.transports.base_input import BaseInputTransport
from pipecat.transports.base_output import BaseOutputTransport
from pipecat.transports.base_transport import BaseTransport, TransportParams
@@ -39,7 +38,7 @@ except ModuleNotFoundError as e:
class WebsocketServerParams(TransportParams):
add_wav_header: bool = False
serializer: FrameSerializer = ProtobufFrameSerializer()
serializer: FrameSerializer
session_timeout: int | None = None
@@ -67,20 +66,32 @@ class WebsocketServerInputTransport(BaseInputTransport):
self._websocket: websockets.WebSocketServerProtocol | None = None
self._server_task = None
# This task will monitor the websocket connection periodically.
self._monitor_task = None
self._stop_server_event = asyncio.Event()
async def start(self, frame: StartFrame):
await super().start(frame)
await self._params.serializer.setup(frame)
self._server_task = self.create_task(self._server_task_handler())
async def stop(self, frame: EndFrame):
await super().stop(frame)
self._stop_server_event.set()
await self.wait_for_task(self._server_task)
if self._monitor_task:
await self.cancel_task(self._monitor_task)
if self._server_task:
await self.wait_for_task(self._server_task)
async def cancel(self, frame: CancelFrame):
await super().cancel(frame)
await self.cancel_task(self._server_task)
if self._monitor_task:
await self.cancel_task(self._monitor_task)
if self._server_task:
await self.cancel_task(self._server_task)
async def _server_task_handler(self):
logger.info(f"Starting websocket server on {self._host}:{self._port}")
@@ -100,12 +111,14 @@ class WebsocketServerInputTransport(BaseInputTransport):
# Create a task to monitor the websocket connection
if self._params.session_timeout:
self.create_task(self._monitor_websocket(websocket))
self._monitor_task = self.create_task(
self._monitor_websocket(websocket, self._params.session_timeout)
)
# Handle incoming messages
try:
async for message in websocket:
frame = self._params.serializer.deserialize(message)
frame = await self._params.serializer.deserialize(message)
if not frame:
continue
@@ -115,7 +128,7 @@ class WebsocketServerInputTransport(BaseInputTransport):
else:
await self.push_frame(frame)
except Exception as e:
logger.error(f"{self} exception receiving data (class: {e.__class__.__name__})")
logger.error(f"{self} exception receiving data: {e.__class__.__name__} ({e})")
# Notify disconnection
await self._callbacks.on_client_disconnected(websocket)
@@ -125,10 +138,13 @@ class WebsocketServerInputTransport(BaseInputTransport):
logger.info(f"Client {websocket.remote_address} disconnected")
async def _monitor_websocket(self, websocket: websockets.WebSocketServerProtocol):
"""Wait for self._params.session_timeout seconds, if the websocket is still open, trigger timeout event."""
async def _monitor_websocket(
self, websocket: websockets.WebSocketServerProtocol, session_timeout: int
):
"""Wait for session_timeout seconds, if the websocket is still open,
trigger timeout event."""
try:
await asyncio.sleep(self._params.session_timeout)
await asyncio.sleep(session_timeout)
if not websocket.closed:
await self._callbacks.on_session_timeout(websocket)
except asyncio.CancelledError:
@@ -144,7 +160,12 @@ class WebsocketServerOutputTransport(BaseOutputTransport):
self._websocket: websockets.WebSocketServerProtocol | None = None
self._send_interval = (self._audio_chunk_size / self._params.audio_out_sample_rate) / 2
# write_raw_audio_frames() is called quickly, as soon as we get audio
# (e.g. from the TTS), and since this is just a network connection we
# would be sending it to quickly. Instead, we want to block to emulate
# an audio device, this is what the send interval is. It will be
# computed on StartFrame.
self._send_interval = 0
self._next_send_time = 0
async def set_client_connection(self, websocket: websockets.WebSocketServerProtocol | None):
@@ -153,6 +174,11 @@ class WebsocketServerOutputTransport(BaseOutputTransport):
logger.warning("Only one client allowed, using new connection")
self._websocket = websocket
async def start(self, frame: StartFrame):
await super().start(frame)
await self._params.serializer.setup(frame)
self._send_interval = (self._audio_chunk_size / self.sample_rate) / 2
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
@@ -168,7 +194,7 @@ class WebsocketServerOutputTransport(BaseOutputTransport):
frame = OutputAudioRawFrame(
audio=frames,
sample_rate=self._params.audio_out_sample_rate,
sample_rate=self.sample_rate,
num_channels=self._params.audio_out_channels,
)
@@ -193,11 +219,11 @@ class WebsocketServerOutputTransport(BaseOutputTransport):
async def _write_frame(self, frame: Frame):
try:
payload = self._params.serializer.serialize(frame)
payload = await self._params.serializer.serialize(frame)
if payload and self._websocket:
await self._websocket.send(payload)
except Exception as e:
logger.error(f"{self} exception sending data (class: {e.__class__.__name__})")
logger.error(f"{self} exception sending data: {e.__class__.__name__} ({e})")
async def _write_audio_sleep(self):
# Simulate a clock.
@@ -213,14 +239,13 @@ class WebsocketServerOutputTransport(BaseOutputTransport):
class WebsocketServerTransport(BaseTransport):
def __init__(
self,
params: WebsocketServerParams,
host: str = "localhost",
port: int = 8765,
params: WebsocketServerParams = WebsocketServerParams(),
input_name: str | None = None,
output_name: str | None = None,
loop: asyncio.AbstractEventLoop | None = None,
):
super().__init__(input_name=input_name, output_name=output_name, loop=loop)
super().__init__(input_name=input_name, output_name=output_name)
self._host = host
self._port = port
self._params = params

View File

@@ -71,11 +71,11 @@ class DailyTransportMessageUrgentFrame(TransportMessageUrgentFrame):
class WebRTCVADAnalyzer(VADAnalyzer):
def __init__(self, *, sample_rate=16000, num_channels=1, params: VADParams = VADParams()):
super().__init__(sample_rate=sample_rate, num_channels=num_channels, params=params)
def __init__(self, *, sample_rate: Optional[int] = None, params: VADParams = VADParams()):
super().__init__(sample_rate=sample_rate, params=params)
self._webrtc_vad = Daily.create_native_vad(
reset_period_ms=VAD_RESET_PERIOD_MS, sample_rate=sample_rate, channels=num_channels
reset_period_ms=VAD_RESET_PERIOD_MS, sample_rate=self.sample_rate, channels=1
)
logger.debug("Loaded native WebRTC VAD")
@@ -222,33 +222,13 @@ class DailyTransportClient(EventHandler):
self._callback_queue = asyncio.Queue()
self._callback_task = None
# Input and ouput sample rates. They will be initialize on setup().
self._in_sample_rate = 0
self._out_sample_rate = 0
self._camera: VirtualCameraDevice | None = None
if self._params.camera_out_enabled:
self._camera = Daily.create_camera_device(
self._camera_name(),
width=self._params.camera_out_width,
height=self._params.camera_out_height,
color_format=self._params.camera_out_color_format,
)
self._mic: VirtualMicrophoneDevice | None = None
if self._params.audio_out_enabled:
self._mic = Daily.create_microphone_device(
self._mic_name(),
sample_rate=self._params.audio_out_sample_rate,
channels=self._params.audio_out_channels,
non_blocking=True,
)
self._speaker: VirtualSpeakerDevice | None = None
if self._params.audio_in_enabled or self._params.vad_enabled:
self._speaker = Daily.create_speaker_device(
self._speaker_name(),
sample_rate=self._params.audio_in_sample_rate,
channels=self._params.audio_in_channels,
non_blocking=True,
)
Daily.select_speaker_device(self._speaker_name())
def _camera_name(self):
return f"camera-{self}"
@@ -281,7 +261,7 @@ class DailyTransportClient(EventHandler):
if not self._speaker:
return None
sample_rate = self._params.audio_in_sample_rate
sample_rate = self._in_sample_rate
num_channels = self._params.audio_in_channels
num_frames = int(sample_rate / 100) * 2 # 20ms of audio
@@ -315,6 +295,34 @@ class DailyTransportClient(EventHandler):
self._camera.write_frame(frame.image)
async def setup(self, frame: StartFrame):
self._in_sample_rate = self._params.audio_in_sample_rate or frame.audio_in_sample_rate
self._out_sample_rate = self._params.audio_out_sample_rate or frame.audio_out_sample_rate
if self._params.camera_out_enabled and not self._camera:
self._camera = Daily.create_camera_device(
self._camera_name(),
width=self._params.camera_out_width,
height=self._params.camera_out_height,
color_format=self._params.camera_out_color_format,
)
if self._params.audio_out_enabled and not self._mic:
self._mic = Daily.create_microphone_device(
self._mic_name(),
sample_rate=self._out_sample_rate,
channels=self._params.audio_out_channels,
non_blocking=True,
)
if (self._params.audio_in_enabled or self._params.vad_enabled) and not self._speaker:
self._speaker = Daily.create_speaker_device(
self._speaker_name(),
sample_rate=self._in_sample_rate,
channels=self._params.audio_in_channels,
non_blocking=True,
)
Daily.select_speaker_device(self._speaker_name())
if not self._task_manager:
self._task_manager = frame.task_manager
self._callback_task = self._task_manager.create_task(
@@ -707,6 +715,7 @@ class DailyInputTransport(BaseInputTransport):
super().__init__(params, **kwargs)
self._client = client
self._params = params
self._video_renderers = {}
@@ -715,11 +724,10 @@ class DailyInputTransport(BaseInputTransport):
self._audio_in_task = None
self._vad_analyzer: VADAnalyzer | None = params.vad_analyzer
if params.vad_enabled and not params.vad_analyzer:
self._vad_analyzer = WebRTCVADAnalyzer(
sample_rate=self._params.audio_in_sample_rate,
num_channels=self._params.audio_in_channels,
)
@property
def vad_analyzer(self) -> VADAnalyzer | None:
return self._vad_analyzer
async def start(self, frame: StartFrame):
# Parent start.
@@ -728,6 +736,9 @@ class DailyInputTransport(BaseInputTransport):
await self._client.setup(frame)
# Join the room.
await self._client.join()
# Inialize WebRTC VAD if needed.
if self._params.vad_enabled and not self._params.vad_analyzer:
self._vad_analyzer = WebRTCVADAnalyzer(sample_rate=self.sample_rate)
# Create audio task. It reads audio frames from Daily and push them
# internally for VAD processing.
if self._params.audio_in_enabled or self._params.vad_enabled:
@@ -757,9 +768,6 @@ class DailyInputTransport(BaseInputTransport):
await super().cleanup()
await self._client.cleanup()
def vad_analyzer(self) -> VADAnalyzer | None:
return self._vad_analyzer
#
# FrameProcessor
#

View File

@@ -11,7 +11,7 @@ from typing import Any, Awaitable, Callable, List, Optional
from loguru import logger
from pydantic import BaseModel
from pipecat.audio.utils import resample_audio
from pipecat.audio.utils import create_default_resampler
from pipecat.audio.vad.vad_analyzer import VADAnalyzer
from pipecat.frames.frames import (
AudioRawFrame,
@@ -101,6 +101,7 @@ class LiveKitTransportClient:
return self._room
async def setup(self, frame: StartFrame):
self._out_sample_rate = self._params.audio_out_sample_rate or frame.audio_out_sample_rate
if not self._task_manager:
self._task_manager = frame.task_manager
self._room = rtc.Room(loop=self._task_manager.get_event_loop())
@@ -138,7 +139,7 @@ class LiveKitTransportClient:
# Set up audio source and track
self._audio_source = rtc.AudioSource(
self._params.audio_out_sample_rate, self._params.audio_out_channels
self._out_sample_rate, self._params.audio_out_channels
)
self._audio_track = rtc.LocalAudioTrack.create_audio_track(
"pipecat-audio", self._audio_source
@@ -349,6 +350,11 @@ class LiveKitInputTransport(BaseInputTransport):
self._client = client
self._audio_in_task = None
self._vad_analyzer: VADAnalyzer | None = params.vad_analyzer
self._resampler = create_default_resampler()
@property
def vad_analyzer(self) -> VADAnalyzer | None:
return self._vad_analyzer
async def start(self, frame: StartFrame):
await super().start(frame)
@@ -371,9 +377,6 @@ class LiveKitInputTransport(BaseInputTransport):
if self._audio_in_task and (self._params.audio_in_enabled or self._params.vad_enabled):
await self.cancel_task(self._audio_in_task)
def vad_analyzer(self) -> VADAnalyzer | None:
return self._vad_analyzer
async def push_app_message(self, message: Any, sender: str):
frame = LiveKitTransportMessageUrgentFrame(message=message, participant_id=sender)
await self.push_frame(frame)
@@ -384,7 +387,9 @@ class LiveKitInputTransport(BaseInputTransport):
audio_data = await self._client.get_next_audio_frame()
if audio_data:
audio_frame_event, participant_id = audio_data
pipecat_audio_frame = self._convert_livekit_audio_to_pipecat(audio_frame_event)
pipecat_audio_frame = await self._convert_livekit_audio_to_pipecat(
audio_frame_event
)
input_audio_frame = InputAudioRawFrame(
audio=pipecat_audio_frame.audio,
sample_rate=pipecat_audio_frame.sample_rate,
@@ -392,18 +397,18 @@ class LiveKitInputTransport(BaseInputTransport):
)
await self.push_audio_frame(input_audio_frame)
def _convert_livekit_audio_to_pipecat(
async def _convert_livekit_audio_to_pipecat(
self, audio_frame_event: rtc.AudioFrameEvent
) -> AudioRawFrame:
audio_frame = audio_frame_event.frame
audio_data = resample_audio(
audio_frame.data.tobytes(), audio_frame.sample_rate, self._params.audio_in_sample_rate
audio_data = await self._resampler.resample(
audio_frame.data.tobytes(), audio_frame.sample_rate, self.sample_rate
)
return AudioRawFrame(
audio=audio_data,
sample_rate=self._params.audio_in_sample_rate,
sample_rate=self.sample_rate,
num_channels=audio_frame.num_channels,
)
@@ -445,7 +450,7 @@ class LiveKitOutputTransport(BaseOutputTransport):
return rtc.AudioFrame(
data=pipecat_audio,
sample_rate=self._params.audio_out_sample_rate,
sample_rate=self.sample_rate,
num_channels=self._params.audio_out_channels,
samples_per_channel=samples_per_channel,
)

View File

@@ -31,7 +31,11 @@ class TestSentenceAggregator(unittest.IsolatedAsyncioTestCase):
expected_returned_frames = [TextFrame, TextFrame, TextFrame]
(received_down, _) = await run_test(aggregator, frames_to_send, expected_returned_frames)
(received_down, _) = await run_test(
aggregator,
frames_to_send=frames_to_send,
expected_down_frames=expected_returned_frames,
)
assert received_down[-3].text == "Hello, world. "
assert received_down[-2].text == "How are you? "
assert received_down[-1].text == "I am fine! "
@@ -66,5 +70,7 @@ class TestGatedAggregator(unittest.IsolatedAsyncioTestCase):
]
(received_down, _) = await run_test(
gated_aggregator, frames_to_send, expected_returned_frames
gated_aggregator,
frames_to_send=frames_to_send,
expected_down_frames=expected_returned_frames,
)

View File

@@ -4,7 +4,6 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import unittest
from pipecat.frames.frames import (
@@ -19,7 +18,7 @@ from pipecat.processors.filters.frame_filter import FrameFilter
from pipecat.processors.filters.function_filter import FunctionFilter
from pipecat.processors.filters.identity_filter import IdentityFilter
from pipecat.processors.filters.wake_check_filter import WakeCheckFilter
from pipecat.tests.utils import EndTestFrame, run_test
from pipecat.tests.utils import run_test
class TestIdentifyFilter(unittest.IsolatedAsyncioTestCase):
@@ -27,27 +26,44 @@ class TestIdentifyFilter(unittest.IsolatedAsyncioTestCase):
filter = IdentityFilter()
frames_to_send = [UserStartedSpeakingFrame(), UserStoppedSpeakingFrame()]
expected_returned_frames = [UserStartedSpeakingFrame, UserStoppedSpeakingFrame]
await run_test(filter, frames_to_send, expected_returned_frames)
await run_test(
filter,
frames_to_send=frames_to_send,
expected_down_frames=expected_returned_frames,
)
class TestFrameFilter(unittest.IsolatedAsyncioTestCase):
async def test_text_frame(self):
filter = FrameFilter(types=(TextFrame, EndTestFrame))
filter = FrameFilter(types=(TextFrame,))
frames_to_send = [TextFrame(text="Hello Pipecat!")]
expected_returned_frames = [TextFrame]
await run_test(filter, frames_to_send, expected_returned_frames)
await run_test(
filter,
frames_to_send=frames_to_send,
expected_down_frames=expected_returned_frames,
)
async def test_end_frame(self):
filter = FrameFilter(types=(EndFrame, EndTestFrame))
filter = FrameFilter(types=(EndFrame,))
frames_to_send = [EndFrame()]
expected_returned_frames = [EndFrame]
await run_test(filter, frames_to_send, expected_returned_frames)
await run_test(
filter,
frames_to_send=frames_to_send,
expected_down_frames=expected_returned_frames,
send_end_frame=False,
)
async def test_system_frame(self):
filter = FrameFilter(types=(EndTestFrame,))
filter = FrameFilter(types=())
frames_to_send = [UserStartedSpeakingFrame()]
expected_returned_frames = [UserStartedSpeakingFrame]
await run_test(filter, frames_to_send, expected_returned_frames)
await run_test(
filter,
frames_to_send=frames_to_send,
expected_down_frames=expected_returned_frames,
)
class TestFunctionFilter(unittest.IsolatedAsyncioTestCase):
@@ -58,7 +74,11 @@ class TestFunctionFilter(unittest.IsolatedAsyncioTestCase):
filter = FunctionFilter(filter=passthrough)
frames_to_send = [TextFrame(text="Hello Pipecat!")]
expected_returned_frames = [TextFrame]
await run_test(filter, frames_to_send, expected_returned_frames)
await run_test(
filter,
frames_to_send=frames_to_send,
expected_down_frames=expected_returned_frames,
)
async def test_no_passthrough(self):
async def no_passthrough(frame: Frame):
@@ -66,14 +86,12 @@ class TestFunctionFilter(unittest.IsolatedAsyncioTestCase):
filter = FunctionFilter(filter=no_passthrough)
frames_to_send = [TextFrame(text="Hello Pipecat!")]
expected_returned_frames = [TextFrame]
try:
await asyncio.wait_for(
run_test(filter, frames_to_send, expected_returned_frames), timeout=0.5
)
assert False
except asyncio.TimeoutError:
pass
expected_returned_frames = []
await run_test(
filter,
frames_to_send=frames_to_send,
expected_down_frames=expected_returned_frames,
)
class TestWakeCheckFilter(unittest.IsolatedAsyncioTestCase):
@@ -81,7 +99,11 @@ class TestWakeCheckFilter(unittest.IsolatedAsyncioTestCase):
filter = WakeCheckFilter(wake_phrases=["Hey, Pipecat"])
frames_to_send = [TranscriptionFrame(user_id="test", text="Phrase 1", timestamp="")]
expected_returned_frames = []
await run_test(filter, frames_to_send, expected_returned_frames)
await run_test(
filter,
frames_to_send=frames_to_send,
expected_down_frames=expected_returned_frames,
)
async def test_wake_word(self):
filter = WakeCheckFilter(wake_phrases=["Hey, Pipecat"])
@@ -90,5 +112,9 @@ class TestWakeCheckFilter(unittest.IsolatedAsyncioTestCase):
TranscriptionFrame(user_id="test", text="Phrase 1", timestamp=""),
]
expected_returned_frames = [TranscriptionFrame, TranscriptionFrame]
(received_down, _) = await run_test(filter, frames_to_send, expected_returned_frames)
(received_down, _) = await run_test(
filter,
frames_to_send=frames_to_send,
expected_down_frames=expected_returned_frames,
)
assert received_down[-1].text == "Phrase 1"

View File

@@ -7,7 +7,7 @@
import asyncio
import unittest
from pipecat.frames.frames import EndFrame, HeartbeatFrame, TextFrame
from pipecat.frames.frames import EndFrame, HeartbeatFrame, StartFrame, TextFrame
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -22,7 +22,11 @@ class TestPipeline(unittest.IsolatedAsyncioTestCase):
frames_to_send = [TextFrame(text="Hello from Pipecat!")]
expected_returned_frames = [TextFrame]
await run_test(pipeline, frames_to_send, expected_returned_frames)
await run_test(
pipeline,
frames_to_send=frames_to_send,
expected_down_frames=expected_returned_frames,
)
async def test_pipeline_multiple(self):
identity1 = IdentityFilter()
@@ -33,7 +37,25 @@ class TestPipeline(unittest.IsolatedAsyncioTestCase):
frames_to_send = [TextFrame(text="Hello from Pipecat!")]
expected_returned_frames = [TextFrame]
await run_test(pipeline, frames_to_send, expected_returned_frames)
await run_test(
pipeline,
frames_to_send=frames_to_send,
expected_down_frames=expected_returned_frames,
)
async def test_pipeline_start_metadata(self):
pipeline = Pipeline([IdentityFilter()])
frames_to_send = []
expected_returned_frames = [StartFrame]
(received_down, _) = await run_test(
pipeline,
frames_to_send=frames_to_send,
expected_down_frames=expected_returned_frames,
ignore_start=False,
start_metadata={"foo": "bar"},
)
assert "foo" in received_down[-1].metadata
class TestParallelPipeline(unittest.IsolatedAsyncioTestCase):
@@ -42,7 +64,11 @@ class TestParallelPipeline(unittest.IsolatedAsyncioTestCase):
frames_to_send = [TextFrame(text="Hello from Pipecat!")]
expected_returned_frames = [TextFrame]
await run_test(pipeline, frames_to_send, expected_returned_frames)
await run_test(
pipeline,
frames_to_send=frames_to_send,
expected_down_frames=expected_returned_frames,
)
async def test_parallel_multiple(self):
"""Should only passthrough one instance of TextFrame."""
@@ -50,7 +76,11 @@ class TestParallelPipeline(unittest.IsolatedAsyncioTestCase):
frames_to_send = [TextFrame(text="Hello from Pipecat!")]
expected_returned_frames = [TextFrame]
await run_test(pipeline, frames_to_send, expected_returned_frames)
await run_test(
pipeline,
frames_to_send=frames_to_send,
expected_down_frames=expected_returned_frames,
)
class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
@@ -79,7 +109,9 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_heartbeats=True, heartbeats_period_secs=0.2, observers=[heartbeats_observer]
enable_heartbeats=True,
heartbeats_period_secs=0.2,
observers=[heartbeats_observer],
),
)
task.set_event_loop(asyncio.get_event_loop())

View File

@@ -20,17 +20,19 @@ class TestProtobufFrameSerializer(unittest.IsolatedAsyncioTestCase):
async def test_roundtrip(self):
text_frame = TextFrame(text="hello world")
frame = self.serializer.deserialize(self.serializer.serialize(text_frame))
frame = await self.serializer.deserialize(await self.serializer.serialize(text_frame))
self.assertEqual(text_frame, frame)
transcription_frame = TranscriptionFrame(
text="Hello there!", user_id="123", timestamp="2021-01-01"
)
frame = self.serializer.deserialize(self.serializer.serialize(transcription_frame))
frame = await self.serializer.deserialize(
await self.serializer.serialize(transcription_frame)
)
self.assertEqual(frame, transcription_frame)
audio_frame = OutputAudioRawFrame(audio=b"1234567890", sample_rate=16000, num_channels=1)
frame = self.serializer.deserialize(self.serializer.serialize(audio_frame))
frame = await self.serializer.deserialize(await self.serializer.serialize(audio_frame))
self.assertEqual(frame.audio, audio_frame.audio)
self.assertEqual(frame.sample_rate, audio_frame.sample_rate)
self.assertEqual(frame.num_channels, audio_frame.num_channels)