Compare commits
34 Commits
cb/extra-l
...
v0.0.55
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
99d3227ff5 | ||
|
|
7730f59635 | ||
|
|
ba31546c32 | ||
|
|
a363d12d1f | ||
|
|
feab9c8fa2 | ||
|
|
61f6669926 | ||
|
|
3be69908d2 | ||
|
|
fcb80ec330 | ||
|
|
c9f5684e2f | ||
|
|
c257fa1573 | ||
|
|
97c55da29f | ||
|
|
49426aa9a1 | ||
|
|
0a333c26da | ||
|
|
75a29424ff | ||
|
|
cd1b429308 | ||
|
|
7f1ae4b8cc | ||
|
|
af9fd811cd | ||
|
|
69f5c9b9d3 | ||
|
|
ab45e481be | ||
|
|
cc54255c41 | ||
|
|
1cdb66f889 | ||
|
|
51a86a509c | ||
|
|
824898f7b7 | ||
|
|
57dadb6359 | ||
|
|
5dcdc68ef5 | ||
|
|
aafb2db620 | ||
|
|
f3f22cf61c | ||
|
|
371c2f3704 | ||
|
|
1f14f62696 | ||
|
|
06449eff2c | ||
|
|
dcfb86583d | ||
|
|
cda34a1320 | ||
|
|
13611fd8e1 | ||
|
|
fc89aad469 |
52
CHANGELOG.md
52
CHANGELOG.md
@@ -5,10 +5,25 @@ All notable changes to **Pipecat** will be documented in this file.
|
||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## [Unreleased]
|
||||
## [0.0.55] - 2025-02-05
|
||||
|
||||
### Added
|
||||
|
||||
- Added a new `start_metadata` field to `PipelineParams`. The provided metadata
|
||||
will be set to the initial `StartFrame` being pushed from the `PipelineTask`.
|
||||
|
||||
- Added new fields to `PipelineParams` to control audio input and output sample
|
||||
rates for the whole pipeline. This allows controlling sample rates from a
|
||||
single place instead of having to specify sample rates in each
|
||||
service. Setting a sample rate to a service is still possible and will
|
||||
override the value from `PipelineParams`.
|
||||
|
||||
- Introduce audio resamplers (`BaseAudioResampler`). This is just a base class
|
||||
to implement audio resamplers. Currently, two implementations are provided
|
||||
`SOXRAudioResampler` and `ResampyResampler`. A new
|
||||
`create_default_resampler()` has been added (replacing the now deprecated
|
||||
`resample_audio()`).
|
||||
|
||||
- It is now possible to specify the asyncio event loop that a `PipelineTask` and
|
||||
all the processors should run on by passing it as a new argument to the
|
||||
`PipelineRunner`. This could allow running pipelines in multiple threads each
|
||||
@@ -41,6 +56,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Changed
|
||||
|
||||
- `GatedOpenAILLMContextAggregator` now require keyword arguments. Also, a new
|
||||
`start_open` argument has been added to set the initial state of the gate.
|
||||
|
||||
- Added `organization` and `project` level authentication to
|
||||
`OpenAILLMService`.
|
||||
|
||||
@@ -56,8 +74,27 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
- `InputDTMFFrame` is now based on `DTMFFrame`. There's also a new
|
||||
`OutputDTMFFrame` frame.
|
||||
|
||||
### Deprecated
|
||||
|
||||
- `resample_audio()` is now deprecated, use `create_default_resampler()`
|
||||
instead.
|
||||
|
||||
### Removed
|
||||
|
||||
- `AudioBufferProcessor.reset_audio_buffers()` has been removed, use
|
||||
`AudioBufferProcessor.start_recording()` and
|
||||
``AudioBufferProcessor.stop_recording()` instead.
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed a `AudioBufferProcessor` that would cause crackling in some recordings.
|
||||
|
||||
- Fixed an issue in `AudioBufferProcessor` where user callback would not be
|
||||
called on task cancellation.
|
||||
|
||||
- Fixed an issue in `AudioBufferProcessor` that would cause wrong silence
|
||||
padding in some cases.
|
||||
|
||||
- Fixed an issue where `ElevenLabsTTSService` messages would return a 1009
|
||||
websocket error by increasing the max message size limit to 16MB.
|
||||
|
||||
@@ -73,6 +110,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Other
|
||||
|
||||
- Improved Unit Test `run_test()` to use `PipelineTask` and
|
||||
`PipelineRunner`. There's now also some control around `StartFrame` and
|
||||
`EndFrame`. The `EndTaskFrame` has been removed since it doesn't seem
|
||||
necessary with this new approach.
|
||||
|
||||
- Updated `twilio-chatbot` with a few new features: use 8000 sample rate and
|
||||
avoid resampling, a new client useful for stress testing and testing locally
|
||||
without the need to make phone calls. Also, added audio recording on both the
|
||||
client and the server to make sure the audio sounds good.
|
||||
|
||||
- Updated examples to use `task.cancel()` to immediately exit the example when a
|
||||
participant leaves or disconnects, instead of pushing an `EndFrame`. Pushing
|
||||
an `EndFrame` causes the bot to run through everything that is internally
|
||||
@@ -1527,6 +1574,9 @@ async def on_connected(processor):
|
||||
|
||||
### Changed
|
||||
|
||||
- `FrameSerializer.serialize()` and `FrameSerializer.deserialize()` are now
|
||||
`async`.
|
||||
|
||||
- `Filter` has been renamed to `FrameFilter` and it's now under
|
||||
`processors/filters`.
|
||||
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
build~=1.2.2
|
||||
grpcio-tools~=1.69.0
|
||||
grpcio-tools~=1.67.1
|
||||
pip-tools~=7.4.1
|
||||
pre-commit~=4.0.1
|
||||
pyright~=1.1.392
|
||||
pytest~=8.3.4
|
||||
pytest-asyncio~=0.25.2
|
||||
ruff~=0.9.1
|
||||
setuptools~=75.8.0
|
||||
setuptools~=70.0.0
|
||||
setuptools_scm~=8.1.0
|
||||
python-dotenv~=1.0.1
|
||||
|
||||
@@ -17,7 +17,7 @@ from runner import configure
|
||||
from pipecat.frames.frames import AudioRawFrame, EndFrame, OutputAudioRawFrame, TTSSpeakFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineTask
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
@@ -31,16 +31,15 @@ logger.add(sys.stderr, level="DEBUG")
|
||||
class SilenceFrame(OutputAudioRawFrame):
|
||||
def __init__(
|
||||
self,
|
||||
audio: bytes = None,
|
||||
sample_rate: int = 16000,
|
||||
num_channels: int = 1,
|
||||
duration: float = 0.1,
|
||||
*,
|
||||
sample_rate: int,
|
||||
duration: float,
|
||||
):
|
||||
# Initialize the parent class with the silent frame's data
|
||||
super().__init__(
|
||||
audio=self.create_silent_audio_frame(sample_rate, num_channels, duration).audio,
|
||||
audio=self.create_silent_audio_frame(sample_rate, 1, duration).audio,
|
||||
sample_rate=sample_rate,
|
||||
num_channels=num_channels,
|
||||
num_channels=1,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
@@ -80,7 +79,10 @@ async def main():
|
||||
return
|
||||
await task.queue_frames(
|
||||
[
|
||||
SilenceFrame(duration=0.5),
|
||||
SilenceFrame(
|
||||
sample_rate=task.params.audio_out_sample_rate,
|
||||
duration=0.5,
|
||||
),
|
||||
TTSSpeakFrame(f"Hello there, how are you doing today ?"),
|
||||
EndFrame(),
|
||||
]
|
||||
|
||||
@@ -124,6 +124,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
await audio_buffer_processor.start_recording()
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
|
||||
@@ -109,8 +109,9 @@ async def main():
|
||||
context = OpenAILLMContext(messages)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
# Save audio every 10 seconds.
|
||||
audiobuffer = AudioBufferProcessor(buffer_size=480000)
|
||||
# NOTE: Watch out! This will save all the conversation in memory. You
|
||||
# can pass `buffer_size` to get periodic callbacks.
|
||||
audiobuffer = AudioBufferProcessor()
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
@@ -132,6 +133,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
await audiobuffer.start_recording()
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
|
||||
@@ -37,7 +37,6 @@ async def main():
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
audio_out_sample_rate=24000,
|
||||
transcription_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
|
||||
@@ -38,7 +38,6 @@ async def main():
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
audio_out_sample_rate=24000,
|
||||
transcription_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
|
||||
@@ -40,7 +40,6 @@ async def main():
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
audio_out_sample_rate=24000,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
vad_audio_passthrough=True,
|
||||
|
||||
@@ -21,7 +21,7 @@ from pipecat.frames.frames import (
|
||||
)
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineTask
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
@@ -61,7 +61,6 @@ async def main():
|
||||
"Test",
|
||||
DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_in_sample_rate=24000,
|
||||
audio_out_enabled=True,
|
||||
camera_out_enabled=True,
|
||||
camera_out_is_live=True,
|
||||
@@ -78,7 +77,9 @@ async def main():
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
task = PipelineTask(pipeline)
|
||||
task = PipelineTask(
|
||||
pipeline, PipelineParams(audio_in_sample_rate=24000, audio_out_sample_rate=24000)
|
||||
)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
@@ -22,7 +22,7 @@ from pipecat.frames.frames import (
|
||||
)
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineTask
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.transports.base_transport import TransportParams
|
||||
from pipecat.transports.local.tk import TkLocalTransport
|
||||
@@ -62,7 +62,7 @@ async def main():
|
||||
tk_root.title("Local Mirror")
|
||||
|
||||
daily_transport = DailyTransport(
|
||||
room_url, token, "Test", DailyParams(audio_in_enabled=True, audio_in_sample_rate=24000)
|
||||
room_url, token, "Test", DailyParams(audio_in_enabled=True)
|
||||
)
|
||||
|
||||
tk_transport = TkLocalTransport(
|
||||
@@ -82,7 +82,9 @@ async def main():
|
||||
|
||||
pipeline = Pipeline([daily_transport.input(), MirrorProcessor(), tk_transport.output()])
|
||||
|
||||
task = PipelineTask(pipeline)
|
||||
task = PipelineTask(
|
||||
pipeline, PipelineParams(audio_in_sample_rate=24000, audio_out_sample_rate=24000)
|
||||
)
|
||||
|
||||
async def run_tk():
|
||||
while not task.has_finished():
|
||||
|
||||
@@ -51,8 +51,6 @@ async def main():
|
||||
out_params=GStreamerPipelineSource.OutputParams(
|
||||
video_width=1280,
|
||||
video_height=720,
|
||||
audio_sample_rate=24000,
|
||||
audio_channels=1,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@@ -80,9 +80,7 @@ async def main():
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_in_sample_rate=24000,
|
||||
audio_out_enabled=True,
|
||||
audio_out_sample_rate=24000,
|
||||
transcription_enabled=False,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.8)),
|
||||
|
||||
@@ -177,9 +177,7 @@ async def main():
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_in_sample_rate=24000,
|
||||
audio_out_enabled=True,
|
||||
audio_out_sample_rate=24000,
|
||||
transcription_enabled=False,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.8)),
|
||||
|
||||
@@ -88,6 +88,10 @@ async def main():
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
PipelineParams(
|
||||
# We just use 16000 because that's what Tavus is expecting and
|
||||
# we avoid resampling.
|
||||
audio_in_sample_rate=16000,
|
||||
audio_out_sample_rate=16000,
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -104,8 +104,11 @@ async def main():
|
||||
)
|
||||
|
||||
# This processor keeps the last context and will let it through once the
|
||||
# notifier is woken up.
|
||||
gated_context_aggregator = GatedOpenAILLMContextAggregator(notifier)
|
||||
# notifier is woken up. We start with the gate open because we send an
|
||||
# initial context frame to start the conversation.
|
||||
gated_context_aggregator = GatedOpenAILLMContextAggregator(
|
||||
notifier=notifier, start_open=True
|
||||
)
|
||||
|
||||
# Notify if the user hasn't said anything.
|
||||
async def user_idle_notifier(frame):
|
||||
|
||||
@@ -129,9 +129,9 @@ class CompletenessCheck(FrameProcessor):
|
||||
|
||||
|
||||
class OutputGate(FrameProcessor):
|
||||
def __init__(self, notifier: BaseNotifier, **kwargs):
|
||||
def __init__(self, *, notifier: BaseNotifier, start_open: bool = False, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._gate_open = False
|
||||
self._gate_open = start_open
|
||||
self._frames_buffer = []
|
||||
self._notifier = notifier
|
||||
|
||||
@@ -252,7 +252,9 @@ async def main():
|
||||
# sentence, this will wake up the notifier if that happens.
|
||||
user_idle = UserIdleProcessor(callback=user_idle_notifier, timeout=5.0)
|
||||
|
||||
bot_output_gate = OutputGate(notifier=notifier)
|
||||
# We start with the gate open because we send an initial context frame
|
||||
# to start the conversation.
|
||||
bot_output_gate = OutputGate(notifier=notifier, start_open=True)
|
||||
|
||||
async def block_user_stopped_speaking(frame):
|
||||
return not isinstance(frame, UserStoppedSpeakingFrame)
|
||||
|
||||
@@ -333,9 +333,9 @@ class CompletenessCheck(FrameProcessor):
|
||||
|
||||
|
||||
class OutputGate(FrameProcessor):
|
||||
def __init__(self, notifier: BaseNotifier, **kwargs):
|
||||
def __init__(self, *, notifier: BaseNotifier, start_open: bool = False, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._gate_open = False
|
||||
self._gate_open = start_open
|
||||
self._frames_buffer = []
|
||||
self._notifier = notifier
|
||||
|
||||
@@ -461,7 +461,9 @@ async def main():
|
||||
# sentence, this will wake up the notifier if that happens.
|
||||
user_idle = UserIdleProcessor(callback=user_idle_notifier, timeout=5.0)
|
||||
|
||||
bot_output_gate = OutputGate(notifier=notifier)
|
||||
# We start with the gate open because we send an initial context frame
|
||||
# to start the conversation.
|
||||
bot_output_gate = OutputGate(notifier=notifier, start_open=True)
|
||||
|
||||
async def block_user_stopped_speaking(frame):
|
||||
return not isinstance(frame, UserStoppedSpeakingFrame)
|
||||
|
||||
@@ -639,7 +639,6 @@ async def main():
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
vad_audio_passthrough=True,
|
||||
audio_in_sample_rate=16000,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@@ -37,8 +37,6 @@ async def main():
|
||||
token,
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_in_sample_rate=16000,
|
||||
audio_out_sample_rate=24000,
|
||||
audio_out_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_audio_passthrough=True,
|
||||
|
||||
@@ -37,8 +37,6 @@ async def main():
|
||||
token,
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_in_sample_rate=16000,
|
||||
audio_out_sample_rate=24000,
|
||||
audio_out_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_audio_passthrough=True,
|
||||
|
||||
@@ -84,8 +84,6 @@ async def main():
|
||||
token,
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_in_sample_rate=16000,
|
||||
audio_out_sample_rate=24000,
|
||||
audio_out_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_audio_passthrough=True,
|
||||
|
||||
@@ -37,8 +37,6 @@ async def main():
|
||||
token,
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_in_sample_rate=16000,
|
||||
audio_out_sample_rate=24000,
|
||||
audio_out_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_audio_passthrough=True,
|
||||
@@ -47,8 +45,6 @@ async def main():
|
||||
# matter because we can only use the Multimodal Live API's phrase
|
||||
# endpointing, for now.
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)),
|
||||
start_audio_paused=True,
|
||||
start_video_paused=True,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@@ -52,8 +52,6 @@ async def main():
|
||||
token,
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_in_sample_rate=16000,
|
||||
audio_out_sample_rate=24000,
|
||||
audio_out_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_audio_passthrough=True,
|
||||
|
||||
@@ -38,8 +38,6 @@ load_dotenv(override=True)
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
DESIRED_SAMPLE_RATE = 16000
|
||||
|
||||
|
||||
def generate_token(room_name: str, participant_name: str, api_key: str, api_secret: str) -> str:
|
||||
token = api.AccessToken(api_key, api_secret)
|
||||
@@ -114,11 +112,8 @@ async def main():
|
||||
token=token,
|
||||
room_name=room_name,
|
||||
params=LiveKitParams(
|
||||
audio_in_channels=1,
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
audio_in_sample_rate=DESIRED_SAMPLE_RATE,
|
||||
audio_out_sample_rate=DESIRED_SAMPLE_RATE,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
vad_enabled=True,
|
||||
vad_audio_passthrough=True,
|
||||
@@ -128,7 +123,6 @@ async def main():
|
||||
stt = DeepgramSTTService(
|
||||
api_key=os.getenv("DEEPGRAM_API_KEY"),
|
||||
live_options=LiveOptions(
|
||||
sample_rate=DESIRED_SAMPLE_RATE,
|
||||
vad_events=True,
|
||||
),
|
||||
)
|
||||
@@ -138,7 +132,6 @@ async def main():
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
|
||||
sample_rate=DESIRED_SAMPLE_RATE,
|
||||
)
|
||||
|
||||
messages = [
|
||||
|
||||
@@ -106,8 +106,8 @@ class UserImageRequester(FrameProcessor):
|
||||
UserImageRequestFrame(self.participant_id), FrameDirection.UPSTREAM
|
||||
)
|
||||
await self.push_frame(TextFrame("Describe the image in a short sentence."))
|
||||
elif isinstance(frame, UserImageRawFrame):
|
||||
await self.push_frame(frame)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
class TextFilterProcessor(FrameProcessor):
|
||||
|
||||
@@ -121,8 +121,6 @@ async def main():
|
||||
token,
|
||||
"Chatbot",
|
||||
DailyParams(
|
||||
audio_in_sample_rate=16000,
|
||||
audio_out_sample_rate=24000,
|
||||
audio_out_enabled=True,
|
||||
camera_out_enabled=True,
|
||||
camera_out_width=1024,
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
beautifulsoup4==4.12.3
|
||||
pypdf==4.3.1
|
||||
tiktoken==0.7.0
|
||||
pipecat-ai[daily,cartesia,openai,silero]==0.0.40
|
||||
pipecat-ai[daily,cartesia,openai,silero]
|
||||
python-dotenv==1.0.1
|
||||
|
||||
@@ -112,7 +112,6 @@ async def main():
|
||||
token,
|
||||
"studypal",
|
||||
DailyParams(
|
||||
audio_out_sample_rate=44100,
|
||||
audio_out_enabled=True,
|
||||
transcription_enabled=True,
|
||||
vad_enabled=True,
|
||||
@@ -124,7 +123,6 @@ async def main():
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id=os.getenv("CARTESIA_VOICE_ID", "4d2fd738-3b3d-4368-957a-bb4805275bd9"),
|
||||
# British Narration Lady: 4d2fd738-3b3d-4368-957a-bb4805275bd9
|
||||
sample_rate=44100,
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o-mini")
|
||||
@@ -155,7 +153,12 @@ Your task is to help the user understand and learn from this article in 2 senten
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
PipelineParams(
|
||||
audio_out_sample_rate=44100, allow_interruptions=True, enable_metrics=True
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
|
||||
@@ -11,14 +11,13 @@ from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import EndFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.serializers.telnyx import TelnyxFrameSerializer
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.services.deepgram import DeepgramSTTService
|
||||
from pipecat.services.elevenlabs import ElevenLabsTTSService, Language
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.transports.network.fastapi_websocket import (
|
||||
FastAPIWebsocketParams,
|
||||
@@ -31,7 +30,12 @@ logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def run_bot(websocket_client, stream_id, outbound_encoding, inbound_encoding):
|
||||
async def run_bot(
|
||||
websocket_client,
|
||||
stream_id: str,
|
||||
outbound_encoding: str,
|
||||
inbound_encoding: str,
|
||||
):
|
||||
transport = FastAPIWebsocketTransport(
|
||||
websocket=websocket_client,
|
||||
params=FastAPIWebsocketParams(
|
||||
@@ -48,11 +52,9 @@ async def run_bot(websocket_client, stream_id, outbound_encoding, inbound_encodi
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
tts = ElevenLabsTTSService(
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id="CwhRBWXzGAHq8TQ4Fs17",
|
||||
output_format="pcm_24000",
|
||||
params=ElevenLabsTTSService.InputParams(language=Language.EN),
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
|
||||
)
|
||||
|
||||
messages = [
|
||||
@@ -77,7 +79,14 @@ async def run_bot(websocket_client, stream_id, outbound_encoding, inbound_encodi
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
audio_in_sample_rate=8000,
|
||||
audio_out_sample_rate=8000,
|
||||
allow_interruptions=True,
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
@@ -87,7 +96,7 @@ async def run_bot(websocket_client, stream_id, outbound_encoding, inbound_encodi
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
await task.queue_frames([EndFrame()])
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=False)
|
||||
|
||||
|
||||
@@ -107,3 +107,34 @@ The server will start on port 8765. Keep this running while you test with Twilio
|
||||
## Usage
|
||||
|
||||
To start a call, simply make a call to your configured Twilio phone number. The webhook URL will direct the call to your FastAPI application, which will handle it accordingly.
|
||||
|
||||
## Testing
|
||||
|
||||
It is also possible to automatically test the server without making phone calls by using a software client.
|
||||
|
||||
First, update `templates/streams.xml` to point to your server's websocket endpoint. For example:
|
||||
|
||||
```
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<Response>
|
||||
<Connect>
|
||||
<Stream url="ws://localhost:8765/ws"></Stream>
|
||||
</Connect>
|
||||
<Pause length="40"/>
|
||||
</Response>
|
||||
```
|
||||
|
||||
Then, start the server with `-t` to indicate we are testing:
|
||||
|
||||
```sh
|
||||
# Make sure you’re in the project directory and your virtual environment is activated
|
||||
python server.py -t
|
||||
```
|
||||
|
||||
Finally, just point the client to the server's URL:
|
||||
|
||||
```sh
|
||||
python client.py -u http://localhost:8765 -c 2
|
||||
```
|
||||
|
||||
where `-c` allows you to create multiple concurrent clients.
|
||||
|
||||
@@ -4,10 +4,15 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import datetime
|
||||
import io
|
||||
import os
|
||||
import sys
|
||||
import wave
|
||||
|
||||
import aiofiles
|
||||
from dotenv import load_dotenv
|
||||
from fastapi import WebSocket
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
@@ -15,6 +20,7 @@ from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
|
||||
from pipecat.serializers.twilio import TwilioFrameSerializer
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.services.deepgram import DeepgramSTTService
|
||||
@@ -30,10 +36,29 @@ logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def run_bot(websocket_client, stream_sid):
|
||||
async def save_audio(server_name: str, audio: bytes, sample_rate: int, num_channels: int):
|
||||
if len(audio) > 0:
|
||||
filename = (
|
||||
f"{server_name}_recording_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.wav"
|
||||
)
|
||||
with io.BytesIO() as buffer:
|
||||
with wave.open(buffer, "wb") as wf:
|
||||
wf.setsampwidth(2)
|
||||
wf.setnchannels(num_channels)
|
||||
wf.setframerate(sample_rate)
|
||||
wf.writeframes(audio)
|
||||
async with aiofiles.open(filename, "wb") as file:
|
||||
await file.write(buffer.getvalue())
|
||||
logger.info(f"Merged audio saved to {filename}")
|
||||
else:
|
||||
logger.info("No audio data to save")
|
||||
|
||||
|
||||
async def run_bot(websocket_client: WebSocket, stream_sid: str, testing: bool):
|
||||
transport = FastAPIWebsocketTransport(
|
||||
websocket=websocket_client,
|
||||
params=FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
add_wav_header=False,
|
||||
vad_enabled=True,
|
||||
@@ -45,23 +70,28 @@ async def run_bot(websocket_client, stream_sid):
|
||||
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"), audio_passthrough=True)
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
|
||||
push_silence_after_stop=testing,
|
||||
)
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in an audio call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
|
||||
"content": "You are an elementary teacher in an audio call. Your output will be converted to audio so don't include special characters in your answers. Respond to what the student said in a short short sentence.",
|
||||
},
|
||||
]
|
||||
|
||||
context = OpenAILLMContext(messages)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
# NOTE: Watch out! This will save all the conversation in memory. You can
|
||||
# pass `buffer_size` to get periodic callbacks.
|
||||
audiobuffer = AudioBufferProcessor()
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Websocket input from client
|
||||
@@ -70,14 +100,22 @@ async def run_bot(websocket_client, stream_sid):
|
||||
llm, # LLM
|
||||
tts, # Text-To-Speech
|
||||
transport.output(), # Websocket output to client
|
||||
audiobuffer, # Used to buffer the audio in the pipeline
|
||||
context_aggregator.assistant(),
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
audio_in_sample_rate=8000, audio_out_sample_rate=8000, allow_interruptions=True
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
# Start recording.
|
||||
await audiobuffer.start_recording()
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
@@ -86,6 +124,15 @@ async def run_bot(websocket_client, stream_sid):
|
||||
async def on_client_disconnected(transport, client):
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=False)
|
||||
@audiobuffer.event_handler("on_audio_data")
|
||||
async def on_audio_data(buffer, audio, sample_rate, num_channels):
|
||||
server_name = f"server_{websocket_client.client.port}"
|
||||
await save_audio(server_name, audio, sample_rate, num_channels)
|
||||
|
||||
# We use `handle_sigint=False` because `uvicorn` is controlling keyboard
|
||||
# interruptions. We use `force_gc=True` to force garbage collection after
|
||||
# the runner finishes running a task which could be useful for long running
|
||||
# applications with multiple clients connecting.
|
||||
runner = PipelineRunner(handle_sigint=False, force_gc=True)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
199
examples/twilio-chatbot/client.py
Normal file
199
examples/twilio-chatbot/client.py
Normal file
@@ -0,0 +1,199 @@
|
||||
#
|
||||
# Copyright (c) 2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import datetime
|
||||
import io
|
||||
import os
|
||||
import sys
|
||||
import wave
|
||||
import xml.etree.ElementTree as ET
|
||||
from uuid import uuid4
|
||||
|
||||
import aiofiles
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.audio.vad.vad_analyzer import VADParams
|
||||
from pipecat.frames.frames import EndFrame, TransportMessageUrgentFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
|
||||
from pipecat.serializers.twilio import TwilioFrameSerializer
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.services.deepgram import DeepgramSTTService
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.transports.network.websocket_client import (
|
||||
WebsocketClientParams,
|
||||
WebsocketClientTransport,
|
||||
)
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
DEFAULT_CLIENT_DURATION = 30
|
||||
|
||||
|
||||
async def download_twiml(server_url: str) -> str:
|
||||
# TODO(aleix): add error checking.
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(server_url) as response:
|
||||
return await response.text()
|
||||
|
||||
|
||||
def get_stream_url_from_twiml(twiml: str) -> str:
|
||||
root = ET.fromstring(twiml)
|
||||
# TODO(aleix): add error checking.
|
||||
stream_element = root.find(".//Stream") # Finds the first <Stream> element
|
||||
url = stream_element.get("url")
|
||||
return url
|
||||
|
||||
|
||||
async def save_audio(client_name: str, audio: bytes, sample_rate: int, num_channels: int):
|
||||
if len(audio) > 0:
|
||||
filename = (
|
||||
f"{client_name}_recording_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.wav"
|
||||
)
|
||||
with io.BytesIO() as buffer:
|
||||
with wave.open(buffer, "wb") as wf:
|
||||
wf.setsampwidth(2)
|
||||
wf.setnchannels(num_channels)
|
||||
wf.setframerate(sample_rate)
|
||||
wf.writeframes(audio)
|
||||
async with aiofiles.open(filename, "wb") as file:
|
||||
await file.write(buffer.getvalue())
|
||||
logger.info(f"Merged audio saved to {filename}")
|
||||
else:
|
||||
logger.info("No audio data to save")
|
||||
|
||||
|
||||
async def run_client(client_name: str, server_url: str, duration_secs: int):
|
||||
twiml = await download_twiml(server_url)
|
||||
|
||||
stream_url = get_stream_url_from_twiml(twiml)
|
||||
|
||||
stream_sid = str(uuid4())
|
||||
|
||||
transport = WebsocketClientTransport(
|
||||
uri=stream_url,
|
||||
params=WebsocketClientParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
add_wav_header=False,
|
||||
serializer=TwilioFrameSerializer(stream_sid),
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=1.5)),
|
||||
vad_audio_passthrough=True,
|
||||
),
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
|
||||
|
||||
# We let the audio passthrough so we can record the conversation.
|
||||
stt = DeepgramSTTService(
|
||||
api_key=os.getenv("DEEPGRAM_API_KEY"),
|
||||
audio_passthrough=True,
|
||||
)
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="e13cae5c-ec59-4f71-b0a6-266df3c9bb8e", # Madame Mischief
|
||||
push_silence_after_stop=True,
|
||||
)
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are an 8 year old child. A teacher will explain you new concepts you want to know about. Feel free to change topics whnever you want. Once you are taught something you need to keep asking for clarifications if you think someone your age would not understand what you are being taught.",
|
||||
},
|
||||
]
|
||||
|
||||
context = OpenAILLMContext(messages)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
# NOTE: Watch out! This will save all the conversation in memory. You can
|
||||
# pass `buffer_size` to get periodic callbacks.
|
||||
audiobuffer = AudioBufferProcessor()
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Websocket input from server
|
||||
stt, # Speech-To-Text
|
||||
context_aggregator.user(),
|
||||
llm, # LLM
|
||||
tts, # Text-To-Speech
|
||||
transport.output(), # Websocket output to server
|
||||
audiobuffer, # Used to buffer the audio in the pipeline
|
||||
context_aggregator.assistant(),
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
audio_in_sample_rate=8000, audio_out_sample_rate=8000, allow_interruptions=True
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_connected")
|
||||
async def on_connected(transport: WebsocketClientTransport, client):
|
||||
# Start recording.
|
||||
await audiobuffer.start_recording()
|
||||
|
||||
message = TransportMessageUrgentFrame(
|
||||
message={"event": "connected", "protocol": "Call", "version": "1.0.0"}
|
||||
)
|
||||
await transport.output().send_message(message)
|
||||
|
||||
message = TransportMessageUrgentFrame(
|
||||
message={"event": "start", "streamSid": stream_sid, "start": {"streamSid": stream_sid}}
|
||||
)
|
||||
await transport.output().send_message(message)
|
||||
|
||||
@audiobuffer.event_handler("on_audio_data")
|
||||
async def on_audio_data(buffer, audio, sample_rate, num_channels):
|
||||
await save_audio(client_name, audio, sample_rate, num_channels)
|
||||
|
||||
async def end_call():
|
||||
await asyncio.sleep(duration_secs)
|
||||
await task.queue_frame(EndFrame())
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await asyncio.gather(runner.run(task), end_call())
|
||||
|
||||
|
||||
async def main():
|
||||
parser = argparse.ArgumentParser(description="Pipecat Twilio Chatbot Client")
|
||||
parser.add_argument("-u", "--url", type=str, required=True, help="specify the server URL")
|
||||
parser.add_argument(
|
||||
"-c", "--clients", type=int, required=True, help="number of concurrent clients"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-d",
|
||||
"--duration",
|
||||
type=int,
|
||||
default=DEFAULT_CLIENT_DURATION,
|
||||
help=f"duration of each client in seconds (default: {DEFAULT_CLIENT_DURATION})",
|
||||
)
|
||||
args, _ = parser.parse_known_args()
|
||||
|
||||
clients = []
|
||||
for i in range(args.clients):
|
||||
clients.append(asyncio.create_task(run_client(f"client_{i}", args.url, args.duration)))
|
||||
await asyncio.gather(*clients)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
@@ -4,6 +4,7 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import argparse
|
||||
import json
|
||||
|
||||
import uvicorn
|
||||
@@ -38,8 +39,16 @@ async def websocket_endpoint(websocket: WebSocket):
|
||||
print(call_data, flush=True)
|
||||
stream_sid = call_data["start"]["streamSid"]
|
||||
print("WebSocket connection accepted")
|
||||
await run_bot(websocket, stream_sid)
|
||||
await run_bot(websocket, stream_sid, app.state.testing)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Pipecat Twilio Chatbot Server")
|
||||
parser.add_argument(
|
||||
"-t", "--test", action="store_true", default=False, help="set the server in testing mode"
|
||||
)
|
||||
args, _ = parser.parse_known_args()
|
||||
|
||||
app.state.testing = args.test
|
||||
|
||||
uvicorn.run(app, host="0.0.0.0", port=8765)
|
||||
|
||||
@@ -17,6 +17,7 @@ from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.serializers.protobuf import ProtobufFrameSerializer
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.services.deepgram import DeepgramSTTService
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
@@ -80,7 +81,7 @@ class SessionTimeoutHandler:
|
||||
async def main():
|
||||
transport = WebsocketServerTransport(
|
||||
params=WebsocketServerParams(
|
||||
audio_out_sample_rate=16000,
|
||||
serializer=ProtobufFrameSerializer(),
|
||||
audio_out_enabled=True,
|
||||
add_wav_header=True,
|
||||
vad_enabled=True,
|
||||
@@ -97,7 +98,6 @@ async def main():
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
|
||||
sample_rate=16000,
|
||||
)
|
||||
|
||||
messages = [
|
||||
@@ -122,7 +122,12 @@ async def main():
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
audio_in_sample_rate=16000, audio_out_sample_rate=16000, allow_interruptions=True
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
|
||||
@@ -32,6 +32,7 @@ dependencies = [
|
||||
"protobuf~=5.29.3",
|
||||
"pydantic~=2.10.5",
|
||||
"pyloudnorm~=0.1.1",
|
||||
"resampy~=0.4.3",
|
||||
"soxr~=0.5.0"
|
||||
]
|
||||
|
||||
@@ -40,7 +41,7 @@ Source = "https://github.com/pipecat-ai/pipecat"
|
||||
Website = "https://pipecat.ai"
|
||||
|
||||
[project.optional-dependencies]
|
||||
anthropic = [ "anthropic~=0.39.0" ]
|
||||
anthropic = [ "anthropic~=0.45.2" ]
|
||||
assemblyai = [ "assemblyai~=0.36.0" ]
|
||||
aws = [ "boto3~=1.35.99" ]
|
||||
azure = [ "azure-cognitiveservices-speech~=1.42.0", "openai~=1.59.6" ]
|
||||
@@ -69,7 +70,7 @@ moondream = [ "einops~=0.8.0", "timm~=1.0.13", "transformers~=4.48.0" ]
|
||||
nim = [ "openai~=1.59.6" ]
|
||||
noisereduce = [ "noisereduce~=3.0.3" ]
|
||||
openai = [ "openai~=1.59.6", "websockets~=13.1", "python-deepcompare~=2.1.0" ]
|
||||
openpipe = [ "openpipe~=4.43.0" ]
|
||||
openpipe = [ "openpipe~=4.45.0" ]
|
||||
playht = [ "pyht~=0.1.6", "websockets~=13.1" ]
|
||||
riva = [ "nvidia-riva-client~=2.18.0" ]
|
||||
silero = [ "onnxruntime~=1.20.1" ]
|
||||
|
||||
@@ -20,6 +20,22 @@ except ModuleNotFoundError as e:
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
class KrispProcessorManager:
|
||||
"""
|
||||
Ensures that only one KrispAudioProcessor instance exists for the entire program.
|
||||
"""
|
||||
|
||||
_krisp_instance = None
|
||||
|
||||
@classmethod
|
||||
def get_processor(cls, sample_rate: int, sample_type: str, channels: int, model_path: str):
|
||||
if cls._krisp_instance is None:
|
||||
cls._krisp_instance = KrispAudioProcessor(
|
||||
sample_rate, sample_type, channels, model_path
|
||||
)
|
||||
return cls._krisp_instance
|
||||
|
||||
|
||||
class KrispFilter(BaseAudioFilter):
|
||||
def __init__(
|
||||
self, sample_type: str = "PCM_16", channels: int = 1, model_path: str = None
|
||||
@@ -48,7 +64,7 @@ class KrispFilter(BaseAudioFilter):
|
||||
|
||||
async def start(self, sample_rate: int):
|
||||
self._sample_rate = sample_rate
|
||||
self._krisp_processor = KrispAudioProcessor(
|
||||
self._krisp_processor = KrispProcessorManager.get_processor(
|
||||
self._sample_rate, self._sample_type, self._channels, self._model_path
|
||||
)
|
||||
|
||||
|
||||
1
src/pipecat/audio/resamplers/__init__.py
Normal file
1
src/pipecat/audio/resamplers/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
|
||||
30
src/pipecat/audio/resamplers/base_audio_resampler.py
Normal file
30
src/pipecat/audio/resamplers/base_audio_resampler.py
Normal file
@@ -0,0 +1,30 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
|
||||
class BaseAudioResampler(ABC):
|
||||
"""Abstract base class for audio resampling. This class defines an
|
||||
interface for audio resampling implementations.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
async def resample(self, audio: bytes, in_rate: int, out_rate: int) -> bytes:
|
||||
"""
|
||||
Resamples the given audio data to a different sample rate.
|
||||
|
||||
This is an abstract method that must be implemented in subclasses.
|
||||
|
||||
Parameters:
|
||||
audio (bytes): The audio data to be resampled, represented as a byte string.
|
||||
in_rate (int): The original sample rate of the audio data (in Hz).
|
||||
out_rate (int): The desired sample rate for the resampled audio data (in Hz).
|
||||
|
||||
Returns:
|
||||
bytes: The resampled audio data as a byte string.
|
||||
"""
|
||||
pass
|
||||
25
src/pipecat/audio/resamplers/resampy_resampler.py
Normal file
25
src/pipecat/audio/resamplers/resampy_resampler.py
Normal file
@@ -0,0 +1,25 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import numpy as np
|
||||
import resampy
|
||||
|
||||
from pipecat.audio.resamplers.base_audio_resampler import BaseAudioResampler
|
||||
|
||||
|
||||
class ResampyResampler(BaseAudioResampler):
|
||||
"""Audio resampler implementation using the resampy library."""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
pass
|
||||
|
||||
async def resample(self, audio: bytes, in_rate: int, out_rate: int) -> bytes:
|
||||
if in_rate == out_rate:
|
||||
return audio
|
||||
audio_data = np.frombuffer(audio, dtype=np.int16)
|
||||
resampled_audio = resampy.resample(audio_data, in_rate, out_rate, filter="kaiser_fast")
|
||||
result = resampled_audio.astype(np.int16).tobytes()
|
||||
return result
|
||||
25
src/pipecat/audio/resamplers/soxr_resampler.py
Normal file
25
src/pipecat/audio/resamplers/soxr_resampler.py
Normal file
@@ -0,0 +1,25 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import numpy as np
|
||||
import soxr
|
||||
|
||||
from pipecat.audio.resamplers.base_audio_resampler import BaseAudioResampler
|
||||
|
||||
|
||||
class SOXRAudioResampler(BaseAudioResampler):
|
||||
"""Audio resampler implementation using the SoX resampler library."""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
pass
|
||||
|
||||
async def resample(self, audio: bytes, in_rate: int, out_rate: int) -> bytes:
|
||||
if in_rate == out_rate:
|
||||
return audio
|
||||
audio_data = np.frombuffer(audio, dtype=np.int16)
|
||||
resampled_audio = soxr.resample(audio_data, in_rate, out_rate, quality="VHQ")
|
||||
result = resampled_audio.astype(np.int16).tobytes()
|
||||
return result
|
||||
@@ -10,8 +10,24 @@ import numpy as np
|
||||
import pyloudnorm as pyln
|
||||
import soxr
|
||||
|
||||
from pipecat.audio.resamplers.base_audio_resampler import BaseAudioResampler
|
||||
from pipecat.audio.resamplers.soxr_resampler import SOXRAudioResampler
|
||||
|
||||
|
||||
def create_default_resampler(**kwargs) -> BaseAudioResampler:
|
||||
return SOXRAudioResampler(**kwargs)
|
||||
|
||||
|
||||
def resample_audio(audio: bytes, original_rate: int, target_rate: int) -> bytes:
|
||||
import warnings
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"'resample_audio()' is deprecated, use 'create_default_resampler()' instead.",
|
||||
DeprecationWarning,
|
||||
)
|
||||
|
||||
if original_rate == target_rate:
|
||||
return audio
|
||||
audio_data = np.frombuffer(audio, dtype=np.int16)
|
||||
@@ -75,41 +91,45 @@ def exp_smoothing(value: float, prev_value: float, factor: float) -> float:
|
||||
return prev_value + factor * (value - prev_value)
|
||||
|
||||
|
||||
def ulaw_to_pcm(ulaw_bytes: bytes, in_sample_rate: int, out_sample_rate: int):
|
||||
async def ulaw_to_pcm(
|
||||
ulaw_bytes: bytes, in_rate: int, out_rate: int, resampler: BaseAudioResampler
|
||||
):
|
||||
# Convert μ-law to PCM
|
||||
in_pcm_bytes = audioop.ulaw2lin(ulaw_bytes, 2)
|
||||
|
||||
# Resample
|
||||
out_pcm_bytes = resample_audio(in_pcm_bytes, in_sample_rate, out_sample_rate)
|
||||
out_pcm_bytes = await resampler.resample(in_pcm_bytes, in_rate, out_rate)
|
||||
|
||||
return out_pcm_bytes
|
||||
|
||||
|
||||
def pcm_to_ulaw(pcm_bytes: bytes, in_sample_rate: int, out_sample_rate: int):
|
||||
async def pcm_to_ulaw(pcm_bytes: bytes, in_rate: int, out_rate: int, resampler: BaseAudioResampler):
|
||||
# Resample
|
||||
in_pcm_bytes = resample_audio(pcm_bytes, in_sample_rate, out_sample_rate)
|
||||
in_pcm_bytes = await resampler.resample(pcm_bytes, in_rate, out_rate)
|
||||
|
||||
# Convert PCM to μ-law
|
||||
ulaw_bytes = audioop.lin2ulaw(in_pcm_bytes, 2)
|
||||
out_ulaw_bytes = audioop.lin2ulaw(in_pcm_bytes, 2)
|
||||
|
||||
return ulaw_bytes
|
||||
return out_ulaw_bytes
|
||||
|
||||
|
||||
def alaw_to_pcm(alaw_bytes: bytes, in_sample_rate: int, out_sample_rate: int) -> bytes:
|
||||
async def alaw_to_pcm(
|
||||
alaw_bytes: bytes, in_rate: int, out_rate: int, resampler: BaseAudioResampler
|
||||
) -> bytes:
|
||||
# Convert a-law to PCM
|
||||
in_pcm_bytes = audioop.alaw2lin(alaw_bytes, 2)
|
||||
|
||||
# Resample
|
||||
out_pcm_bytes = resample_audio(in_pcm_bytes, in_sample_rate, out_sample_rate)
|
||||
out_pcm_bytes = await resampler.resample(in_pcm_bytes, in_rate, out_rate)
|
||||
|
||||
return out_pcm_bytes
|
||||
|
||||
|
||||
def pcm_to_alaw(pcm_bytes: bytes, in_sample_rate: int, out_sample_rate: int):
|
||||
async def pcm_to_alaw(pcm_bytes: bytes, in_rate: int, out_rate: int, resampler: BaseAudioResampler):
|
||||
# Resample
|
||||
in_pcm_bytes = resample_audio(pcm_bytes, in_sample_rate, out_sample_rate)
|
||||
in_pcm_bytes = await resampler.resample(pcm_bytes, in_rate, out_rate)
|
||||
|
||||
# Convert PCM to μ-law
|
||||
alaw_bytes = audioop.lin2alaw(in_pcm_bytes, 2)
|
||||
out_alaw_bytes = audioop.lin2alaw(in_pcm_bytes, 2)
|
||||
|
||||
return alaw_bytes
|
||||
return out_alaw_bytes
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
#
|
||||
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
import numpy as np
|
||||
from loguru import logger
|
||||
@@ -104,11 +105,8 @@ class SileroOnnxModel:
|
||||
|
||||
|
||||
class SileroVADAnalyzer(VADAnalyzer):
|
||||
def __init__(self, *, sample_rate: int = 16000, params: VADParams = VADParams()):
|
||||
super().__init__(sample_rate=sample_rate, num_channels=1, params=params)
|
||||
|
||||
if sample_rate != 16000 and sample_rate != 8000:
|
||||
raise ValueError("Silero VAD sample rate needs to be 16000 or 8000")
|
||||
def __init__(self, *, sample_rate: Optional[int] = None, params: VADParams = VADParams()):
|
||||
super().__init__(sample_rate=sample_rate, params=params)
|
||||
|
||||
logger.debug("Loading Silero VAD model...")
|
||||
|
||||
@@ -138,6 +136,12 @@ class SileroVADAnalyzer(VADAnalyzer):
|
||||
# VADAnalyzer
|
||||
#
|
||||
|
||||
def set_sample_rate(self, sample_rate: int):
|
||||
if sample_rate != 16000 and sample_rate != 8000:
|
||||
raise ValueError("Silero VAD sample rate needs to be 16000 or 8000")
|
||||
|
||||
super().set_sample_rate(sample_rate)
|
||||
|
||||
def num_frames_required(self) -> int:
|
||||
return 512 if self.sample_rate == 16000 else 256
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
|
||||
from abc import abstractmethod
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel
|
||||
@@ -33,11 +34,11 @@ class VADParams(BaseModel):
|
||||
|
||||
|
||||
class VADAnalyzer:
|
||||
def __init__(self, *, sample_rate: int, num_channels: int, params: VADParams):
|
||||
self._sample_rate = sample_rate
|
||||
self._num_channels = num_channels
|
||||
|
||||
self.set_params(params)
|
||||
def __init__(self, *, sample_rate: Optional[int] = None, params: VADParams):
|
||||
self._init_sample_rate = sample_rate
|
||||
self._sample_rate = 0
|
||||
self._params = params
|
||||
self._num_channels = 1
|
||||
|
||||
self._vad_buffer = b""
|
||||
|
||||
@@ -65,13 +66,17 @@ class VADAnalyzer:
|
||||
def voice_confidence(self, buffer) -> float:
|
||||
pass
|
||||
|
||||
def set_sample_rate(self, sample_rate: int):
|
||||
self._sample_rate = self._init_sample_rate or sample_rate
|
||||
self.set_params(self._params)
|
||||
|
||||
def set_params(self, params: VADParams):
|
||||
logger.info(f"Setting VAD params to: {params}")
|
||||
self._params = params
|
||||
self._vad_frames = self.num_frames_required()
|
||||
self._vad_frames_num_bytes = self._vad_frames * self._num_channels * 2
|
||||
|
||||
vad_frames_per_sec = self._vad_frames / self._sample_rate
|
||||
vad_frames_per_sec = self._vad_frames / self.sample_rate
|
||||
|
||||
self._vad_start_frames = round(self._params.start_secs / vad_frames_per_sec)
|
||||
self._vad_stop_frames = round(self._params.stop_secs / vad_frames_per_sec)
|
||||
@@ -80,7 +85,7 @@ class VADAnalyzer:
|
||||
self._vad_state: VADState = VADState.QUIET
|
||||
|
||||
def _get_smoothed_volume(self, audio: bytes) -> float:
|
||||
volume = calculate_audio_volume(audio, self._sample_rate)
|
||||
volume = calculate_audio_volume(audio, self.sample_rate)
|
||||
return exp_smoothing(volume, self._prev_volume, self._smoothing_factor)
|
||||
|
||||
def analyze_audio(self, buffer) -> VADState:
|
||||
|
||||
@@ -6,7 +6,18 @@
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from typing import TYPE_CHECKING, Any, Awaitable, Callable, List, Literal, Mapping, Optional, Tuple
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
Awaitable,
|
||||
Callable,
|
||||
Dict,
|
||||
List,
|
||||
Literal,
|
||||
Mapping,
|
||||
Optional,
|
||||
Tuple,
|
||||
)
|
||||
|
||||
from pipecat.audio.vad.vad_analyzer import VADParams
|
||||
from pipecat.clocks.base_clock import BaseClock
|
||||
@@ -48,13 +59,13 @@ class Frame:
|
||||
id: int = field(init=False)
|
||||
name: str = field(init=False)
|
||||
pts: Optional[int] = field(init=False)
|
||||
metadata: dict = field(init=False)
|
||||
metadata: Dict[str, Any] = field(init=False)
|
||||
|
||||
def __post_init__(self):
|
||||
self.id: int = obj_id()
|
||||
self.name: str = f"{self.__class__.__name__}#{obj_count(self)}"
|
||||
self.pts: Optional[int] = None
|
||||
self.metadata: dict = {}
|
||||
self.metadata: Dict[str, Any] = {}
|
||||
|
||||
def __str__(self):
|
||||
return self.name
|
||||
@@ -428,11 +439,13 @@ class StartFrame(SystemFrame):
|
||||
|
||||
clock: BaseClock
|
||||
task_manager: TaskManager
|
||||
audio_in_sample_rate: int = 16000
|
||||
audio_out_sample_rate: int = 24000
|
||||
allow_interruptions: bool = False
|
||||
enable_metrics: bool = False
|
||||
enable_usage_metrics: bool = False
|
||||
report_only_initial_ttfb: bool = False
|
||||
observer: Optional["BaseObserver"] = None
|
||||
report_only_initial_ttfb: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
#
|
||||
|
||||
import asyncio
|
||||
from typing import AsyncIterable, Iterable, List
|
||||
from typing import Any, AsyncIterable, Dict, Iterable, List
|
||||
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel, ConfigDict
|
||||
@@ -41,13 +41,16 @@ class PipelineParams(BaseModel):
|
||||
model_config = ConfigDict(arbitrary_types_allowed=True)
|
||||
|
||||
allow_interruptions: bool = False
|
||||
audio_in_sample_rate: int = 16000
|
||||
audio_out_sample_rate: int = 24000
|
||||
enable_heartbeats: bool = False
|
||||
enable_metrics: bool = False
|
||||
enable_usage_metrics: bool = False
|
||||
send_initial_empty_metrics: bool = True
|
||||
report_only_initial_ttfb: bool = False
|
||||
observers: List[BaseObserver] = []
|
||||
heartbeats_period_secs: float = HEARTBEAT_SECONDS
|
||||
observers: List[BaseObserver] = []
|
||||
report_only_initial_ttfb: bool = False
|
||||
send_initial_empty_metrics: bool = True
|
||||
start_metadata: Dict[str, Any] = {}
|
||||
|
||||
|
||||
class PipelineTaskSource(FrameProcessor):
|
||||
@@ -136,6 +139,11 @@ class PipelineTask(BaseTask):
|
||||
"""Returns the name of this task."""
|
||||
return self._name
|
||||
|
||||
@property
|
||||
def params(self) -> PipelineParams:
|
||||
"""Returns the pipeline parameters of this task."""
|
||||
return self._params
|
||||
|
||||
def set_event_loop(self, loop: asyncio.AbstractEventLoop):
|
||||
self._task_manager.set_event_loop(loop)
|
||||
|
||||
@@ -271,11 +279,14 @@ class PipelineTask(BaseTask):
|
||||
clock=self._clock,
|
||||
task_manager=self._task_manager,
|
||||
allow_interruptions=self._params.allow_interruptions,
|
||||
audio_in_sample_rate=self._params.audio_in_sample_rate,
|
||||
audio_out_sample_rate=self._params.audio_out_sample_rate,
|
||||
enable_metrics=self._params.enable_metrics,
|
||||
enable_usage_metrics=self._params.enable_usage_metrics,
|
||||
report_only_initial_ttfb=self._params.report_only_initial_ttfb,
|
||||
observer=self._observer,
|
||||
report_only_initial_ttfb=self._params.report_only_initial_ttfb,
|
||||
)
|
||||
start_frame.metadata = self._params.start_metadata
|
||||
await self._source.queue_frame(start_frame, FrameDirection.DOWNSTREAM)
|
||||
|
||||
if self._params.enable_metrics and self._params.send_initial_empty_metrics:
|
||||
|
||||
@@ -16,9 +16,10 @@ class GatedOpenAILLMContextAggregator(FrameProcessor):
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, notifier: BaseNotifier, **kwargs):
|
||||
def __init__(self, *, notifier: BaseNotifier, start_open: bool = False, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._notifier = notifier
|
||||
self._start_open = start_open
|
||||
self._last_context_frame = None
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
@@ -31,7 +32,11 @@ class GatedOpenAILLMContextAggregator(FrameProcessor):
|
||||
await self._stop()
|
||||
await self.push_frame(frame)
|
||||
elif isinstance(frame, OpenAILLMContextFrame):
|
||||
self._last_context_frame = frame
|
||||
if self._start_open:
|
||||
self._start_open = False
|
||||
await self.push_frame(frame, direction)
|
||||
else:
|
||||
self._last_context_frame = frame
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
@@ -30,7 +30,7 @@ class AsyncGeneratorProcessor(FrameProcessor):
|
||||
if isinstance(frame, (CancelFrame, EndFrame)):
|
||||
await self._data_queue.put(None)
|
||||
else:
|
||||
data = self._serializer.serialize(frame)
|
||||
data = await self._serializer.serialize(frame)
|
||||
if data:
|
||||
await self._data_queue.put(data)
|
||||
|
||||
|
||||
@@ -4,12 +4,18 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
from pipecat.audio.utils import interleave_stereo_audio, mix_audio, resample_audio
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
from pipecat.audio.utils import create_default_resampler, interleave_stereo_audio, mix_audio
|
||||
from pipecat.frames.frames import (
|
||||
AudioRawFrame,
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
Frame,
|
||||
InputAudioRawFrame,
|
||||
OutputAudioRawFrame,
|
||||
StartFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
|
||||
@@ -29,16 +35,29 @@ class AudioBufferProcessor(FrameProcessor):
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self, *, sample_rate: int = 24000, num_channels: int = 1, buffer_size: int = 0, **kwargs
|
||||
self,
|
||||
*,
|
||||
sample_rate: Optional[int] = None,
|
||||
num_channels: int = 1,
|
||||
buffer_size: int = 0,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
self._sample_rate = sample_rate
|
||||
self._init_sample_rate = sample_rate
|
||||
self._sample_rate = 0
|
||||
self._num_channels = num_channels
|
||||
self._buffer_size = buffer_size
|
||||
|
||||
self._user_audio_buffer = bytearray()
|
||||
self._bot_audio_buffer = bytearray()
|
||||
|
||||
self._last_user_frame_at = 0
|
||||
self._last_bot_frame_at = 0
|
||||
|
||||
self._recording = False
|
||||
|
||||
self._resampler = create_default_resampler()
|
||||
|
||||
self._register_event_handler("on_audio_data")
|
||||
|
||||
@property
|
||||
@@ -64,43 +83,83 @@ class AudioBufferProcessor(FrameProcessor):
|
||||
else:
|
||||
return b""
|
||||
|
||||
def reset_audio_buffers(self):
|
||||
self._user_audio_buffer = bytearray()
|
||||
self._bot_audio_buffer = bytearray()
|
||||
async def start_recording(self):
|
||||
self._recording = True
|
||||
self._reset_recording()
|
||||
|
||||
async def stop_recording(self):
|
||||
await self._call_on_audio_data_handler()
|
||||
self._recording = False
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
# Include all audio from the user.
|
||||
if isinstance(frame, InputAudioRawFrame):
|
||||
resampled = resample_audio(frame.audio, frame.sample_rate, self._sample_rate)
|
||||
# Update output sample rate if necessary.
|
||||
if isinstance(frame, StartFrame):
|
||||
self._update_sample_rate(frame)
|
||||
|
||||
if self._recording and isinstance(frame, InputAudioRawFrame):
|
||||
# Add silence if we need to.
|
||||
silence = self._compute_silence(self._last_user_frame_at)
|
||||
self._user_audio_buffer.extend(silence)
|
||||
# Add user audio.
|
||||
resampled = await self._resample_audio(frame)
|
||||
self._user_audio_buffer.extend(resampled)
|
||||
# Sync the bot's buffer to the user's buffer by adding silence if needed
|
||||
if len(self._user_audio_buffer) > len(self._bot_audio_buffer):
|
||||
silence = b"\x00" * len(resampled)
|
||||
self._bot_audio_buffer.extend(silence)
|
||||
# If the bot is speaking, include all audio from the bot.
|
||||
elif isinstance(frame, OutputAudioRawFrame):
|
||||
resampled = resample_audio(frame.audio, frame.sample_rate, self._sample_rate)
|
||||
# Save time of frame so we can compute silence.
|
||||
self._last_user_frame_at = time.time()
|
||||
elif self._recording and isinstance(frame, OutputAudioRawFrame):
|
||||
# Add silence if we need to.
|
||||
silence = self._compute_silence(self._last_bot_frame_at)
|
||||
self._bot_audio_buffer.extend(silence)
|
||||
# Add bot audio.
|
||||
resampled = await self._resample_audio(frame)
|
||||
self._bot_audio_buffer.extend(resampled)
|
||||
# Save time of frame so we can compute silence.
|
||||
self._last_bot_frame_at = time.time()
|
||||
|
||||
if self._buffer_size > 0 and len(self._user_audio_buffer) > self._buffer_size:
|
||||
await self._call_on_audio_data_handler()
|
||||
|
||||
if isinstance(frame, EndFrame):
|
||||
await self._call_on_audio_data_handler()
|
||||
if isinstance(frame, (CancelFrame, EndFrame)):
|
||||
await self.stop_recording()
|
||||
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
def _update_sample_rate(self, frame: StartFrame):
|
||||
self._sample_rate = self._init_sample_rate or frame.audio_out_sample_rate
|
||||
|
||||
async def _call_on_audio_data_handler(self):
|
||||
if not self.has_audio():
|
||||
if not self.has_audio() or not self._recording:
|
||||
return
|
||||
|
||||
merged_audio = self.merge_audio_buffers()
|
||||
await self._call_event_handler(
|
||||
"on_audio_data", merged_audio, self._sample_rate, self._num_channels
|
||||
)
|
||||
self.reset_audio_buffers()
|
||||
self._reset_audio_buffers()
|
||||
|
||||
def _buffer_has_audio(self, buffer: bytearray) -> bool:
|
||||
return buffer is not None and len(buffer) > 0
|
||||
|
||||
def _reset_recording(self):
|
||||
self._reset_audio_buffers()
|
||||
self._last_user_frame_at = time.time()
|
||||
self._last_bot_frame_at = time.time()
|
||||
|
||||
def _reset_audio_buffers(self):
|
||||
self._user_audio_buffer = bytearray()
|
||||
self._bot_audio_buffer = bytearray()
|
||||
|
||||
async def _resample_audio(self, frame: AudioRawFrame) -> bytes:
|
||||
return await self._resampler.resample(frame.audio, frame.sample_rate, self._sample_rate)
|
||||
|
||||
def _compute_silence(self, from_time: float) -> bytes:
|
||||
quiet_time = time.time() - from_time
|
||||
# We should get audio frames very frequently. We pick 100ms because
|
||||
# that's big enough, but it could be even a bit slower since we usually
|
||||
# do 20ms audio frames.
|
||||
if from_time == 0 or quiet_time < 0.1:
|
||||
return b""
|
||||
num_bytes = int(quiet_time * self._sample_rate) * 2
|
||||
silence = b"\x00" * num_bytes
|
||||
return silence
|
||||
|
||||
@@ -4,6 +4,8 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
from typing import Optional
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
@@ -11,6 +13,7 @@ from pipecat.audio.vad.vad_analyzer import VADParams, VADState
|
||||
from pipecat.frames.frames import (
|
||||
AudioRawFrame,
|
||||
Frame,
|
||||
StartFrame,
|
||||
StartInterruptionFrame,
|
||||
StopInterruptionFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
@@ -23,7 +26,7 @@ class SileroVAD(FrameProcessor):
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
sample_rate: int = 16000,
|
||||
sample_rate: Optional[int] = None,
|
||||
vad_params: VADParams = VADParams(),
|
||||
audio_passthrough: bool = False,
|
||||
):
|
||||
@@ -41,6 +44,9 @@ class SileroVAD(FrameProcessor):
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, StartFrame):
|
||||
self._vad_analyzer.set_sample_rate(frame.audio_in_sample_rate)
|
||||
|
||||
if isinstance(frame, AudioRawFrame):
|
||||
await self._analyze_audio(frame)
|
||||
if self._audio_passthrough:
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
#
|
||||
|
||||
import asyncio
|
||||
from typing import Optional
|
||||
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel
|
||||
@@ -38,7 +39,7 @@ class GStreamerPipelineSource(FrameProcessor):
|
||||
class OutputParams(BaseModel):
|
||||
video_width: int = 1280
|
||||
video_height: int = 720
|
||||
audio_sample_rate: int = 24000
|
||||
audio_sample_rate: Optional[int] = None
|
||||
audio_channels: int = 1
|
||||
clock_sync: bool = True
|
||||
|
||||
@@ -46,6 +47,7 @@ class GStreamerPipelineSource(FrameProcessor):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
self._out_params = out_params
|
||||
self._sample_rate = 0
|
||||
|
||||
Gst.init()
|
||||
|
||||
@@ -90,6 +92,7 @@ class GStreamerPipelineSource(FrameProcessor):
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
async def _start(self, frame: StartFrame):
|
||||
self._sample_rate = self._out_params.audio_sample_rate or frame.audio_out_sample_rate
|
||||
self._player.set_state(Gst.State.PLAYING)
|
||||
|
||||
async def _stop(self, frame: EndFrame):
|
||||
@@ -122,7 +125,7 @@ class GStreamerPipelineSource(FrameProcessor):
|
||||
audioresample = Gst.ElementFactory.make("audioresample", None)
|
||||
audiocapsfilter = Gst.ElementFactory.make("capsfilter", None)
|
||||
audiocaps = Gst.Caps.from_string(
|
||||
f"audio/x-raw,format=S16LE,rate={self._out_params.audio_sample_rate},channels={self._out_params.audio_channels},layout=interleaved"
|
||||
f"audio/x-raw,format=S16LE,rate={self._sample_rate},channels={self._out_params.audio_channels},layout=interleaved"
|
||||
)
|
||||
audiocapsfilter.set_property("caps", audiocaps)
|
||||
appsink_audio = Gst.ElementFactory.make("appsink", None)
|
||||
@@ -188,7 +191,7 @@ class GStreamerPipelineSource(FrameProcessor):
|
||||
(_, info) = buffer.map(Gst.MapFlags.READ)
|
||||
frame = OutputAudioRawFrame(
|
||||
audio=info.data,
|
||||
sample_rate=self._out_params.audio_sample_rate,
|
||||
sample_rate=self._sample_rate,
|
||||
num_channels=self._out_params.audio_channels,
|
||||
)
|
||||
asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop())
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from enum import Enum
|
||||
|
||||
from pipecat.frames.frames import Frame
|
||||
from pipecat.frames.frames import Frame, StartFrame
|
||||
|
||||
|
||||
class FrameSerializerType(Enum):
|
||||
@@ -21,10 +21,13 @@ class FrameSerializer(ABC):
|
||||
def type(self) -> FrameSerializerType:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def serialize(self, frame: Frame) -> str | bytes | None:
|
||||
async def setup(self, frame: StartFrame):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def deserialize(self, data: str | bytes) -> Frame | None:
|
||||
async def serialize(self, frame: Frame) -> str | bytes | None:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def deserialize(self, data: str | bytes) -> Frame | None:
|
||||
pass
|
||||
|
||||
@@ -25,7 +25,7 @@ class LivekitFrameSerializer(FrameSerializer):
|
||||
def type(self) -> FrameSerializerType:
|
||||
return FrameSerializerType.BINARY
|
||||
|
||||
def serialize(self, frame: Frame) -> str | bytes | None:
|
||||
async def serialize(self, frame: Frame) -> str | bytes | None:
|
||||
if not isinstance(frame, OutputAudioRawFrame):
|
||||
return None
|
||||
audio_frame = AudioFrame(
|
||||
@@ -36,7 +36,7 @@ class LivekitFrameSerializer(FrameSerializer):
|
||||
)
|
||||
return pickle.dumps(audio_frame)
|
||||
|
||||
def deserialize(self, data: str | bytes) -> Frame | None:
|
||||
async def deserialize(self, data: str | bytes) -> Frame | None:
|
||||
audio_frame: AudioFrame = pickle.loads(data)["frame"]
|
||||
return InputAudioRawFrame(
|
||||
audio=bytes(audio_frame.data),
|
||||
|
||||
@@ -41,7 +41,7 @@ class ProtobufFrameSerializer(FrameSerializer):
|
||||
def type(self) -> FrameSerializerType:
|
||||
return FrameSerializerType.BINARY
|
||||
|
||||
def serialize(self, frame: Frame) -> str | bytes | None:
|
||||
async def serialize(self, frame: Frame) -> str | bytes | None:
|
||||
proto_frame = frame_protos.Frame()
|
||||
if type(frame) not in self.SERIALIZABLE_TYPES:
|
||||
logger.warning(f"Frame type {type(frame)} is not serializable")
|
||||
@@ -57,26 +57,7 @@ class ProtobufFrameSerializer(FrameSerializer):
|
||||
|
||||
return proto_frame.SerializeToString()
|
||||
|
||||
def deserialize(self, data: str | bytes) -> Frame | None:
|
||||
"""Returns a Frame object from a Frame protobuf.
|
||||
|
||||
Used to convert frames
|
||||
passed over the wire as protobufs to Frame objects used in pipelines
|
||||
and frame processors.
|
||||
|
||||
>>> serializer = ProtobufFrameSerializer()
|
||||
>>> serializer.deserialize(
|
||||
... serializer.serialize(OutputAudioFrame(data=b'1234567890')))
|
||||
InputAudioFrame(data=b'1234567890')
|
||||
|
||||
>>> serializer.deserialize(
|
||||
... serializer.serialize(TextFrame(text='hello world')))
|
||||
TextFrame(text='hello world')
|
||||
|
||||
>>> serializer.deserialize(serializer.serialize(TranscriptionFrame(
|
||||
... text="Hello there!", participantId="123", timestamp="2021-01-01")))
|
||||
TranscriptionFrame(text='Hello there!', participantId='123', timestamp='2021-01-01')
|
||||
"""
|
||||
async def deserialize(self, data: str | bytes) -> Frame | None:
|
||||
proto = frame_protos.Frame.FromString(data)
|
||||
which = proto.WhichOneof("frame")
|
||||
if which not in self.DESERIALIZABLE_FIELDS:
|
||||
|
||||
@@ -6,16 +6,24 @@
|
||||
|
||||
import base64
|
||||
import json
|
||||
from typing import Optional
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from pipecat.audio.utils import alaw_to_pcm, pcm_to_alaw, pcm_to_ulaw, ulaw_to_pcm
|
||||
from pipecat.audio.utils import (
|
||||
alaw_to_pcm,
|
||||
create_default_resampler,
|
||||
pcm_to_alaw,
|
||||
pcm_to_ulaw,
|
||||
ulaw_to_pcm,
|
||||
)
|
||||
from pipecat.frames.frames import (
|
||||
AudioRawFrame,
|
||||
Frame,
|
||||
InputAudioRawFrame,
|
||||
InputDTMFFrame,
|
||||
KeypadEntry,
|
||||
StartFrame,
|
||||
StartInterruptionFrame,
|
||||
)
|
||||
from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializerType
|
||||
@@ -23,8 +31,8 @@ from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializer
|
||||
|
||||
class TelnyxFrameSerializer(FrameSerializer):
|
||||
class InputParams(BaseModel):
|
||||
telnyx_sample_rate: int = 8000
|
||||
sample_rate: int = 16000
|
||||
telnyx_sample_rate: Optional[int] = None
|
||||
sample_rate: Optional[int] = None
|
||||
inbound_encoding: str = "PCMU"
|
||||
outbound_encoding: str = "PCMU"
|
||||
|
||||
@@ -40,21 +48,27 @@ class TelnyxFrameSerializer(FrameSerializer):
|
||||
params.inbound_encoding = inbound_encoding
|
||||
self._params = params
|
||||
|
||||
self._resampler = create_default_resampler()
|
||||
|
||||
@property
|
||||
def type(self) -> FrameSerializerType:
|
||||
return FrameSerializerType.TEXT
|
||||
|
||||
def serialize(self, frame: Frame) -> str | bytes | None:
|
||||
async def setup(self, frame: StartFrame):
|
||||
self._telnyx_sample_rate = self._params.telnyx_sample_rate or frame.audio_in_sample_rate
|
||||
self._sample_rate = self._params.sample_rate or frame.audio_out_sample_rate
|
||||
|
||||
async def serialize(self, frame: Frame) -> str | bytes | None:
|
||||
if isinstance(frame, AudioRawFrame):
|
||||
data = frame.audio
|
||||
|
||||
if self._params.inbound_encoding == "PCMU":
|
||||
serialized_data = pcm_to_ulaw(
|
||||
data, frame.sample_rate, self._params.telnyx_sample_rate
|
||||
serialized_data = await pcm_to_ulaw(
|
||||
data, frame.sample_rate, self._telnyx_sample_rate, self._resampler
|
||||
)
|
||||
elif self._params.inbound_encoding == "PCMA":
|
||||
serialized_data = pcm_to_alaw(
|
||||
data, frame.sample_rate, self._params.telnyx_sample_rate
|
||||
serialized_data = await pcm_to_alaw(
|
||||
data, frame.sample_rate, self._telnyx_sample_rate, self._resampler
|
||||
)
|
||||
else:
|
||||
raise ValueError(f"Unsupported encoding: {self._params.inbound_encoding}")
|
||||
@@ -71,7 +85,7 @@ class TelnyxFrameSerializer(FrameSerializer):
|
||||
answer = {"event": "clear"}
|
||||
return json.dumps(answer)
|
||||
|
||||
def deserialize(self, data: str | bytes) -> Frame | None:
|
||||
async def deserialize(self, data: str | bytes) -> Frame | None:
|
||||
message = json.loads(data)
|
||||
|
||||
if message["event"] == "media":
|
||||
@@ -79,18 +93,24 @@ class TelnyxFrameSerializer(FrameSerializer):
|
||||
payload = base64.b64decode(payload_base64)
|
||||
|
||||
if self._params.outbound_encoding == "PCMU":
|
||||
deserialized_data = ulaw_to_pcm(
|
||||
payload, self._params.telnyx_sample_rate, self._params.sample_rate
|
||||
deserialized_data = await ulaw_to_pcm(
|
||||
payload,
|
||||
self._telnyx_sample_rate,
|
||||
self._sample_rate,
|
||||
self._resampler,
|
||||
)
|
||||
elif self._params.outbound_encoding == "PCMA":
|
||||
deserialized_data = alaw_to_pcm(
|
||||
payload, self._params.telnyx_sample_rate, self._params.sample_rate
|
||||
deserialized_data = await alaw_to_pcm(
|
||||
payload,
|
||||
self._telnyx_sample_rate,
|
||||
self._sample_rate,
|
||||
self._resampler,
|
||||
)
|
||||
else:
|
||||
raise ValueError(f"Unsupported encoding: {self._params.outbound_encoding}")
|
||||
|
||||
audio_frame = InputAudioRawFrame(
|
||||
audio=deserialized_data, num_channels=1, sample_rate=self._params.sample_rate
|
||||
audio=deserialized_data, num_channels=1, sample_rate=self._sample_rate
|
||||
)
|
||||
return audio_frame
|
||||
elif message["event"] == "dtmf":
|
||||
|
||||
@@ -6,16 +6,18 @@
|
||||
|
||||
import base64
|
||||
import json
|
||||
from typing import Optional
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from pipecat.audio.utils import pcm_to_ulaw, ulaw_to_pcm
|
||||
from pipecat.audio.utils import create_default_resampler, pcm_to_ulaw, ulaw_to_pcm
|
||||
from pipecat.frames.frames import (
|
||||
AudioRawFrame,
|
||||
Frame,
|
||||
InputAudioRawFrame,
|
||||
InputDTMFFrame,
|
||||
KeypadEntry,
|
||||
StartFrame,
|
||||
StartInterruptionFrame,
|
||||
TransportMessageFrame,
|
||||
TransportMessageUrgentFrame,
|
||||
@@ -25,25 +27,36 @@ from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializer
|
||||
|
||||
class TwilioFrameSerializer(FrameSerializer):
|
||||
class InputParams(BaseModel):
|
||||
twilio_sample_rate: int = 8000
|
||||
sample_rate: int = 16000
|
||||
twilio_sample_rate: Optional[int] = None
|
||||
sample_rate: Optional[int] = None
|
||||
|
||||
def __init__(self, stream_sid: str, params: InputParams = InputParams()):
|
||||
self._stream_sid = stream_sid
|
||||
self._params = params
|
||||
|
||||
self._twilio_sample_rate = 0
|
||||
self._sample_rate = 0
|
||||
|
||||
self._resampler = create_default_resampler()
|
||||
|
||||
@property
|
||||
def type(self) -> FrameSerializerType:
|
||||
return FrameSerializerType.TEXT
|
||||
|
||||
def serialize(self, frame: Frame) -> str | bytes | None:
|
||||
async def setup(self, frame: StartFrame):
|
||||
self._twilio_sample_rate = self._params.twilio_sample_rate or frame.audio_in_sample_rate
|
||||
self._sample_rate = self._params.sample_rate or frame.audio_out_sample_rate
|
||||
|
||||
async def serialize(self, frame: Frame) -> str | bytes | None:
|
||||
if isinstance(frame, StartInterruptionFrame):
|
||||
answer = {"event": "clear", "streamSid": self._stream_sid}
|
||||
return json.dumps(answer)
|
||||
elif isinstance(frame, AudioRawFrame):
|
||||
data = frame.audio
|
||||
|
||||
serialized_data = pcm_to_ulaw(data, frame.sample_rate, self._params.twilio_sample_rate)
|
||||
serialized_data = await pcm_to_ulaw(
|
||||
data, frame.sample_rate, self._twilio_sample_rate, self._resampler
|
||||
)
|
||||
payload = base64.b64encode(serialized_data).decode("utf-8")
|
||||
answer = {
|
||||
"event": "media",
|
||||
@@ -55,18 +68,18 @@ class TwilioFrameSerializer(FrameSerializer):
|
||||
elif isinstance(frame, (TransportMessageFrame, TransportMessageUrgentFrame)):
|
||||
return json.dumps(frame.message)
|
||||
|
||||
def deserialize(self, data: str | bytes) -> Frame | None:
|
||||
async def deserialize(self, data: str | bytes) -> Frame | None:
|
||||
message = json.loads(data)
|
||||
|
||||
if message["event"] == "media":
|
||||
payload_base64 = message["media"]["payload"]
|
||||
payload = base64.b64decode(payload_base64)
|
||||
|
||||
deserialized_data = ulaw_to_pcm(
|
||||
payload, self._params.twilio_sample_rate, self._params.sample_rate
|
||||
deserialized_data = await ulaw_to_pcm(
|
||||
payload, self._twilio_sample_rate, self._sample_rate, self._resampler
|
||||
)
|
||||
audio_frame = InputAudioRawFrame(
|
||||
audio=deserialized_data, num_channels=1, sample_rate=self._params.sample_rate
|
||||
audio=deserialized_data, num_channels=1, sample_rate=self._sample_rate
|
||||
)
|
||||
return audio_frame
|
||||
elif message["event"] == "dtmf":
|
||||
|
||||
@@ -213,7 +213,7 @@ class TTSService(AIService):
|
||||
# if push_silence_after_stop is True, send this amount of audio silence
|
||||
silence_time_s: float = 2.0,
|
||||
# TTS output sample rate
|
||||
sample_rate: int = 24000,
|
||||
sample_rate: Optional[int] = None,
|
||||
text_filter: Optional[BaseTextFilter] = None,
|
||||
**kwargs,
|
||||
):
|
||||
@@ -224,7 +224,8 @@ class TTSService(AIService):
|
||||
self._stop_frame_timeout_s: float = stop_frame_timeout_s
|
||||
self._push_silence_after_stop: bool = push_silence_after_stop
|
||||
self._silence_time_s: float = silence_time_s
|
||||
self._sample_rate: int = sample_rate
|
||||
self._init_sample_rate = sample_rate
|
||||
self._sample_rate = 0
|
||||
self._voice_id: str = ""
|
||||
self._settings: Dict[str, Any] = {}
|
||||
self._text_filter: Optional[BaseTextFilter] = text_filter
|
||||
@@ -248,16 +249,20 @@ class TTSService(AIService):
|
||||
async def flush_audio(self):
|
||||
pass
|
||||
|
||||
def language_to_service_language(self, language: Language) -> str | None:
|
||||
return Language(language)
|
||||
|
||||
# Converts the text to audio.
|
||||
@abstractmethod
|
||||
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
|
||||
pass
|
||||
|
||||
def language_to_service_language(self, language: Language) -> str | None:
|
||||
return Language(language)
|
||||
|
||||
async def update_setting(self, key: str, value: Any):
|
||||
pass
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
self._sample_rate = self._init_sample_rate or frame.audio_out_sample_rate
|
||||
if self._push_stop_frames:
|
||||
self._stop_frame_task = self.create_task(self._stop_frame_handler())
|
||||
|
||||
@@ -467,9 +472,17 @@ class WordTTSService(TTSService):
|
||||
class STTService(AIService):
|
||||
"""STTService is a base class for speech-to-text services."""
|
||||
|
||||
def __init__(self, audio_passthrough=False, **kwargs):
|
||||
def __init__(
|
||||
self,
|
||||
audio_passthrough=False,
|
||||
# STT input sample rate
|
||||
sample_rate: Optional[int] = None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
self._audio_passthrough = audio_passthrough
|
||||
self._init_sample_rate = sample_rate
|
||||
self._sample_rate = 0
|
||||
self._settings: Dict[str, Any] = {}
|
||||
self._muted: bool = False
|
||||
|
||||
@@ -478,6 +491,10 @@ class STTService(AIService):
|
||||
"""Returns whether the STT service is currently muted."""
|
||||
return self._muted
|
||||
|
||||
@property
|
||||
def sample_rate(self) -> int:
|
||||
return self._sample_rate
|
||||
|
||||
@abstractmethod
|
||||
async def set_model(self, model: str):
|
||||
self.set_model_name(model)
|
||||
@@ -491,6 +508,10 @@ class STTService(AIService):
|
||||
"""Returns transcript as a string"""
|
||||
pass
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
self._sample_rate = self._init_sample_rate or frame.audio_in_sample_rate
|
||||
|
||||
async def _update_settings(self, settings: Mapping[str, Any]):
|
||||
logger.info(f"Updating STT settings: {self._settings}")
|
||||
for key, value in settings.items():
|
||||
@@ -540,17 +561,15 @@ class SegmentedSTTService(STTService):
|
||||
min_volume: float = 0.6,
|
||||
max_silence_secs: float = 0.3,
|
||||
max_buffer_secs: float = 1.5,
|
||||
sample_rate: int = 24000,
|
||||
num_channels: int = 1,
|
||||
sample_rate: Optional[int] = None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
super().__init__(sample_rate=sample_rate, **kwargs)
|
||||
self._min_volume = min_volume
|
||||
self._max_silence_secs = max_silence_secs
|
||||
self._max_buffer_secs = max_buffer_secs
|
||||
self._sample_rate = sample_rate
|
||||
self._num_channels = num_channels
|
||||
(self._content, self._wave) = self._new_wave()
|
||||
self._content = None
|
||||
self._wave = None
|
||||
self._silence_num_frames = 0
|
||||
# Volume exponential smoothing
|
||||
self._smoothing_factor = 0.2
|
||||
@@ -569,8 +588,8 @@ class SegmentedSTTService(STTService):
|
||||
|
||||
# If buffer is not empty and we have enough data or there's been a long
|
||||
# silence, transcribe the audio gathered so far.
|
||||
silence_secs = self._silence_num_frames / self._sample_rate
|
||||
buffer_secs = self._wave.getnframes() / self._sample_rate
|
||||
silence_secs = self._silence_num_frames / self.sample_rate
|
||||
buffer_secs = self._wave.getnframes() / self.sample_rate
|
||||
if self._content.tell() > 0 and (
|
||||
buffer_secs > self._max_buffer_secs or silence_secs > self._max_silence_secs
|
||||
):
|
||||
@@ -580,18 +599,24 @@ class SegmentedSTTService(STTService):
|
||||
await self.process_generator(self.run_stt(self._content.read()))
|
||||
(self._content, self._wave) = self._new_wave()
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
(self._content, self._wave) = self._new_wave()
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
await super().stop(frame)
|
||||
self._wave.close()
|
||||
|
||||
async def cancel(self, frame: CancelFrame):
|
||||
await super().cancel(frame)
|
||||
self._wave.close()
|
||||
|
||||
def _new_wave(self):
|
||||
content = io.BytesIO()
|
||||
ww = wave.open(content, "wb")
|
||||
ww.setsampwidth(2)
|
||||
ww.setnchannels(self._num_channels)
|
||||
ww.setframerate(self._sample_rate)
|
||||
ww.setnchannels(1)
|
||||
ww.setframerate(self.sample_rate)
|
||||
return (content, ww)
|
||||
|
||||
def _get_smoothed_volume(self, frame: AudioRawFrame) -> float:
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
#
|
||||
|
||||
import asyncio
|
||||
from typing import AsyncGenerator
|
||||
from typing import AsyncGenerator, Optional
|
||||
|
||||
from loguru import logger
|
||||
|
||||
@@ -38,20 +38,17 @@ class AssemblyAISTTService(STTService):
|
||||
self,
|
||||
*,
|
||||
api_key: str,
|
||||
sample_rate: int = 16000,
|
||||
sample_rate: Optional[int] = None,
|
||||
encoding: AudioEncoding = AudioEncoding("pcm_s16le"),
|
||||
language=Language.EN, # Only English is supported for Realtime
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
super().__init__(sample_rate=sample_rate, **kwargs)
|
||||
|
||||
aai.settings.api_key = api_key
|
||||
self._transcriber: aai.RealtimeTranscriber | None = None
|
||||
# Store reference to the main event loop for use in callback functions
|
||||
self._loop = asyncio.get_event_loop()
|
||||
|
||||
self._settings = {
|
||||
"sample_rate": sample_rate,
|
||||
"encoding": encoding,
|
||||
"language": language,
|
||||
}
|
||||
@@ -121,7 +118,7 @@ class AssemblyAISTTService(STTService):
|
||||
|
||||
# Schedule the coroutine to run in the main event loop
|
||||
# This is necessary because this callback runs in a different thread
|
||||
asyncio.run_coroutine_threadsafe(self.push_frame(frame), self._loop)
|
||||
asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop())
|
||||
|
||||
def on_error(error: aai.RealtimeError):
|
||||
"""Callback for handling errors from AssemblyAI.
|
||||
@@ -131,14 +128,16 @@ class AssemblyAISTTService(STTService):
|
||||
"""
|
||||
logger.error(f"{self}: An error occurred: {error}")
|
||||
# Schedule the coroutine to run in the main event loop
|
||||
asyncio.run_coroutine_threadsafe(self.push_frame(ErrorFrame(str(error))), self._loop)
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self.push_frame(ErrorFrame(str(error))), self.get_event_loop()
|
||||
)
|
||||
|
||||
def on_close():
|
||||
"""Callback for when the connection to AssemblyAI is closed."""
|
||||
logger.info(f"{self}: Disconnected from AssemblyAI")
|
||||
|
||||
self._transcriber = aai.RealtimeTranscriber(
|
||||
sample_rate=self._settings["sample_rate"],
|
||||
sample_rate=self.sample_rate,
|
||||
encoding=self._settings["encoding"],
|
||||
on_data=on_data,
|
||||
on_error=on_error,
|
||||
|
||||
@@ -10,7 +10,7 @@ from typing import AsyncGenerator, Optional
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel
|
||||
|
||||
from pipecat.audio.utils import resample_audio
|
||||
from pipecat.audio.utils import create_default_resampler
|
||||
from pipecat.frames.frames import (
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
@@ -124,7 +124,7 @@ class PollyTTSService(TTSService):
|
||||
aws_session_token: Optional[str] = None,
|
||||
region: Optional[str] = None,
|
||||
voice_id: str = "Joanna",
|
||||
sample_rate: int = 24000,
|
||||
sample_rate: Optional[int] = None,
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
):
|
||||
@@ -138,7 +138,6 @@ class PollyTTSService(TTSService):
|
||||
region_name=region,
|
||||
)
|
||||
self._settings = {
|
||||
"sample_rate": sample_rate,
|
||||
"engine": params.engine,
|
||||
"language": self.language_to_service_language(params.language)
|
||||
if params.language
|
||||
@@ -148,6 +147,8 @@ class PollyTTSService(TTSService):
|
||||
"volume": params.volume,
|
||||
}
|
||||
|
||||
self._resampler = create_default_resampler()
|
||||
|
||||
self.set_voice(voice_id)
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
@@ -193,8 +194,7 @@ class PollyTTSService(TTSService):
|
||||
response = self._polly_client.synthesize_speech(**args)
|
||||
if "AudioStream" in response:
|
||||
audio_data = response["AudioStream"].read()
|
||||
resampled = resample_audio(audio_data, 16000, self._settings["sample_rate"])
|
||||
return resampled
|
||||
return audio_data
|
||||
return None
|
||||
|
||||
logger.debug(f"Generating TTS: [{text}]")
|
||||
@@ -225,6 +225,8 @@ class PollyTTSService(TTSService):
|
||||
yield None
|
||||
return
|
||||
|
||||
audio_data = await self._resampler.resample(audio_data, 16000, self.sample_rate)
|
||||
|
||||
await self.start_tts_usage_metrics(text)
|
||||
|
||||
yield TTSStartedFrame()
|
||||
@@ -234,7 +236,7 @@ class PollyTTSService(TTSService):
|
||||
chunk = audio_data[i : i + chunk_size]
|
||||
if len(chunk) > 0:
|
||||
await self.stop_ttfb_metrics()
|
||||
frame = TTSAudioRawFrame(chunk, self._settings["sample_rate"], 1)
|
||||
frame = TTSAudioRawFrame(chunk, self.sample_rate, 1)
|
||||
yield frame
|
||||
|
||||
yield TTSStoppedFrame()
|
||||
|
||||
@@ -450,14 +450,13 @@ class AzureBaseTTSService(TTSService):
|
||||
api_key: str,
|
||||
region: str,
|
||||
voice="en-US-SaraNeural",
|
||||
sample_rate: int = 24000,
|
||||
sample_rate: Optional[int] = None,
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(sample_rate=sample_rate, **kwargs)
|
||||
|
||||
self._settings = {
|
||||
"sample_rate": sample_rate,
|
||||
"emphasis": params.emphasis,
|
||||
"language": self.language_to_service_language(params.language)
|
||||
if params.language
|
||||
@@ -530,25 +529,32 @@ class AzureBaseTTSService(TTSService):
|
||||
class AzureTTSService(AzureBaseTTSService):
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._speech_config = None
|
||||
self._speech_synthesizer = None
|
||||
self._audio_queue = asyncio.Queue()
|
||||
|
||||
speech_config = SpeechConfig(
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
# Now self.sample_rate is properly initialized
|
||||
self._speech_config = SpeechConfig(
|
||||
subscription=self._api_key,
|
||||
region=self._region,
|
||||
speech_recognition_language=self._settings["language"],
|
||||
)
|
||||
speech_config.set_speech_synthesis_output_format(
|
||||
sample_rate_to_output_format(self._settings["sample_rate"])
|
||||
self._speech_config.set_speech_synthesis_output_format(
|
||||
sample_rate_to_output_format(self.sample_rate)
|
||||
)
|
||||
speech_config.set_service_property(
|
||||
self._speech_config.set_service_property(
|
||||
"synthesizer.synthesis.connection.synthesisConnectionImpl",
|
||||
"websocket",
|
||||
ServicePropertyChannel.UriQueryParameter,
|
||||
)
|
||||
|
||||
self._speech_synthesizer = SpeechSynthesizer(speech_config=speech_config, audio_config=None)
|
||||
self._speech_synthesizer = SpeechSynthesizer(
|
||||
speech_config=self._speech_config, audio_config=None
|
||||
)
|
||||
|
||||
# Set up event handlers
|
||||
self._audio_queue = asyncio.Queue()
|
||||
self._speech_synthesizer.synthesizing.connect(self._handle_synthesizing)
|
||||
self._speech_synthesizer.synthesis_completed.connect(self._handle_completed)
|
||||
self._speech_synthesizer.synthesis_canceled.connect(self._handle_canceled)
|
||||
@@ -591,7 +597,7 @@ class AzureTTSService(AzureBaseTTSService):
|
||||
|
||||
yield TTSAudioRawFrame(
|
||||
audio=chunk,
|
||||
sample_rate=self._settings["sample_rate"],
|
||||
sample_rate=self.sample_rate,
|
||||
num_channels=1,
|
||||
)
|
||||
|
||||
@@ -605,17 +611,22 @@ class AzureTTSService(AzureBaseTTSService):
|
||||
class AzureHttpTTSService(AzureBaseTTSService):
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._speech_config = None
|
||||
self._speech_synthesizer = None
|
||||
|
||||
speech_config = SpeechConfig(
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
self._speech_config = SpeechConfig(
|
||||
subscription=self._api_key,
|
||||
region=self._region,
|
||||
speech_recognition_language=self._settings["language"],
|
||||
)
|
||||
speech_config.set_speech_synthesis_output_format(
|
||||
sample_rate_to_output_format(self._settings["sample_rate"])
|
||||
self._speech_config.set_speech_synthesis_output_format(
|
||||
sample_rate_to_output_format(self.sample_rate)
|
||||
)
|
||||
self._speech_synthesizer = SpeechSynthesizer(
|
||||
speech_config=self._speech_config, audio_config=None
|
||||
)
|
||||
|
||||
self._speech_synthesizer = SpeechSynthesizer(speech_config=speech_config, audio_config=None)
|
||||
|
||||
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
|
||||
logger.debug(f"Generating TTS: [{text}]")
|
||||
@@ -633,7 +644,7 @@ class AzureHttpTTSService(AzureBaseTTSService):
|
||||
# Azure always sends a 44-byte header. Strip it off.
|
||||
yield TTSAudioRawFrame(
|
||||
audio=result.audio_data[44:],
|
||||
sample_rate=self._settings["sample_rate"],
|
||||
sample_rate=self.sample_rate,
|
||||
num_channels=1,
|
||||
)
|
||||
yield TTSStoppedFrame()
|
||||
@@ -650,24 +661,14 @@ class AzureSTTService(STTService):
|
||||
*,
|
||||
api_key: str,
|
||||
region: str,
|
||||
language=Language.EN_US,
|
||||
sample_rate=24000,
|
||||
channels=1,
|
||||
language: Language = Language.EN_US,
|
||||
sample_rate: Optional[int] = None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
super().__init__(sample_rate=sample_rate, **kwargs)
|
||||
|
||||
speech_config = SpeechConfig(subscription=api_key, region=region)
|
||||
speech_config.speech_recognition_language = language
|
||||
|
||||
stream_format = AudioStreamFormat(samples_per_second=sample_rate, channels=channels)
|
||||
self._audio_stream = PushAudioInputStream(stream_format)
|
||||
|
||||
audio_config = AudioConfig(stream=self._audio_stream)
|
||||
self._speech_recognizer = SpeechRecognizer(
|
||||
speech_config=speech_config, audio_config=audio_config
|
||||
)
|
||||
self._speech_recognizer.recognized.connect(self._on_handle_recognized)
|
||||
self._speech_config = SpeechConfig(subscription=api_key, region=region)
|
||||
self._speech_config.speech_recognition_language = language
|
||||
|
||||
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
|
||||
await self.start_processing_metrics()
|
||||
@@ -677,6 +678,16 @@ class AzureSTTService(STTService):
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
|
||||
stream_format = AudioStreamFormat(samples_per_second=self.sample_rate, channels=1)
|
||||
self._audio_stream = PushAudioInputStream(stream_format)
|
||||
|
||||
audio_config = AudioConfig(stream=self._audio_stream)
|
||||
|
||||
self._speech_recognizer = SpeechRecognizer(
|
||||
speech_config=self._speech_config, audio_config=audio_config
|
||||
)
|
||||
self._speech_recognizer.recognized.connect(self._on_handle_recognized)
|
||||
self._speech_recognizer.start_continuous_recognition_async()
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
|
||||
@@ -117,7 +117,6 @@ class CanonicalMetricsService(AIService):
|
||||
try:
|
||||
await self._multipart_upload(filename)
|
||||
await aiofiles.os.remove(filename)
|
||||
audio_buffer_processor.reset_audio_buffers()
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
except Exception as e:
|
||||
|
||||
@@ -89,7 +89,7 @@ class CartesiaTTSService(WordTTSService, WebsocketService):
|
||||
cartesia_version: str = "2024-06-10",
|
||||
url: str = "wss://api.cartesia.ai/tts/websocket",
|
||||
model: str = "sonic",
|
||||
sample_rate: int = 24000,
|
||||
sample_rate: Optional[int] = None,
|
||||
encoding: str = "pcm_s16le",
|
||||
container: str = "raw",
|
||||
params: InputParams = InputParams(),
|
||||
@@ -121,7 +121,7 @@ class CartesiaTTSService(WordTTSService, WebsocketService):
|
||||
"output_format": {
|
||||
"container": container,
|
||||
"encoding": encoding,
|
||||
"sample_rate": sample_rate,
|
||||
"sample_rate": 0,
|
||||
},
|
||||
"language": self.language_to_service_language(params.language)
|
||||
if params.language
|
||||
@@ -174,6 +174,7 @@ class CartesiaTTSService(WordTTSService, WebsocketService):
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
self._settings["output_format"]["sample_rate"] = self.sample_rate
|
||||
await self._connect()
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
@@ -262,7 +263,7 @@ class CartesiaTTSService(WordTTSService, WebsocketService):
|
||||
self.start_word_timestamps()
|
||||
frame = TTSAudioRawFrame(
|
||||
audio=base64.b64decode(msg["data"]),
|
||||
sample_rate=self._settings["output_format"]["sample_rate"],
|
||||
sample_rate=self.sample_rate,
|
||||
num_channels=1,
|
||||
)
|
||||
await self.push_frame(frame)
|
||||
@@ -328,7 +329,7 @@ class CartesiaHttpTTSService(TTSService):
|
||||
voice_id: str,
|
||||
model: str = "sonic",
|
||||
base_url: str = "https://api.cartesia.ai",
|
||||
sample_rate: int = 24000,
|
||||
sample_rate: Optional[int] = None,
|
||||
encoding: str = "pcm_s16le",
|
||||
container: str = "raw",
|
||||
params: InputParams = InputParams(),
|
||||
@@ -341,7 +342,7 @@ class CartesiaHttpTTSService(TTSService):
|
||||
"output_format": {
|
||||
"container": container,
|
||||
"encoding": encoding,
|
||||
"sample_rate": sample_rate,
|
||||
"sample_rate": 0,
|
||||
},
|
||||
"language": self.language_to_service_language(params.language)
|
||||
if params.language
|
||||
@@ -360,6 +361,10 @@ class CartesiaHttpTTSService(TTSService):
|
||||
def language_to_service_language(self, language: Language) -> str | None:
|
||||
return language_to_cartesia_language(language)
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
self._settings["output_format"]["sample_rate"] = self.sample_rate
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
await super().stop(frame)
|
||||
await self._client.close()
|
||||
@@ -394,9 +399,7 @@ class CartesiaHttpTTSService(TTSService):
|
||||
)
|
||||
|
||||
frame = TTSAudioRawFrame(
|
||||
audio=output["audio"],
|
||||
sample_rate=self._settings["output_format"]["sample_rate"],
|
||||
num_channels=1,
|
||||
audio=output["audio"], sample_rate=self.sample_rate, num_channels=1
|
||||
)
|
||||
yield frame
|
||||
except Exception as e:
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
#
|
||||
|
||||
import asyncio
|
||||
from typing import AsyncGenerator
|
||||
from typing import AsyncGenerator, Optional
|
||||
|
||||
from loguru import logger
|
||||
|
||||
@@ -53,14 +53,13 @@ class DeepgramTTSService(TTSService):
|
||||
*,
|
||||
api_key: str,
|
||||
voice: str = "aura-helios-en",
|
||||
sample_rate: int = 24000,
|
||||
sample_rate: Optional[int] = None,
|
||||
encoding: str = "linear16",
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(sample_rate=sample_rate, **kwargs)
|
||||
|
||||
self._settings = {
|
||||
"sample_rate": sample_rate,
|
||||
"encoding": encoding,
|
||||
}
|
||||
self.set_voice(voice)
|
||||
@@ -75,7 +74,7 @@ class DeepgramTTSService(TTSService):
|
||||
options = SpeakOptions(
|
||||
model=self._voice_id,
|
||||
encoding=self._settings["encoding"],
|
||||
sample_rate=self._settings["sample_rate"],
|
||||
sample_rate=self.sample_rate,
|
||||
container="none",
|
||||
)
|
||||
|
||||
@@ -103,9 +102,7 @@ class DeepgramTTSService(TTSService):
|
||||
chunk = audio_buffer.read(chunk_size)
|
||||
if not chunk:
|
||||
break
|
||||
frame = TTSAudioRawFrame(
|
||||
audio=chunk, sample_rate=self._settings["sample_rate"], num_channels=1
|
||||
)
|
||||
frame = TTSAudioRawFrame(audio=chunk, sample_rate=self.sample_rate, num_channels=1)
|
||||
yield frame
|
||||
|
||||
yield TTSStoppedFrame()
|
||||
@@ -121,15 +118,16 @@ class DeepgramSTTService(STTService):
|
||||
*,
|
||||
api_key: str,
|
||||
url: str = "",
|
||||
live_options: LiveOptions = None,
|
||||
sample_rate: Optional[int] = None,
|
||||
live_options: Optional[LiveOptions] = None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
super().__init__(sample_rate=sample_rate, **kwargs)
|
||||
|
||||
default_options = LiveOptions(
|
||||
encoding="linear16",
|
||||
language=Language.EN,
|
||||
model="nova-2-general",
|
||||
sample_rate=16000,
|
||||
channels=1,
|
||||
interim_results=True,
|
||||
smart_format=True,
|
||||
@@ -187,6 +185,7 @@ class DeepgramSTTService(STTService):
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
self._settings["sample_rate"] = self.sample_rate
|
||||
await self._connect()
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
|
||||
@@ -104,17 +104,20 @@ def language_to_elevenlabs_language(language: Language) -> str | None:
|
||||
return result
|
||||
|
||||
|
||||
def sample_rate_from_output_format(output_format: str) -> int:
|
||||
match output_format:
|
||||
case "pcm_16000":
|
||||
return 16000
|
||||
case "pcm_22050":
|
||||
return 22050
|
||||
case "pcm_24000":
|
||||
return 24000
|
||||
case "pcm_44100":
|
||||
return 44100
|
||||
return 16000
|
||||
def output_format_from_sample_rate(sample_rate: int) -> str:
|
||||
match sample_rate:
|
||||
case 16000:
|
||||
return "pcm_16000"
|
||||
case 22050:
|
||||
return "pcm_22050"
|
||||
case 24000:
|
||||
return "pcm_24000"
|
||||
case 44100:
|
||||
return "pcm_44100"
|
||||
logger.warning(
|
||||
f"ElevenLabsTTSService: No output format available for {sample_rate} sample rate"
|
||||
)
|
||||
return "pcm_16000"
|
||||
|
||||
|
||||
def calculate_word_times(
|
||||
@@ -165,7 +168,7 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
|
||||
voice_id: str,
|
||||
model: str = "eleven_flash_v2_5",
|
||||
url: str = "wss://api.elevenlabs.io",
|
||||
output_format: ElevenLabsOutputFormat = "pcm_24000",
|
||||
sample_rate: Optional[int] = None,
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
):
|
||||
@@ -189,7 +192,7 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
|
||||
push_text_frames=False,
|
||||
push_stop_frames=True,
|
||||
stop_frame_timeout_s=2.0,
|
||||
sample_rate=sample_rate_from_output_format(output_format),
|
||||
sample_rate=sample_rate,
|
||||
**kwargs,
|
||||
)
|
||||
WebsocketService.__init__(self)
|
||||
@@ -197,11 +200,9 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
|
||||
self._api_key = api_key
|
||||
self._url = url
|
||||
self._settings = {
|
||||
"sample_rate": sample_rate_from_output_format(output_format),
|
||||
"language": self.language_to_service_language(params.language)
|
||||
if params.language
|
||||
else None,
|
||||
"output_format": output_format,
|
||||
"optimize_streaming_latency": params.optimize_streaming_latency,
|
||||
"stability": params.stability,
|
||||
"similarity_boost": params.similarity_boost,
|
||||
@@ -211,6 +212,7 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
|
||||
}
|
||||
self.set_model_name(model)
|
||||
self.set_voice(voice_id)
|
||||
self._output_format = "" # initialized in start()
|
||||
self._voice_settings = self._set_voice_settings()
|
||||
|
||||
# Indicates if we have sent TTSStartedFrame. It will reset to False when
|
||||
@@ -254,7 +256,7 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
|
||||
await self._disconnect()
|
||||
await self._connect()
|
||||
|
||||
async def _update_settings(self, settings: Dict[str, Any]):
|
||||
async def _update_settings(self, settings: Mapping[str, Any]):
|
||||
prev_voice = self._voice_id
|
||||
await super()._update_settings(settings)
|
||||
if not prev_voice == self._voice_id:
|
||||
@@ -264,6 +266,7 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
self._output_format = output_format_from_sample_rate(self.sample_rate)
|
||||
await self._connect()
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
@@ -322,7 +325,7 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
|
||||
|
||||
voice_id = self._voice_id
|
||||
model = self.model_name
|
||||
output_format = self._settings["output_format"]
|
||||
output_format = self._output_format
|
||||
url = f"{self._url}/v1/text-to-speech/{voice_id}/stream-input?model_id={model}&output_format={output_format}&auto_mode={self._settings['auto_mode']}"
|
||||
|
||||
if self._settings["optimize_streaming_latency"]:
|
||||
@@ -375,7 +378,7 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
|
||||
self.start_word_timestamps()
|
||||
|
||||
audio = base64.b64decode(msg["audio"])
|
||||
frame = TTSAudioRawFrame(audio, self._settings["sample_rate"], 1)
|
||||
frame = TTSAudioRawFrame(audio, self.sample_rate, 1)
|
||||
await self.push_frame(frame)
|
||||
if msg.get("alignment"):
|
||||
word_times = calculate_word_times(msg["alignment"], self._cumulative_time)
|
||||
@@ -428,7 +431,7 @@ class ElevenLabsHttpTTSService(TTSService):
|
||||
aiohttp_session: aiohttp ClientSession
|
||||
model: Model ID (default: "eleven_flash_v2_5" for low latency)
|
||||
base_url: API base URL
|
||||
output_format: Audio output format (PCM)
|
||||
sample_rate: Output sample rate
|
||||
params: Additional parameters for voice configuration
|
||||
"""
|
||||
|
||||
@@ -448,24 +451,21 @@ class ElevenLabsHttpTTSService(TTSService):
|
||||
aiohttp_session: aiohttp.ClientSession,
|
||||
model: str = "eleven_flash_v2_5",
|
||||
base_url: str = "https://api.elevenlabs.io",
|
||||
output_format: ElevenLabsOutputFormat = "pcm_24000",
|
||||
sample_rate: Optional[int] = None,
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(sample_rate=sample_rate_from_output_format(output_format), **kwargs)
|
||||
super().__init__(sample_rate=sample_rate, **kwargs)
|
||||
|
||||
self._api_key = api_key
|
||||
self._base_url = base_url
|
||||
self._output_format = output_format
|
||||
self._params = params
|
||||
self._session = aiohttp_session
|
||||
|
||||
self._settings = {
|
||||
"sample_rate": sample_rate_from_output_format(output_format),
|
||||
"language": self.language_to_service_language(params.language)
|
||||
if params.language
|
||||
else None,
|
||||
"output_format": output_format,
|
||||
"optimize_streaming_latency": params.optimize_streaming_latency,
|
||||
"stability": params.stability,
|
||||
"similarity_boost": params.similarity_boost,
|
||||
@@ -474,6 +474,7 @@ class ElevenLabsHttpTTSService(TTSService):
|
||||
}
|
||||
self.set_model_name(model)
|
||||
self.set_voice(voice_id)
|
||||
self._output_format = "" # initialized in start()
|
||||
self._voice_settings = self._set_voice_settings()
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
@@ -508,6 +509,10 @@ class ElevenLabsHttpTTSService(TTSService):
|
||||
|
||||
return voice_settings or None
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
self._output_format = output_format_from_sample_rate(self.sample_rate)
|
||||
|
||||
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
|
||||
"""Generate speech from text using ElevenLabs streaming API.
|
||||
|
||||
@@ -570,7 +575,7 @@ class ElevenLabsHttpTTSService(TTSService):
|
||||
async for chunk in response.content:
|
||||
if chunk:
|
||||
await self.stop_ttfb_metrics()
|
||||
yield TTSAudioRawFrame(chunk, self._settings["sample_rate"], 1)
|
||||
yield TTSAudioRawFrame(chunk, self.sample_rate, 1)
|
||||
|
||||
yield TTSStoppedFrame()
|
||||
|
||||
|
||||
@@ -56,7 +56,7 @@ class FishAudioTTSService(TTSService, WebsocketService):
|
||||
api_key: str,
|
||||
model: str, # This is the reference_id
|
||||
output_format: FishAudioOutputFormat = "pcm",
|
||||
sample_rate: int = 24000,
|
||||
sample_rate: Optional[int] = None,
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
):
|
||||
@@ -70,7 +70,7 @@ class FishAudioTTSService(TTSService, WebsocketService):
|
||||
self._started = False
|
||||
|
||||
self._settings = {
|
||||
"sample_rate": sample_rate,
|
||||
"sample_rate": 0,
|
||||
"latency": params.latency,
|
||||
"format": output_format,
|
||||
"prosody": {
|
||||
@@ -92,6 +92,7 @@ class FishAudioTTSService(TTSService, WebsocketService):
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
self._settings["sample_rate"] = self.sample_rate
|
||||
await self._connect()
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
@@ -157,9 +158,7 @@ class FishAudioTTSService(TTSService, WebsocketService):
|
||||
audio_data = msg.get("audio")
|
||||
# Only process larger chunks to remove msgpack overhead
|
||||
if audio_data and len(audio_data) > 1024:
|
||||
frame = TTSAudioRawFrame(
|
||||
audio_data, self._settings["sample_rate"], 1
|
||||
)
|
||||
frame = TTSAudioRawFrame(audio_data, self.sample_rate, 1)
|
||||
await self.push_frame(frame)
|
||||
await self.stop_ttfb_metrics()
|
||||
continue
|
||||
|
||||
@@ -48,7 +48,7 @@ class AudioInputMessage(BaseModel):
|
||||
realtimeInput: RealtimeInput
|
||||
|
||||
@classmethod
|
||||
def from_raw_audio(cls, raw_audio: bytes, sample_rate=16000) -> "AudioInputMessage":
|
||||
def from_raw_audio(cls, raw_audio: bytes, sample_rate: int) -> "AudioInputMessage":
|
||||
data = base64.b64encode(raw_audio).decode("utf-8")
|
||||
return cls(
|
||||
realtimeInput=RealtimeInput(
|
||||
|
||||
@@ -203,6 +203,8 @@ class GeminiMultimodalLiveLLMService(LLMService):
|
||||
self._bot_audio_buffer = bytearray()
|
||||
self._bot_text_buffer = ""
|
||||
|
||||
self._sample_rate = 24000
|
||||
|
||||
self._settings = {
|
||||
"frequency_penalty": params.frequency_penalty,
|
||||
"max_tokens": params.max_tokens,
|
||||
@@ -521,7 +523,7 @@ class GeminiMultimodalLiveLLMService(LLMService):
|
||||
if self._audio_input_paused:
|
||||
return
|
||||
# Send all audio to Gemini
|
||||
evt = events.AudioInputMessage.from_raw_audio(frame.audio)
|
||||
evt = events.AudioInputMessage.from_raw_audio(frame.audio, frame.sample_rate)
|
||||
await self.send_client_event(evt)
|
||||
# Manage a buffer of audio to use for transcription
|
||||
audio = frame.audio
|
||||
@@ -650,7 +652,7 @@ class GeminiMultimodalLiveLLMService(LLMService):
|
||||
inline_data = part.inlineData
|
||||
if not inline_data:
|
||||
return
|
||||
if inline_data.mimeType != "audio/pcm;rate=24000":
|
||||
if inline_data.mimeType != f"audio/pcm;rate={self._sample_rate}":
|
||||
logger.warning(f"Unrecognized server_content format {inline_data.mimeType}")
|
||||
return
|
||||
|
||||
@@ -665,7 +667,7 @@ class GeminiMultimodalLiveLLMService(LLMService):
|
||||
self._bot_audio_buffer.extend(audio)
|
||||
frame = TTSAudioRawFrame(
|
||||
audio=audio,
|
||||
sample_rate=24000,
|
||||
sample_rate=self._sample_rate,
|
||||
num_channels=1,
|
||||
)
|
||||
await self.push_frame(frame)
|
||||
|
||||
@@ -131,7 +131,6 @@ def language_to_gladia_language(language: Language) -> str | None:
|
||||
|
||||
class GladiaSTTService(STTService):
|
||||
class InputParams(BaseModel):
|
||||
sample_rate: Optional[int] = 16000
|
||||
language: Optional[Language] = Language.EN
|
||||
endpointing: Optional[float] = 0.2
|
||||
maximum_duration_without_endpointing: Optional[int] = 10
|
||||
@@ -144,17 +143,18 @@ class GladiaSTTService(STTService):
|
||||
api_key: str,
|
||||
url: str = "https://api.gladia.io/v2/live",
|
||||
confidence: float = 0.5,
|
||||
sample_rate: Optional[int] = None,
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
super().__init__(sample_rate=sample_rate, **kwargs)
|
||||
|
||||
self._api_key = api_key
|
||||
self._url = url
|
||||
self._settings = {
|
||||
"encoding": "wav/pcm",
|
||||
"bit_depth": 16,
|
||||
"sample_rate": params.sample_rate,
|
||||
"sample_rate": 0,
|
||||
"channels": 1,
|
||||
"language_config": {
|
||||
"languages": [self.language_to_service_language(params.language)]
|
||||
@@ -178,6 +178,7 @@ class GladiaSTTService(STTService):
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
self._settings["sample_rate"] = self.sample_rate
|
||||
response = await self._setup_gladia()
|
||||
self._websocket = await websockets.connect(response["url"])
|
||||
self._receive_task = self.create_task(self._receive_task_handler())
|
||||
|
||||
@@ -883,14 +883,13 @@ class GoogleTTSService(TTSService):
|
||||
credentials: Optional[str] = None,
|
||||
credentials_path: Optional[str] = None,
|
||||
voice_id: str = "en-US-Neural2-A",
|
||||
sample_rate: int = 24000,
|
||||
sample_rate: Optional[int] = None,
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(sample_rate=sample_rate, **kwargs)
|
||||
|
||||
self._settings = {
|
||||
"sample_rate": sample_rate,
|
||||
"pitch": params.pitch,
|
||||
"rate": params.rate,
|
||||
"volume": params.volume,
|
||||
@@ -996,7 +995,7 @@ class GoogleTTSService(TTSService):
|
||||
)
|
||||
audio_config = texttospeech_v1.AudioConfig(
|
||||
audio_encoding=texttospeech_v1.AudioEncoding.LINEAR16,
|
||||
sample_rate_hertz=self._settings["sample_rate"],
|
||||
sample_rate_hertz=self.sample_rate,
|
||||
)
|
||||
|
||||
request = texttospeech_v1.SynthesizeSpeechRequest(
|
||||
@@ -1019,7 +1018,7 @@ class GoogleTTSService(TTSService):
|
||||
if not chunk:
|
||||
break
|
||||
await self.stop_ttfb_metrics()
|
||||
frame = TTSAudioRawFrame(chunk, self._settings["sample_rate"], 1)
|
||||
frame = TTSAudioRawFrame(chunk, self.sample_rate, 1)
|
||||
yield frame
|
||||
await asyncio.sleep(0) # Allow other tasks to run
|
||||
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
#
|
||||
|
||||
import json
|
||||
from typing import AsyncGenerator
|
||||
from typing import AsyncGenerator, Optional
|
||||
|
||||
from loguru import logger
|
||||
|
||||
@@ -66,7 +66,7 @@ class LmntTTSService(TTSService, WebsocketService):
|
||||
*,
|
||||
api_key: str,
|
||||
voice_id: str,
|
||||
sample_rate: int = 24000,
|
||||
sample_rate: Optional[int] = None,
|
||||
language: Language = Language.EN,
|
||||
**kwargs,
|
||||
):
|
||||
@@ -81,7 +81,6 @@ class LmntTTSService(TTSService, WebsocketService):
|
||||
self._api_key = api_key
|
||||
self._voice_id = voice_id
|
||||
self._settings = {
|
||||
"sample_rate": sample_rate,
|
||||
"language": self.language_to_service_language(language),
|
||||
"format": "raw", # Use raw format for direct PCM data
|
||||
}
|
||||
@@ -132,7 +131,7 @@ class LmntTTSService(TTSService, WebsocketService):
|
||||
"X-API-Key": self._api_key,
|
||||
"voice": self._voice_id,
|
||||
"format": self._settings["format"],
|
||||
"sample_rate": self._settings["sample_rate"],
|
||||
"sample_rate": self.sample_rate,
|
||||
"language": self._settings["language"],
|
||||
}
|
||||
|
||||
@@ -175,7 +174,7 @@ class LmntTTSService(TTSService, WebsocketService):
|
||||
await self.stop_ttfb_metrics()
|
||||
frame = TTSAudioRawFrame(
|
||||
audio=message,
|
||||
sample_rate=self._settings["sample_rate"],
|
||||
sample_rate=self.sample_rate,
|
||||
num_channels=1,
|
||||
)
|
||||
await self.push_frame(frame)
|
||||
|
||||
@@ -28,6 +28,7 @@ from pipecat.frames.frames import (
|
||||
LLMTextFrame,
|
||||
LLMUpdateSettingsFrame,
|
||||
OpenAILLMContextAssistantTimestampFrame,
|
||||
StartFrame,
|
||||
StartInterruptionFrame,
|
||||
TTSAudioRawFrame,
|
||||
TTSStartedFrame,
|
||||
@@ -412,20 +413,24 @@ class OpenAITTSService(TTSService):
|
||||
The service returns PCM-encoded audio at the specified sample rate.
|
||||
"""
|
||||
|
||||
OPENAI_SAMPLE_RATE = 24000 # OpenAI TTS always outputs at 24kHz
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
api_key: str | None = None,
|
||||
api_key: Optional[str] = None,
|
||||
voice: str = "alloy",
|
||||
model: Literal["tts-1", "tts-1-hd"] = "tts-1",
|
||||
sample_rate: int = 24000,
|
||||
sample_rate: Optional[int] = None,
|
||||
**kwargs,
|
||||
):
|
||||
if sample_rate and sample_rate != self.OPENAI_SAMPLE_RATE:
|
||||
logger.warning(
|
||||
f"OpenAI TTS only supports {self.OPENAI_SAMPLE_RATE}Hz sample rate. "
|
||||
f"Current rate of {self.sample_rate}Hz may cause issues."
|
||||
)
|
||||
super().__init__(sample_rate=sample_rate, **kwargs)
|
||||
|
||||
self._settings = {
|
||||
"sample_rate": sample_rate,
|
||||
}
|
||||
self.set_model_name(model)
|
||||
self.set_voice(voice)
|
||||
|
||||
@@ -438,6 +443,14 @@ class OpenAITTSService(TTSService):
|
||||
logger.info(f"Switching TTS model to: [{model}]")
|
||||
self.set_model_name(model)
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
if self.sample_rate != self.OPENAI_SAMPLE_RATE:
|
||||
logger.warning(
|
||||
f"OpenAI TTS requires {self.OPENAI_SAMPLE_RATE}Hz sample rate. "
|
||||
f"Current rate of {self.sample_rate}Hz may cause issues."
|
||||
)
|
||||
|
||||
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
|
||||
logger.debug(f"Generating TTS: [{text}]")
|
||||
try:
|
||||
@@ -465,7 +478,7 @@ class OpenAITTSService(TTSService):
|
||||
async for chunk in r.iter_bytes(8192):
|
||||
if len(chunk) > 0:
|
||||
await self.stop_ttfb_metrics()
|
||||
frame = TTSAudioRawFrame(chunk, self._settings["sample_rate"], 1)
|
||||
frame = TTSAudioRawFrame(chunk, self.sample_rate, 1)
|
||||
yield frame
|
||||
yield TTSStoppedFrame()
|
||||
except BadRequestError as e:
|
||||
|
||||
@@ -113,7 +113,7 @@ class PlayHTTTSService(TTSService, WebsocketService):
|
||||
user_id: str,
|
||||
voice_url: str,
|
||||
voice_engine: str = "Play3.0-mini",
|
||||
sample_rate: int = 24000,
|
||||
sample_rate: Optional[int] = None,
|
||||
output_format: str = "wav",
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
@@ -132,7 +132,6 @@ class PlayHTTTSService(TTSService, WebsocketService):
|
||||
self._request_id = None
|
||||
|
||||
self._settings = {
|
||||
"sample_rate": sample_rate,
|
||||
"language": self.language_to_service_language(params.language)
|
||||
if params.language
|
||||
else "english",
|
||||
@@ -250,7 +249,7 @@ class PlayHTTTSService(TTSService, WebsocketService):
|
||||
if message.startswith(b"RIFF"):
|
||||
continue
|
||||
await self.stop_ttfb_metrics()
|
||||
frame = TTSAudioRawFrame(message, self._settings["sample_rate"], 1)
|
||||
frame = TTSAudioRawFrame(message, self.sample_rate, 1)
|
||||
await self.push_frame(frame)
|
||||
else:
|
||||
logger.debug(f"Received text message: {message}")
|
||||
@@ -301,7 +300,7 @@ class PlayHTTTSService(TTSService, WebsocketService):
|
||||
"voice": self._voice_id,
|
||||
"voice_engine": self._settings["voice_engine"],
|
||||
"output_format": self._settings["output_format"],
|
||||
"sample_rate": self._settings["sample_rate"],
|
||||
"sample_rate": self.sample_rate,
|
||||
"language": self._settings["language"],
|
||||
"speed": self._settings["speed"],
|
||||
"seed": self._settings["seed"],
|
||||
@@ -339,7 +338,7 @@ class PlayHTHttpTTSService(TTSService):
|
||||
user_id: str,
|
||||
voice_url: str,
|
||||
voice_engine: str = "Play3.0-mini-http", # Options: Play3.0-mini-http, Play3.0-mini-ws
|
||||
sample_rate: int = 24000,
|
||||
sample_rate: Optional[int] = None,
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
):
|
||||
@@ -353,7 +352,6 @@ class PlayHTHttpTTSService(TTSService):
|
||||
api_key=self._api_key,
|
||||
)
|
||||
self._settings = {
|
||||
"sample_rate": sample_rate,
|
||||
"language": self.language_to_service_language(params.language)
|
||||
if params.language
|
||||
else "english",
|
||||
@@ -365,6 +363,11 @@ class PlayHTHttpTTSService(TTSService):
|
||||
self.set_model_name(voice_engine)
|
||||
self.set_voice(voice_url)
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
self._settings["sample_rate"] = self.sample_rate
|
||||
|
||||
def _create_options(self) -> TTSOptions:
|
||||
language_str = self._settings["language"]
|
||||
playht_language = None
|
||||
if language_str:
|
||||
@@ -374,10 +377,10 @@ class PlayHTHttpTTSService(TTSService):
|
||||
playht_language = lang
|
||||
break
|
||||
|
||||
self._options = TTSOptions(
|
||||
return TTSOptions(
|
||||
voice=self._voice_id,
|
||||
language=playht_language,
|
||||
sample_rate=self._settings["sample_rate"],
|
||||
sample_rate=self.sample_rate,
|
||||
format=self._settings["format"],
|
||||
speed=self._settings["speed"],
|
||||
seed=self._settings["seed"],
|
||||
@@ -393,13 +396,14 @@ class PlayHTHttpTTSService(TTSService):
|
||||
logger.debug(f"Generating TTS: [{text}]")
|
||||
|
||||
try:
|
||||
options = self._create_options()
|
||||
b = bytearray()
|
||||
in_header = True
|
||||
|
||||
await self.start_ttfb_metrics()
|
||||
|
||||
playht_gen = self._client.tts(
|
||||
text, voice_engine=self._settings["voice_engine"], options=self._options
|
||||
text, voice_engine=self._settings["voice_engine"], options=options
|
||||
)
|
||||
|
||||
await self.start_tts_usage_metrics(text)
|
||||
@@ -422,7 +426,7 @@ class PlayHTHttpTTSService(TTSService):
|
||||
else:
|
||||
if len(chunk):
|
||||
await self.stop_ttfb_metrics()
|
||||
frame = TTSAudioRawFrame(chunk, self._settings["sample_rate"], 1)
|
||||
frame = TTSAudioRawFrame(chunk, self.sample_rate, 1)
|
||||
yield frame
|
||||
yield TTSStoppedFrame()
|
||||
except Exception as e:
|
||||
|
||||
@@ -34,7 +34,7 @@ class RimeHttpTTSService(TTSService):
|
||||
api_key: str,
|
||||
voice_id: str = "eva",
|
||||
model: str = "mist",
|
||||
sample_rate: int = 24000,
|
||||
sample_rate: Optional[int] = None,
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
):
|
||||
@@ -43,7 +43,6 @@ class RimeHttpTTSService(TTSService):
|
||||
self._api_key = api_key
|
||||
self._base_url = "https://users.rime.ai/v1/rime-tts"
|
||||
self._settings = {
|
||||
"samplingRate": sample_rate,
|
||||
"speedAlpha": params.speed_alpha,
|
||||
"reduceLatency": params.reduce_latency,
|
||||
"pauseBetweenBrackets": params.pause_between_brackets,
|
||||
@@ -71,6 +70,7 @@ class RimeHttpTTSService(TTSService):
|
||||
payload["text"] = text
|
||||
payload["speaker"] = self._voice_id
|
||||
payload["modelId"] = self._model_name
|
||||
payload["samplingRate"] = self.sample_rate
|
||||
|
||||
try:
|
||||
await self.start_ttfb_metrics()
|
||||
@@ -96,7 +96,7 @@ class RimeHttpTTSService(TTSService):
|
||||
first_chunk = False
|
||||
|
||||
if chunk:
|
||||
frame = TTSAudioRawFrame(chunk, self._settings["samplingRate"], 1)
|
||||
frame = TTSAudioRawFrame(chunk, self.sample_rate, 1)
|
||||
yield frame
|
||||
|
||||
yield TTSStoppedFrame()
|
||||
|
||||
@@ -49,7 +49,7 @@ class FastPitchTTSService(TTSService):
|
||||
api_key: str,
|
||||
server: str = "grpc.nvcf.nvidia.com:443",
|
||||
voice_id: str = "English-US.Female-1",
|
||||
sample_rate: int = 24000,
|
||||
sample_rate: Optional[int] = None,
|
||||
function_id: str = "0149dedb-2be8-4195-b9a0-e57e0e14f972",
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
@@ -57,7 +57,6 @@ class FastPitchTTSService(TTSService):
|
||||
super().__init__(sample_rate=sample_rate, **kwargs)
|
||||
self._api_key = api_key
|
||||
self._voice_id = voice_id
|
||||
self._sample_rate = sample_rate
|
||||
self._language_code = params.language
|
||||
self._quality = params.quality
|
||||
|
||||
@@ -87,7 +86,7 @@ class FastPitchTTSService(TTSService):
|
||||
text,
|
||||
self._voice_id,
|
||||
self._language_code,
|
||||
sample_rate_hz=self._sample_rate,
|
||||
sample_rate_hz=self.sample_rate,
|
||||
audio_prompt_file=None,
|
||||
quality=self._quality,
|
||||
custom_dictionary={},
|
||||
@@ -114,7 +113,7 @@ class FastPitchTTSService(TTSService):
|
||||
await self.stop_ttfb_metrics()
|
||||
frame = TTSAudioRawFrame(
|
||||
audio=resp.audio,
|
||||
sample_rate=self._sample_rate,
|
||||
sample_rate=self.sample_rate,
|
||||
num_channels=1,
|
||||
)
|
||||
yield frame
|
||||
@@ -136,10 +135,11 @@ class ParakeetSTTService(STTService):
|
||||
api_key: str,
|
||||
server: str = "grpc.nvcf.nvidia.com:443",
|
||||
function_id: str = "1598d209-5e27-4d3c-8079-4751568b1081",
|
||||
sample_rate: Optional[int] = None,
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
super().__init__(sample_rate=sample_rate, **kwargs)
|
||||
self._api_key = api_key
|
||||
self._profanity_filter = False
|
||||
self._automatic_punctuation = False
|
||||
@@ -154,7 +154,6 @@ class ParakeetSTTService(STTService):
|
||||
self._stop_history_eou = -1
|
||||
self._stop_threshold_eou = -1.0
|
||||
self._custom_configuration = ""
|
||||
self._sample_rate: int = 16000
|
||||
|
||||
self.set_model_name("parakeet-ctc-1.1b-asr")
|
||||
|
||||
@@ -166,6 +165,14 @@ class ParakeetSTTService(STTService):
|
||||
|
||||
self._asr_service = riva.client.ASRService(auth)
|
||||
|
||||
self._queue = asyncio.Queue()
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
return False
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
|
||||
config = riva.client.StreamingRecognitionConfig(
|
||||
config=riva.client.RecognitionConfig(
|
||||
encoding=riva.client.AudioEncoding.LINEAR_PCM,
|
||||
@@ -175,14 +182,16 @@ class ParakeetSTTService(STTService):
|
||||
profanity_filter=self._profanity_filter,
|
||||
enable_automatic_punctuation=self._automatic_punctuation,
|
||||
verbatim_transcripts=not self._no_verbatim_transcripts,
|
||||
sample_rate_hertz=self._sample_rate,
|
||||
sample_rate_hertz=self.sample_rate,
|
||||
audio_channel_count=1,
|
||||
),
|
||||
interim_results=True,
|
||||
)
|
||||
|
||||
riva.client.add_word_boosting_to_config(
|
||||
config, self._boosted_lm_words, self._boosted_lm_score
|
||||
)
|
||||
|
||||
riva.client.add_endpoint_parameters_to_config(
|
||||
config,
|
||||
self._start_history,
|
||||
@@ -193,15 +202,9 @@ class ParakeetSTTService(STTService):
|
||||
self._stop_threshold_eou,
|
||||
)
|
||||
riva.client.add_custom_configuration_to_config(config, self._custom_configuration)
|
||||
|
||||
self._config = config
|
||||
|
||||
self._queue = asyncio.Queue()
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
return False
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
self._thread_task = self.create_task(self._thread_task_handler())
|
||||
self._response_task = self.create_task(self._response_task_handler())
|
||||
self._response_queue = asyncio.Queue()
|
||||
|
||||
@@ -12,7 +12,7 @@ import base64
|
||||
import aiohttp
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.utils import resample_audio
|
||||
from pipecat.audio.utils import create_default_resampler
|
||||
from pipecat.frames.frames import (
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
@@ -47,6 +47,8 @@ class TavusVideoService(AIService):
|
||||
|
||||
self._conversation_id: str
|
||||
|
||||
self._resampler = create_default_resampler()
|
||||
|
||||
async def initialize(self) -> str:
|
||||
url = "https://tavusapi.com/v2/conversations"
|
||||
headers = {"Content-Type": "application/json", "x-api-key": self._api_key}
|
||||
@@ -89,12 +91,10 @@ class TavusVideoService(AIService):
|
||||
async with self._session.post(url, headers=headers) as r:
|
||||
r.raise_for_status()
|
||||
|
||||
async def _encode_audio_and_send(
|
||||
self, audio: bytes, original_sample_rate: int, done: bool
|
||||
) -> None:
|
||||
async def _encode_audio_and_send(self, audio: bytes, in_rate: int, done: bool) -> None:
|
||||
"""Encodes audio to base64 and sends it to Tavus"""
|
||||
if not done:
|
||||
audio = resample_audio(audio, original_sample_rate, 16000)
|
||||
audio = await self._resampler.resample(audio, in_rate, 16000)
|
||||
audio_base64 = base64.b64encode(audio).decode("utf-8")
|
||||
logger.trace(f"{self}: sending {len(audio)} bytes")
|
||||
await self._send_audio_message(audio_base64, done=done)
|
||||
|
||||
@@ -4,12 +4,12 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
from typing import Any, AsyncGenerator, Dict
|
||||
from typing import Any, AsyncGenerator, Dict, Optional
|
||||
|
||||
import aiohttp
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.utils import resample_audio
|
||||
from pipecat.audio.utils import create_default_resampler
|
||||
from pipecat.frames.frames import (
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
@@ -76,7 +76,7 @@ class XTTSService(TTSService):
|
||||
base_url: str,
|
||||
aiohttp_session: aiohttp.ClientSession,
|
||||
language: Language = Language.EN,
|
||||
sample_rate: int = 24000,
|
||||
sample_rate: Optional[int] = None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(sample_rate=sample_rate, **kwargs)
|
||||
@@ -89,6 +89,8 @@ class XTTSService(TTSService):
|
||||
self._studio_speakers: Dict[str, Any] | None = None
|
||||
self._aiohttp_session = aiohttp_session
|
||||
|
||||
self._resampler = create_default_resampler()
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
return True
|
||||
|
||||
@@ -161,17 +163,19 @@ class XTTSService(TTSService):
|
||||
buffer = buffer[48000:]
|
||||
|
||||
# XTTS uses 24000 so we need to resample to our desired rate.
|
||||
resampled_audio = resample_audio(
|
||||
bytes(process_data), 24000, self._sample_rate
|
||||
resampled_audio = await self._resampler.resample(
|
||||
bytes(process_data), 24000, self.sample_rate
|
||||
)
|
||||
# Create the frame with the resampled audio
|
||||
frame = TTSAudioRawFrame(resampled_audio, self._sample_rate, 1)
|
||||
frame = TTSAudioRawFrame(resampled_audio, self.sample_rate, 1)
|
||||
yield frame
|
||||
|
||||
# Process any remaining data in the buffer.
|
||||
if len(buffer) > 0:
|
||||
resampled_audio = resample_audio(bytes(buffer), 24000, self._sample_rate)
|
||||
frame = TTSAudioRawFrame(resampled_audio, self._sample_rate, 1)
|
||||
resampled_audio = await self._resampler.resample(
|
||||
bytes(buffer), 24000, self.sample_rate
|
||||
)
|
||||
frame = TTSAudioRawFrame(resampled_audio, self.sample_rate, 1)
|
||||
yield frame
|
||||
|
||||
yield TTSStoppedFrame()
|
||||
|
||||
@@ -5,24 +5,25 @@
|
||||
#
|
||||
|
||||
import asyncio
|
||||
from dataclasses import dataclass
|
||||
from typing import Awaitable, Callable, Sequence, Tuple
|
||||
import sys
|
||||
from typing import Any, Awaitable, Callable, Dict, Sequence, Tuple
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.clocks.system_clock import SystemClock
|
||||
from pipecat.frames.frames import (
|
||||
ControlFrame,
|
||||
EndFrame,
|
||||
Frame,
|
||||
HeartbeatFrame,
|
||||
StartFrame,
|
||||
)
|
||||
from pipecat.observers.base_observer import BaseObserver
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.utils.asyncio import TaskManager
|
||||
|
||||
|
||||
@dataclass
|
||||
class EndTestFrame(ControlFrame):
|
||||
pass
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="TRACE")
|
||||
|
||||
|
||||
class HeartbeatsObserver(BaseObserver):
|
||||
@@ -48,54 +49,58 @@ class HeartbeatsObserver(BaseObserver):
|
||||
|
||||
|
||||
class QueuedFrameProcessor(FrameProcessor):
|
||||
def __init__(self, queue: asyncio.Queue, ignore_start: bool = True):
|
||||
def __init__(
|
||||
self, queue: asyncio.Queue, queue_direction: FrameDirection, ignore_start: bool = True
|
||||
):
|
||||
super().__init__()
|
||||
self._queue = queue
|
||||
self._queue_direction = queue_direction
|
||||
self._ignore_start = ignore_start
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if self._ignore_start and isinstance(frame, StartFrame):
|
||||
await self.push_frame(frame, direction)
|
||||
else:
|
||||
await self._queue.put(frame)
|
||||
await self.push_frame(frame, direction)
|
||||
if direction == self._queue_direction:
|
||||
if not isinstance(frame, StartFrame) or not self._ignore_start:
|
||||
await self._queue.put(frame)
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
async def run_test(
|
||||
processor: FrameProcessor,
|
||||
*,
|
||||
frames_to_send: Sequence[Frame],
|
||||
expected_down_frames: Sequence[type],
|
||||
expected_up_frames: Sequence[type] = [],
|
||||
ignore_start: bool = True,
|
||||
start_metadata: Dict[str, Any] = {},
|
||||
send_end_frame: bool = True,
|
||||
) -> Tuple[Sequence[Frame], Sequence[Frame]]:
|
||||
received_up = asyncio.Queue()
|
||||
received_down = asyncio.Queue()
|
||||
source = QueuedFrameProcessor(received_up)
|
||||
sink = QueuedFrameProcessor(received_down)
|
||||
source = QueuedFrameProcessor(received_up, FrameDirection.UPSTREAM, ignore_start)
|
||||
sink = QueuedFrameProcessor(received_down, FrameDirection.DOWNSTREAM, ignore_start)
|
||||
|
||||
source.link(processor)
|
||||
processor.link(sink)
|
||||
pipeline = Pipeline([source, processor, sink])
|
||||
|
||||
task_manager = TaskManager()
|
||||
task_manager.set_event_loop(asyncio.get_event_loop())
|
||||
await source.queue_frame(StartFrame(clock=SystemClock(), task_manager=task_manager))
|
||||
task = PipelineTask(pipeline, params=PipelineParams(start_metadata=start_metadata))
|
||||
|
||||
for frame in frames_to_send:
|
||||
await processor.process_frame(frame, FrameDirection.DOWNSTREAM)
|
||||
await task.queue_frame(frame)
|
||||
|
||||
await processor.queue_frame(EndTestFrame())
|
||||
await processor.queue_frame(EndTestFrame(), FrameDirection.UPSTREAM)
|
||||
if send_end_frame:
|
||||
await task.queue_frame(EndFrame())
|
||||
|
||||
runner = PipelineRunner()
|
||||
await runner.run(task)
|
||||
|
||||
#
|
||||
# Down frames
|
||||
#
|
||||
received_down_frames: Sequence[Frame] = []
|
||||
running = True
|
||||
while running:
|
||||
while not received_down.empty():
|
||||
frame = await received_down.get()
|
||||
running = not isinstance(frame, EndTestFrame)
|
||||
if running:
|
||||
if not isinstance(frame, EndFrame) or not send_end_frame:
|
||||
received_down_frames.append(frame)
|
||||
|
||||
print("received DOWN frames =", received_down_frames)
|
||||
@@ -109,12 +114,9 @@ async def run_test(
|
||||
# Up frames
|
||||
#
|
||||
received_up_frames: Sequence[Frame] = []
|
||||
running = True
|
||||
while running:
|
||||
while not received_up.empty():
|
||||
frame = await received_up.get()
|
||||
running = not isinstance(frame, EndTestFrame)
|
||||
if running:
|
||||
received_up_frames.append(frame)
|
||||
received_up_frames.append(frame)
|
||||
|
||||
print("received UP frames =", received_up_frames)
|
||||
|
||||
|
||||
@@ -35,6 +35,9 @@ class BaseInputTransport(FrameProcessor):
|
||||
|
||||
self._params = params
|
||||
|
||||
# Input sample rate. It will be initialized on StartFrame.
|
||||
self._sample_rate = 0
|
||||
|
||||
# We read audio from a single queue one at a time and we then run VAD in
|
||||
# a thread. Therefore, only one thread should be necessary.
|
||||
self._executor = ThreadPoolExecutor(max_workers=1)
|
||||
@@ -43,10 +46,23 @@ class BaseInputTransport(FrameProcessor):
|
||||
# if passthrough is enabled.
|
||||
self._audio_task = None
|
||||
|
||||
@property
|
||||
def sample_rate(self) -> int:
|
||||
return self._sample_rate
|
||||
|
||||
@property
|
||||
def vad_analyzer(self) -> VADAnalyzer | None:
|
||||
return self._params.vad_analyzer
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
self._sample_rate = self._params.audio_in_sample_rate or frame.audio_in_sample_rate
|
||||
|
||||
# Configure VAD analyzer.
|
||||
if self._params.vad_enabled and self._params.vad_analyzer:
|
||||
self._params.vad_analyzer.set_sample_rate(self._sample_rate)
|
||||
# Start audio filter.
|
||||
if self._params.audio_in_filter:
|
||||
await self._params.audio_in_filter.start(self._params.audio_in_sample_rate)
|
||||
await self._params.audio_in_filter.start(self._sample_rate)
|
||||
# Create audio input queue and task if needed.
|
||||
if self._params.audio_in_enabled or self._params.vad_enabled:
|
||||
self._audio_in_queue = asyncio.Queue()
|
||||
@@ -67,9 +83,6 @@ class BaseInputTransport(FrameProcessor):
|
||||
await self.cancel_task(self._audio_task)
|
||||
self._audio_task = None
|
||||
|
||||
def vad_analyzer(self) -> VADAnalyzer | None:
|
||||
return self._params.vad_analyzer
|
||||
|
||||
async def push_audio_frame(self, frame: InputAudioRawFrame):
|
||||
if self._params.audio_in_enabled or self._params.vad_enabled:
|
||||
await self._audio_in_queue.put(frame)
|
||||
@@ -104,9 +117,8 @@ class BaseInputTransport(FrameProcessor):
|
||||
await self.push_frame(frame, direction)
|
||||
await self.stop(frame)
|
||||
elif isinstance(frame, VADParamsUpdateFrame):
|
||||
vad_analyzer = self.vad_analyzer()
|
||||
if vad_analyzer:
|
||||
vad_analyzer.set_params(frame.params)
|
||||
if self.vad_analyzer:
|
||||
self.vad_analyzer.set_params(frame.params)
|
||||
elif isinstance(frame, FilterUpdateSettingsFrame) and self._params.audio_in_filter:
|
||||
await self._params.audio_in_filter.process_frame(frame)
|
||||
# Other frames
|
||||
@@ -140,11 +152,10 @@ class BaseInputTransport(FrameProcessor):
|
||||
|
||||
async def _vad_analyze(self, audio_frame: InputAudioRawFrame) -> VADState:
|
||||
state = VADState.QUIET
|
||||
vad_analyzer = self.vad_analyzer()
|
||||
if vad_analyzer:
|
||||
if self.vad_analyzer:
|
||||
logger.trace(f"{self}: analyzing VAD on {audio_frame}")
|
||||
state = await self.get_event_loop().run_in_executor(
|
||||
self._executor, vad_analyzer.analyze_audio, audio_frame.audio
|
||||
self._executor, self.vad_analyzer.analyze_audio, audio_frame.audio
|
||||
)
|
||||
logger.trace(f"{self}: done analyzing VAD on {audio_frame}")
|
||||
return state
|
||||
|
||||
@@ -57,12 +57,11 @@ class BaseOutputTransport(FrameProcessor):
|
||||
# framerate.
|
||||
self._camera_images = None
|
||||
|
||||
# We will write 20ms audio at a time. If we receive long audio frames we
|
||||
# will chunk them. This will help with interruption handling.
|
||||
audio_bytes_10ms = (
|
||||
int(self._params.audio_out_sample_rate / 100) * self._params.audio_out_channels * 2
|
||||
)
|
||||
self._audio_chunk_size = audio_bytes_10ms * 2
|
||||
# Output sample rate. It will be initialized on StartFrame.
|
||||
self._sample_rate = 0
|
||||
|
||||
# Chunk size that will be written. It will be computed on StartFrame
|
||||
self._audio_chunk_size = 0
|
||||
self._audio_buffer = bytearray()
|
||||
|
||||
self._stopped_event = asyncio.Event()
|
||||
@@ -70,10 +69,21 @@ class BaseOutputTransport(FrameProcessor):
|
||||
# Indicates if the bot is currently speaking.
|
||||
self._bot_speaking = False
|
||||
|
||||
@property
|
||||
def sample_rate(self) -> int:
|
||||
return self._sample_rate
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
self._sample_rate = self._params.audio_out_sample_rate or frame.audio_out_sample_rate
|
||||
|
||||
# We will write 20ms audio at a time. If we receive long audio frames we
|
||||
# will chunk them. This will help with interruption handling.
|
||||
audio_bytes_10ms = int(self._sample_rate / 100) * self._params.audio_out_channels * 2
|
||||
self._audio_chunk_size = audio_bytes_10ms * 2
|
||||
|
||||
# Start audio mixer.
|
||||
if self._params.audio_out_mixer:
|
||||
await self._params.audio_out_mixer.start(self._params.audio_out_sample_rate)
|
||||
await self._params.audio_out_mixer.start(self._sample_rate)
|
||||
self._create_camera_task()
|
||||
self._create_sink_tasks()
|
||||
|
||||
@@ -298,7 +308,7 @@ class BaseOutputTransport(FrameProcessor):
|
||||
# Generate an audio frame with only the mixer's part.
|
||||
frame = OutputAudioRawFrame(
|
||||
audio=await self._params.audio_out_mixer.mix(silence),
|
||||
sample_rate=self._params.audio_out_sample_rate,
|
||||
sample_rate=self._sample_rate,
|
||||
num_channels=self._params.audio_out_channels,
|
||||
)
|
||||
yield frame
|
||||
|
||||
@@ -31,12 +31,12 @@ class TransportParams(BaseModel):
|
||||
camera_out_color_format: str = "RGB"
|
||||
audio_out_enabled: bool = False
|
||||
audio_out_is_live: bool = False
|
||||
audio_out_sample_rate: int = 24000
|
||||
audio_out_sample_rate: Optional[int] = None
|
||||
audio_out_channels: int = 1
|
||||
audio_out_bitrate: int = 96000
|
||||
audio_out_mixer: Optional[BaseAudioMixer] = None
|
||||
audio_in_enabled: bool = False
|
||||
audio_in_sample_rate: int = 16000
|
||||
audio_in_sample_rate: Optional[int] = None
|
||||
audio_in_channels: int = 1
|
||||
audio_in_filter: Optional[BaseAudioFilter] = None
|
||||
vad_enabled: bool = False
|
||||
|
||||
@@ -28,35 +28,40 @@ except ModuleNotFoundError as e:
|
||||
class LocalAudioInputTransport(BaseInputTransport):
|
||||
def __init__(self, py_audio: pyaudio.PyAudio, params: TransportParams):
|
||||
super().__init__(params)
|
||||
self._py_audio = py_audio
|
||||
self._in_stream = None
|
||||
self._sample_rate = 0
|
||||
|
||||
sample_rate = self._params.audio_in_sample_rate
|
||||
num_frames = int(sample_rate / 100) * 2 # 20ms of audio
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
|
||||
self._in_stream = py_audio.open(
|
||||
format=py_audio.get_format_from_width(2),
|
||||
channels=params.audio_in_channels,
|
||||
rate=params.audio_in_sample_rate,
|
||||
self._sample_rate = self._params.audio_in_sample_rate or frame.audio_in_sample_rate
|
||||
num_frames = int(self._sample_rate / 100) * 2 # 20ms of audio
|
||||
|
||||
self._in_stream = self._py_audio.open(
|
||||
format=self._py_audio.get_format_from_width(2),
|
||||
channels=self._params.audio_in_channels,
|
||||
rate=self._sample_rate,
|
||||
frames_per_buffer=num_frames,
|
||||
stream_callback=self._audio_in_callback,
|
||||
input=True,
|
||||
)
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
self._in_stream.start_stream()
|
||||
|
||||
async def cleanup(self):
|
||||
await super().cleanup()
|
||||
self._in_stream.stop_stream()
|
||||
# This is not very pretty (taken from PyAudio docs).
|
||||
while self._in_stream.is_active():
|
||||
await asyncio.sleep(0.1)
|
||||
self._in_stream.close()
|
||||
if self._in_stream:
|
||||
self._in_stream.stop_stream()
|
||||
# This is not very pretty (taken from PyAudio docs).
|
||||
while self._in_stream.is_active():
|
||||
await asyncio.sleep(0.1)
|
||||
self._in_stream.close()
|
||||
self._in_stream = None
|
||||
|
||||
def _audio_in_callback(self, in_data, frame_count, time_info, status):
|
||||
frame = InputAudioRawFrame(
|
||||
audio=in_data,
|
||||
sample_rate=self._params.audio_in_sample_rate,
|
||||
sample_rate=self._sample_rate,
|
||||
num_channels=self._params.audio_in_channels,
|
||||
)
|
||||
|
||||
@@ -68,32 +73,41 @@ class LocalAudioInputTransport(BaseInputTransport):
|
||||
class LocalAudioOutputTransport(BaseOutputTransport):
|
||||
def __init__(self, py_audio: pyaudio.PyAudio, params: TransportParams):
|
||||
super().__init__(params)
|
||||
self._py_audio = py_audio
|
||||
self._out_stream = None
|
||||
self._sample_rate = 0
|
||||
|
||||
# We only write audio frames from a single task, so only one thread
|
||||
# should be necessary.
|
||||
self._executor = ThreadPoolExecutor(max_workers=1)
|
||||
|
||||
self._out_stream = py_audio.open(
|
||||
format=py_audio.get_format_from_width(2),
|
||||
channels=params.audio_out_channels,
|
||||
rate=params.audio_out_sample_rate,
|
||||
output=True,
|
||||
)
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
|
||||
self._sample_rate = self._params.audio_out_sample_rate or frame.audio_out_sample_rate
|
||||
|
||||
self._out_stream = self._py_audio.open(
|
||||
format=self._py_audio.get_format_from_width(2),
|
||||
channels=self._params.audio_out_channels,
|
||||
rate=self._sample_rate,
|
||||
output=True,
|
||||
)
|
||||
self._out_stream.start_stream()
|
||||
|
||||
async def cleanup(self):
|
||||
await super().cleanup()
|
||||
self._out_stream.stop_stream()
|
||||
# This is not very pretty (taken from PyAudio docs).
|
||||
while self._out_stream.is_active():
|
||||
await asyncio.sleep(0.1)
|
||||
self._out_stream.close()
|
||||
if self._out_stream:
|
||||
self._out_stream.stop_stream()
|
||||
# This is not very pretty (taken from PyAudio docs).
|
||||
while self._out_stream.is_active():
|
||||
await asyncio.sleep(0.1)
|
||||
self._out_stream.close()
|
||||
|
||||
async def write_raw_audio_frames(self, frames: bytes):
|
||||
await self.get_event_loop().run_in_executor(self._executor, self._out_stream.write, frames)
|
||||
if self._out_stream:
|
||||
await self.get_event_loop().run_in_executor(
|
||||
self._executor, self._out_stream.write, frames
|
||||
)
|
||||
|
||||
|
||||
class LocalAudioTransport(BaseTransport):
|
||||
|
||||
@@ -36,35 +36,39 @@ except ModuleNotFoundError as e:
|
||||
class TkInputTransport(BaseInputTransport):
|
||||
def __init__(self, py_audio: pyaudio.PyAudio, params: TransportParams):
|
||||
super().__init__(params)
|
||||
self._py_audio = py_audio
|
||||
self._in_stream = None
|
||||
self._sample_rate = 0
|
||||
|
||||
sample_rate = self._params.audio_in_sample_rate
|
||||
num_frames = int(sample_rate / 100) * 2 # 20ms of audio
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
|
||||
self._in_stream = py_audio.open(
|
||||
format=py_audio.get_format_from_width(2),
|
||||
channels=params.audio_in_channels,
|
||||
rate=params.audio_in_sample_rate,
|
||||
self._sample_rate = self._params.audio_in_sample_rate or frame.audio_in_sample_rate
|
||||
num_frames = int(self._sample_rate / 100) * 2 # 20ms of audio
|
||||
|
||||
self._in_stream = self._py_audio.open(
|
||||
format=self._py_audio.get_format_from_width(2),
|
||||
channels=self._params.audio_in_channels,
|
||||
rate=self._sample_rate,
|
||||
frames_per_buffer=num_frames,
|
||||
stream_callback=self._audio_in_callback,
|
||||
input=True,
|
||||
)
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
self._in_stream.start_stream()
|
||||
|
||||
async def cleanup(self):
|
||||
await super().cleanup()
|
||||
self._in_stream.stop_stream()
|
||||
# This is not very pretty (taken from PyAudio docs).
|
||||
while self._in_stream.is_active():
|
||||
await asyncio.sleep(0.1)
|
||||
self._in_stream.close()
|
||||
if self._in_stream:
|
||||
self._in_stream.stop_stream()
|
||||
# This is not very pretty (taken from PyAudio docs).
|
||||
while self._in_stream.is_active():
|
||||
await asyncio.sleep(0.1)
|
||||
self._in_stream.close()
|
||||
|
||||
def _audio_in_callback(self, in_data, frame_count, time_info, status):
|
||||
frame = InputAudioRawFrame(
|
||||
audio=in_data,
|
||||
sample_rate=self._params.audio_in_sample_rate,
|
||||
sample_rate=self._sample_rate,
|
||||
num_channels=self._params.audio_in_channels,
|
||||
)
|
||||
|
||||
@@ -76,18 +80,14 @@ class TkInputTransport(BaseInputTransport):
|
||||
class TkOutputTransport(BaseOutputTransport):
|
||||
def __init__(self, tk_root: tk.Tk, py_audio: pyaudio.PyAudio, params: TransportParams):
|
||||
super().__init__(params)
|
||||
self._py_audio = py_audio
|
||||
self._out_stream = None
|
||||
self._sample_rate = 0
|
||||
|
||||
# We only write audio frames from a single task, so only one thread
|
||||
# should be necessary.
|
||||
self._executor = ThreadPoolExecutor(max_workers=1)
|
||||
|
||||
self._out_stream = py_audio.open(
|
||||
format=py_audio.get_format_from_width(2),
|
||||
channels=params.audio_out_channels,
|
||||
rate=params.audio_out_sample_rate,
|
||||
output=True,
|
||||
)
|
||||
|
||||
# Start with a neutral gray background.
|
||||
array = np.ones((1024, 1024, 3)) * 128
|
||||
data = f"P5 {1024} {1024} 255 ".encode() + array.astype(np.uint8).tobytes()
|
||||
@@ -97,18 +97,31 @@ class TkOutputTransport(BaseOutputTransport):
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
|
||||
self._sample_rate = self._params.audio_out_sample_rate or frame.audio_out_sample_rate
|
||||
|
||||
self._out_stream = self._py_audio.open(
|
||||
format=self._py_audio.get_format_from_width(2),
|
||||
channels=self._params.audio_out_channels,
|
||||
rate=self._sample_rate,
|
||||
output=True,
|
||||
)
|
||||
self._out_stream.start_stream()
|
||||
|
||||
async def cleanup(self):
|
||||
await super().cleanup()
|
||||
self._out_stream.stop_stream()
|
||||
# This is not very pretty (taken from PyAudio docs).
|
||||
while self._out_stream.is_active():
|
||||
await asyncio.sleep(0.1)
|
||||
self._out_stream.close()
|
||||
if self._out_stream:
|
||||
self._out_stream.stop_stream()
|
||||
# This is not very pretty (taken from PyAudio docs).
|
||||
while self._out_stream.is_active():
|
||||
await asyncio.sleep(0.1)
|
||||
self._out_stream.close()
|
||||
|
||||
async def write_raw_audio_frames(self, frames: bytes):
|
||||
await self.get_event_loop().run_in_executor(self._executor, self._out_stream.write, frames)
|
||||
if self._out_stream:
|
||||
await self.get_event_loop().run_in_executor(
|
||||
self._executor, self._out_stream.write, frames
|
||||
)
|
||||
|
||||
async def write_frame_to_camera(self, frame: OutputImageRawFrame):
|
||||
self.get_event_loop().call_soon(self._write_frame_to_tk, frame)
|
||||
|
||||
@@ -69,6 +69,7 @@ class FastAPIWebsocketInputTransport(BaseInputTransport):
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
await self._params.serializer.setup(frame)
|
||||
if self._params.session_timeout:
|
||||
self._monitor_websocket_task = self.create_task(self._monitor_websocket())
|
||||
await self._callbacks.on_client_connected(self._websocket)
|
||||
@@ -91,7 +92,7 @@ class FastAPIWebsocketInputTransport(BaseInputTransport):
|
||||
async def _receive_messages(self):
|
||||
try:
|
||||
async for message in self._iter_data():
|
||||
frame = self._params.serializer.deserialize(message)
|
||||
frame = await self._params.serializer.deserialize(message)
|
||||
|
||||
if not frame:
|
||||
continue
|
||||
@@ -101,7 +102,7 @@ class FastAPIWebsocketInputTransport(BaseInputTransport):
|
||||
else:
|
||||
await self.push_frame(frame)
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception receiving data (class: {e.__class__.__name__})")
|
||||
logger.error(f"{self} exception receiving data: {e.__class__.__name__} ({e})")
|
||||
|
||||
await self._callbacks.on_client_disconnected(self._websocket)
|
||||
|
||||
@@ -118,9 +119,19 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport):
|
||||
self._websocket = websocket
|
||||
self._params = params
|
||||
|
||||
self._send_interval = (self._audio_chunk_size / self._params.audio_out_sample_rate) / 2
|
||||
# write_raw_audio_frames() is called quickly, as soon as we get audio
|
||||
# (e.g. from the TTS), and since this is just a network connection we
|
||||
# would be sending it to quickly. Instead, we want to block to emulate
|
||||
# an audio device, this is what the send interval is. It will be
|
||||
# computed on StartFrame.
|
||||
self._send_interval = 0
|
||||
self._next_send_time = 0
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
await self._params.serializer.setup(frame)
|
||||
self._send_interval = (self._audio_chunk_size / self.sample_rate) / 2
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
@@ -136,7 +147,7 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport):
|
||||
|
||||
frame = OutputAudioRawFrame(
|
||||
audio=frames,
|
||||
sample_rate=self._params.audio_out_sample_rate,
|
||||
sample_rate=self.sample_rate,
|
||||
num_channels=self._params.audio_out_channels,
|
||||
)
|
||||
|
||||
@@ -163,11 +174,11 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport):
|
||||
|
||||
async def _write_frame(self, frame: Frame):
|
||||
try:
|
||||
payload = self._params.serializer.serialize(frame)
|
||||
payload = await self._params.serializer.serialize(frame)
|
||||
if payload and self._websocket.client_state == WebSocketState.CONNECTED:
|
||||
await self._send_data(payload)
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception sending data (class: {e.__class__.__name__})")
|
||||
logger.error(f"{self} exception sending data: {e.__class__.__name__} ({e})")
|
||||
|
||||
def _send_data(self, data: str | bytes):
|
||||
if self._params.serializer.type == FrameSerializerType.BINARY:
|
||||
|
||||
@@ -101,7 +101,7 @@ class WebsocketClientSession:
|
||||
if self._websocket:
|
||||
await self._websocket.send(message)
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception sending data (class: {e.__class__.__name__})")
|
||||
logger.error(f"{self} exception sending data: {e.__class__.__name__} ({e})")
|
||||
|
||||
async def _client_task_handler(self):
|
||||
try:
|
||||
@@ -109,7 +109,7 @@ class WebsocketClientSession:
|
||||
async for message in self._websocket:
|
||||
await self._callbacks.on_message(self._websocket, message)
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception receiving data (class: {e.__class__.__name__})")
|
||||
logger.error(f"{self} exception receiving data: {e.__class__.__name__} ({e})")
|
||||
|
||||
await self._callbacks.on_disconnected(self._websocket)
|
||||
|
||||
@@ -126,6 +126,7 @@ class WebsocketClientInputTransport(BaseInputTransport):
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
await self._params.serializer.setup(frame)
|
||||
await self._session.setup(frame)
|
||||
await self._session.connect()
|
||||
|
||||
@@ -138,7 +139,7 @@ class WebsocketClientInputTransport(BaseInputTransport):
|
||||
await self._session.disconnect()
|
||||
|
||||
async def on_message(self, websocket, message):
|
||||
frame = self._params.serializer.deserialize(message)
|
||||
frame = await self._params.serializer.deserialize(message)
|
||||
if not frame:
|
||||
return
|
||||
if isinstance(frame, InputAudioRawFrame) and self._params.audio_in_enabled:
|
||||
@@ -154,11 +155,18 @@ class WebsocketClientOutputTransport(BaseOutputTransport):
|
||||
self._session = session
|
||||
self._params = params
|
||||
|
||||
self._send_interval = (self._audio_chunk_size / self._params.audio_out_sample_rate) / 2
|
||||
# write_raw_audio_frames() is called quickly, as soon as we get audio
|
||||
# (e.g. from the TTS), and since this is just a network connection we
|
||||
# would be sending it to quickly. Instead, we want to block to emulate
|
||||
# an audio device, this is what the send interval is. It will be
|
||||
# computed on StartFrame.
|
||||
self._send_interval = 0
|
||||
self._next_send_time = 0
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
self._send_interval = (self._audio_chunk_size / self.sample_rate) / 2
|
||||
await self._params.serializer.setup(frame)
|
||||
await self._session.setup(frame)
|
||||
await self._session.connect()
|
||||
|
||||
@@ -176,7 +184,7 @@ class WebsocketClientOutputTransport(BaseOutputTransport):
|
||||
async def write_raw_audio_frames(self, frames: bytes):
|
||||
frame = OutputAudioRawFrame(
|
||||
audio=frames,
|
||||
sample_rate=self._params.audio_out_sample_rate,
|
||||
sample_rate=self.sample_rate,
|
||||
num_channels=self._params.audio_out_channels,
|
||||
)
|
||||
|
||||
@@ -200,7 +208,7 @@ class WebsocketClientOutputTransport(BaseOutputTransport):
|
||||
await self._write_audio_sleep()
|
||||
|
||||
async def _write_frame(self, frame: Frame):
|
||||
payload = self._params.serializer.serialize(frame)
|
||||
payload = await self._params.serializer.serialize(frame)
|
||||
if payload:
|
||||
await self._session.send(payload)
|
||||
|
||||
|
||||
@@ -24,7 +24,6 @@ from pipecat.frames.frames import (
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.serializers.base_serializer import FrameSerializer
|
||||
from pipecat.serializers.protobuf import ProtobufFrameSerializer
|
||||
from pipecat.transports.base_input import BaseInputTransport
|
||||
from pipecat.transports.base_output import BaseOutputTransport
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
@@ -39,7 +38,7 @@ except ModuleNotFoundError as e:
|
||||
|
||||
class WebsocketServerParams(TransportParams):
|
||||
add_wav_header: bool = False
|
||||
serializer: FrameSerializer = ProtobufFrameSerializer()
|
||||
serializer: FrameSerializer
|
||||
session_timeout: int | None = None
|
||||
|
||||
|
||||
@@ -67,20 +66,32 @@ class WebsocketServerInputTransport(BaseInputTransport):
|
||||
|
||||
self._websocket: websockets.WebSocketServerProtocol | None = None
|
||||
|
||||
self._server_task = None
|
||||
|
||||
# This task will monitor the websocket connection periodically.
|
||||
self._monitor_task = None
|
||||
|
||||
self._stop_server_event = asyncio.Event()
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
await self._params.serializer.setup(frame)
|
||||
self._server_task = self.create_task(self._server_task_handler())
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
await super().stop(frame)
|
||||
self._stop_server_event.set()
|
||||
await self.wait_for_task(self._server_task)
|
||||
if self._monitor_task:
|
||||
await self.cancel_task(self._monitor_task)
|
||||
if self._server_task:
|
||||
await self.wait_for_task(self._server_task)
|
||||
|
||||
async def cancel(self, frame: CancelFrame):
|
||||
await super().cancel(frame)
|
||||
await self.cancel_task(self._server_task)
|
||||
if self._monitor_task:
|
||||
await self.cancel_task(self._monitor_task)
|
||||
if self._server_task:
|
||||
await self.cancel_task(self._server_task)
|
||||
|
||||
async def _server_task_handler(self):
|
||||
logger.info(f"Starting websocket server on {self._host}:{self._port}")
|
||||
@@ -100,12 +111,14 @@ class WebsocketServerInputTransport(BaseInputTransport):
|
||||
|
||||
# Create a task to monitor the websocket connection
|
||||
if self._params.session_timeout:
|
||||
self.create_task(self._monitor_websocket(websocket))
|
||||
self._monitor_task = self.create_task(
|
||||
self._monitor_websocket(websocket, self._params.session_timeout)
|
||||
)
|
||||
|
||||
# Handle incoming messages
|
||||
try:
|
||||
async for message in websocket:
|
||||
frame = self._params.serializer.deserialize(message)
|
||||
frame = await self._params.serializer.deserialize(message)
|
||||
|
||||
if not frame:
|
||||
continue
|
||||
@@ -115,7 +128,7 @@ class WebsocketServerInputTransport(BaseInputTransport):
|
||||
else:
|
||||
await self.push_frame(frame)
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception receiving data (class: {e.__class__.__name__})")
|
||||
logger.error(f"{self} exception receiving data: {e.__class__.__name__} ({e})")
|
||||
|
||||
# Notify disconnection
|
||||
await self._callbacks.on_client_disconnected(websocket)
|
||||
@@ -125,10 +138,13 @@ class WebsocketServerInputTransport(BaseInputTransport):
|
||||
|
||||
logger.info(f"Client {websocket.remote_address} disconnected")
|
||||
|
||||
async def _monitor_websocket(self, websocket: websockets.WebSocketServerProtocol):
|
||||
"""Wait for self._params.session_timeout seconds, if the websocket is still open, trigger timeout event."""
|
||||
async def _monitor_websocket(
|
||||
self, websocket: websockets.WebSocketServerProtocol, session_timeout: int
|
||||
):
|
||||
"""Wait for session_timeout seconds, if the websocket is still open,
|
||||
trigger timeout event."""
|
||||
try:
|
||||
await asyncio.sleep(self._params.session_timeout)
|
||||
await asyncio.sleep(session_timeout)
|
||||
if not websocket.closed:
|
||||
await self._callbacks.on_session_timeout(websocket)
|
||||
except asyncio.CancelledError:
|
||||
@@ -144,7 +160,12 @@ class WebsocketServerOutputTransport(BaseOutputTransport):
|
||||
|
||||
self._websocket: websockets.WebSocketServerProtocol | None = None
|
||||
|
||||
self._send_interval = (self._audio_chunk_size / self._params.audio_out_sample_rate) / 2
|
||||
# write_raw_audio_frames() is called quickly, as soon as we get audio
|
||||
# (e.g. from the TTS), and since this is just a network connection we
|
||||
# would be sending it to quickly. Instead, we want to block to emulate
|
||||
# an audio device, this is what the send interval is. It will be
|
||||
# computed on StartFrame.
|
||||
self._send_interval = 0
|
||||
self._next_send_time = 0
|
||||
|
||||
async def set_client_connection(self, websocket: websockets.WebSocketServerProtocol | None):
|
||||
@@ -153,6 +174,11 @@ class WebsocketServerOutputTransport(BaseOutputTransport):
|
||||
logger.warning("Only one client allowed, using new connection")
|
||||
self._websocket = websocket
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
await self._params.serializer.setup(frame)
|
||||
self._send_interval = (self._audio_chunk_size / self.sample_rate) / 2
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
@@ -168,7 +194,7 @@ class WebsocketServerOutputTransport(BaseOutputTransport):
|
||||
|
||||
frame = OutputAudioRawFrame(
|
||||
audio=frames,
|
||||
sample_rate=self._params.audio_out_sample_rate,
|
||||
sample_rate=self.sample_rate,
|
||||
num_channels=self._params.audio_out_channels,
|
||||
)
|
||||
|
||||
@@ -193,11 +219,11 @@ class WebsocketServerOutputTransport(BaseOutputTransport):
|
||||
|
||||
async def _write_frame(self, frame: Frame):
|
||||
try:
|
||||
payload = self._params.serializer.serialize(frame)
|
||||
payload = await self._params.serializer.serialize(frame)
|
||||
if payload and self._websocket:
|
||||
await self._websocket.send(payload)
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception sending data (class: {e.__class__.__name__})")
|
||||
logger.error(f"{self} exception sending data: {e.__class__.__name__} ({e})")
|
||||
|
||||
async def _write_audio_sleep(self):
|
||||
# Simulate a clock.
|
||||
@@ -213,14 +239,13 @@ class WebsocketServerOutputTransport(BaseOutputTransport):
|
||||
class WebsocketServerTransport(BaseTransport):
|
||||
def __init__(
|
||||
self,
|
||||
params: WebsocketServerParams,
|
||||
host: str = "localhost",
|
||||
port: int = 8765,
|
||||
params: WebsocketServerParams = WebsocketServerParams(),
|
||||
input_name: str | None = None,
|
||||
output_name: str | None = None,
|
||||
loop: asyncio.AbstractEventLoop | None = None,
|
||||
):
|
||||
super().__init__(input_name=input_name, output_name=output_name, loop=loop)
|
||||
super().__init__(input_name=input_name, output_name=output_name)
|
||||
self._host = host
|
||||
self._port = port
|
||||
self._params = params
|
||||
|
||||
@@ -71,11 +71,11 @@ class DailyTransportMessageUrgentFrame(TransportMessageUrgentFrame):
|
||||
|
||||
|
||||
class WebRTCVADAnalyzer(VADAnalyzer):
|
||||
def __init__(self, *, sample_rate=16000, num_channels=1, params: VADParams = VADParams()):
|
||||
super().__init__(sample_rate=sample_rate, num_channels=num_channels, params=params)
|
||||
def __init__(self, *, sample_rate: Optional[int] = None, params: VADParams = VADParams()):
|
||||
super().__init__(sample_rate=sample_rate, params=params)
|
||||
|
||||
self._webrtc_vad = Daily.create_native_vad(
|
||||
reset_period_ms=VAD_RESET_PERIOD_MS, sample_rate=sample_rate, channels=num_channels
|
||||
reset_period_ms=VAD_RESET_PERIOD_MS, sample_rate=self.sample_rate, channels=1
|
||||
)
|
||||
logger.debug("Loaded native WebRTC VAD")
|
||||
|
||||
@@ -222,33 +222,13 @@ class DailyTransportClient(EventHandler):
|
||||
self._callback_queue = asyncio.Queue()
|
||||
self._callback_task = None
|
||||
|
||||
# Input and ouput sample rates. They will be initialize on setup().
|
||||
self._in_sample_rate = 0
|
||||
self._out_sample_rate = 0
|
||||
|
||||
self._camera: VirtualCameraDevice | None = None
|
||||
if self._params.camera_out_enabled:
|
||||
self._camera = Daily.create_camera_device(
|
||||
self._camera_name(),
|
||||
width=self._params.camera_out_width,
|
||||
height=self._params.camera_out_height,
|
||||
color_format=self._params.camera_out_color_format,
|
||||
)
|
||||
|
||||
self._mic: VirtualMicrophoneDevice | None = None
|
||||
if self._params.audio_out_enabled:
|
||||
self._mic = Daily.create_microphone_device(
|
||||
self._mic_name(),
|
||||
sample_rate=self._params.audio_out_sample_rate,
|
||||
channels=self._params.audio_out_channels,
|
||||
non_blocking=True,
|
||||
)
|
||||
|
||||
self._speaker: VirtualSpeakerDevice | None = None
|
||||
if self._params.audio_in_enabled or self._params.vad_enabled:
|
||||
self._speaker = Daily.create_speaker_device(
|
||||
self._speaker_name(),
|
||||
sample_rate=self._params.audio_in_sample_rate,
|
||||
channels=self._params.audio_in_channels,
|
||||
non_blocking=True,
|
||||
)
|
||||
Daily.select_speaker_device(self._speaker_name())
|
||||
|
||||
def _camera_name(self):
|
||||
return f"camera-{self}"
|
||||
@@ -281,7 +261,7 @@ class DailyTransportClient(EventHandler):
|
||||
if not self._speaker:
|
||||
return None
|
||||
|
||||
sample_rate = self._params.audio_in_sample_rate
|
||||
sample_rate = self._in_sample_rate
|
||||
num_channels = self._params.audio_in_channels
|
||||
num_frames = int(sample_rate / 100) * 2 # 20ms of audio
|
||||
|
||||
@@ -315,6 +295,34 @@ class DailyTransportClient(EventHandler):
|
||||
self._camera.write_frame(frame.image)
|
||||
|
||||
async def setup(self, frame: StartFrame):
|
||||
self._in_sample_rate = self._params.audio_in_sample_rate or frame.audio_in_sample_rate
|
||||
self._out_sample_rate = self._params.audio_out_sample_rate or frame.audio_out_sample_rate
|
||||
|
||||
if self._params.camera_out_enabled and not self._camera:
|
||||
self._camera = Daily.create_camera_device(
|
||||
self._camera_name(),
|
||||
width=self._params.camera_out_width,
|
||||
height=self._params.camera_out_height,
|
||||
color_format=self._params.camera_out_color_format,
|
||||
)
|
||||
|
||||
if self._params.audio_out_enabled and not self._mic:
|
||||
self._mic = Daily.create_microphone_device(
|
||||
self._mic_name(),
|
||||
sample_rate=self._out_sample_rate,
|
||||
channels=self._params.audio_out_channels,
|
||||
non_blocking=True,
|
||||
)
|
||||
|
||||
if (self._params.audio_in_enabled or self._params.vad_enabled) and not self._speaker:
|
||||
self._speaker = Daily.create_speaker_device(
|
||||
self._speaker_name(),
|
||||
sample_rate=self._in_sample_rate,
|
||||
channels=self._params.audio_in_channels,
|
||||
non_blocking=True,
|
||||
)
|
||||
Daily.select_speaker_device(self._speaker_name())
|
||||
|
||||
if not self._task_manager:
|
||||
self._task_manager = frame.task_manager
|
||||
self._callback_task = self._task_manager.create_task(
|
||||
@@ -707,6 +715,7 @@ class DailyInputTransport(BaseInputTransport):
|
||||
super().__init__(params, **kwargs)
|
||||
|
||||
self._client = client
|
||||
self._params = params
|
||||
|
||||
self._video_renderers = {}
|
||||
|
||||
@@ -715,11 +724,10 @@ class DailyInputTransport(BaseInputTransport):
|
||||
self._audio_in_task = None
|
||||
|
||||
self._vad_analyzer: VADAnalyzer | None = params.vad_analyzer
|
||||
if params.vad_enabled and not params.vad_analyzer:
|
||||
self._vad_analyzer = WebRTCVADAnalyzer(
|
||||
sample_rate=self._params.audio_in_sample_rate,
|
||||
num_channels=self._params.audio_in_channels,
|
||||
)
|
||||
|
||||
@property
|
||||
def vad_analyzer(self) -> VADAnalyzer | None:
|
||||
return self._vad_analyzer
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
# Parent start.
|
||||
@@ -728,6 +736,9 @@ class DailyInputTransport(BaseInputTransport):
|
||||
await self._client.setup(frame)
|
||||
# Join the room.
|
||||
await self._client.join()
|
||||
# Inialize WebRTC VAD if needed.
|
||||
if self._params.vad_enabled and not self._params.vad_analyzer:
|
||||
self._vad_analyzer = WebRTCVADAnalyzer(sample_rate=self.sample_rate)
|
||||
# Create audio task. It reads audio frames from Daily and push them
|
||||
# internally for VAD processing.
|
||||
if self._params.audio_in_enabled or self._params.vad_enabled:
|
||||
@@ -757,9 +768,6 @@ class DailyInputTransport(BaseInputTransport):
|
||||
await super().cleanup()
|
||||
await self._client.cleanup()
|
||||
|
||||
def vad_analyzer(self) -> VADAnalyzer | None:
|
||||
return self._vad_analyzer
|
||||
|
||||
#
|
||||
# FrameProcessor
|
||||
#
|
||||
|
||||
@@ -11,7 +11,7 @@ from typing import Any, Awaitable, Callable, List, Optional
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel
|
||||
|
||||
from pipecat.audio.utils import resample_audio
|
||||
from pipecat.audio.utils import create_default_resampler
|
||||
from pipecat.audio.vad.vad_analyzer import VADAnalyzer
|
||||
from pipecat.frames.frames import (
|
||||
AudioRawFrame,
|
||||
@@ -101,6 +101,7 @@ class LiveKitTransportClient:
|
||||
return self._room
|
||||
|
||||
async def setup(self, frame: StartFrame):
|
||||
self._out_sample_rate = self._params.audio_out_sample_rate or frame.audio_out_sample_rate
|
||||
if not self._task_manager:
|
||||
self._task_manager = frame.task_manager
|
||||
self._room = rtc.Room(loop=self._task_manager.get_event_loop())
|
||||
@@ -138,7 +139,7 @@ class LiveKitTransportClient:
|
||||
|
||||
# Set up audio source and track
|
||||
self._audio_source = rtc.AudioSource(
|
||||
self._params.audio_out_sample_rate, self._params.audio_out_channels
|
||||
self._out_sample_rate, self._params.audio_out_channels
|
||||
)
|
||||
self._audio_track = rtc.LocalAudioTrack.create_audio_track(
|
||||
"pipecat-audio", self._audio_source
|
||||
@@ -349,6 +350,11 @@ class LiveKitInputTransport(BaseInputTransport):
|
||||
self._client = client
|
||||
self._audio_in_task = None
|
||||
self._vad_analyzer: VADAnalyzer | None = params.vad_analyzer
|
||||
self._resampler = create_default_resampler()
|
||||
|
||||
@property
|
||||
def vad_analyzer(self) -> VADAnalyzer | None:
|
||||
return self._vad_analyzer
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
@@ -371,9 +377,6 @@ class LiveKitInputTransport(BaseInputTransport):
|
||||
if self._audio_in_task and (self._params.audio_in_enabled or self._params.vad_enabled):
|
||||
await self.cancel_task(self._audio_in_task)
|
||||
|
||||
def vad_analyzer(self) -> VADAnalyzer | None:
|
||||
return self._vad_analyzer
|
||||
|
||||
async def push_app_message(self, message: Any, sender: str):
|
||||
frame = LiveKitTransportMessageUrgentFrame(message=message, participant_id=sender)
|
||||
await self.push_frame(frame)
|
||||
@@ -384,7 +387,9 @@ class LiveKitInputTransport(BaseInputTransport):
|
||||
audio_data = await self._client.get_next_audio_frame()
|
||||
if audio_data:
|
||||
audio_frame_event, participant_id = audio_data
|
||||
pipecat_audio_frame = self._convert_livekit_audio_to_pipecat(audio_frame_event)
|
||||
pipecat_audio_frame = await self._convert_livekit_audio_to_pipecat(
|
||||
audio_frame_event
|
||||
)
|
||||
input_audio_frame = InputAudioRawFrame(
|
||||
audio=pipecat_audio_frame.audio,
|
||||
sample_rate=pipecat_audio_frame.sample_rate,
|
||||
@@ -392,18 +397,18 @@ class LiveKitInputTransport(BaseInputTransport):
|
||||
)
|
||||
await self.push_audio_frame(input_audio_frame)
|
||||
|
||||
def _convert_livekit_audio_to_pipecat(
|
||||
async def _convert_livekit_audio_to_pipecat(
|
||||
self, audio_frame_event: rtc.AudioFrameEvent
|
||||
) -> AudioRawFrame:
|
||||
audio_frame = audio_frame_event.frame
|
||||
|
||||
audio_data = resample_audio(
|
||||
audio_frame.data.tobytes(), audio_frame.sample_rate, self._params.audio_in_sample_rate
|
||||
audio_data = await self._resampler.resample(
|
||||
audio_frame.data.tobytes(), audio_frame.sample_rate, self.sample_rate
|
||||
)
|
||||
|
||||
return AudioRawFrame(
|
||||
audio=audio_data,
|
||||
sample_rate=self._params.audio_in_sample_rate,
|
||||
sample_rate=self.sample_rate,
|
||||
num_channels=audio_frame.num_channels,
|
||||
)
|
||||
|
||||
@@ -445,7 +450,7 @@ class LiveKitOutputTransport(BaseOutputTransport):
|
||||
|
||||
return rtc.AudioFrame(
|
||||
data=pipecat_audio,
|
||||
sample_rate=self._params.audio_out_sample_rate,
|
||||
sample_rate=self.sample_rate,
|
||||
num_channels=self._params.audio_out_channels,
|
||||
samples_per_channel=samples_per_channel,
|
||||
)
|
||||
|
||||
@@ -31,7 +31,11 @@ class TestSentenceAggregator(unittest.IsolatedAsyncioTestCase):
|
||||
|
||||
expected_returned_frames = [TextFrame, TextFrame, TextFrame]
|
||||
|
||||
(received_down, _) = await run_test(aggregator, frames_to_send, expected_returned_frames)
|
||||
(received_down, _) = await run_test(
|
||||
aggregator,
|
||||
frames_to_send=frames_to_send,
|
||||
expected_down_frames=expected_returned_frames,
|
||||
)
|
||||
assert received_down[-3].text == "Hello, world. "
|
||||
assert received_down[-2].text == "How are you? "
|
||||
assert received_down[-1].text == "I am fine! "
|
||||
@@ -66,5 +70,7 @@ class TestGatedAggregator(unittest.IsolatedAsyncioTestCase):
|
||||
]
|
||||
|
||||
(received_down, _) = await run_test(
|
||||
gated_aggregator, frames_to_send, expected_returned_frames
|
||||
gated_aggregator,
|
||||
frames_to_send=frames_to_send,
|
||||
expected_down_frames=expected_returned_frames,
|
||||
)
|
||||
|
||||
@@ -4,7 +4,6 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import unittest
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
@@ -19,7 +18,7 @@ from pipecat.processors.filters.frame_filter import FrameFilter
|
||||
from pipecat.processors.filters.function_filter import FunctionFilter
|
||||
from pipecat.processors.filters.identity_filter import IdentityFilter
|
||||
from pipecat.processors.filters.wake_check_filter import WakeCheckFilter
|
||||
from pipecat.tests.utils import EndTestFrame, run_test
|
||||
from pipecat.tests.utils import run_test
|
||||
|
||||
|
||||
class TestIdentifyFilter(unittest.IsolatedAsyncioTestCase):
|
||||
@@ -27,27 +26,44 @@ class TestIdentifyFilter(unittest.IsolatedAsyncioTestCase):
|
||||
filter = IdentityFilter()
|
||||
frames_to_send = [UserStartedSpeakingFrame(), UserStoppedSpeakingFrame()]
|
||||
expected_returned_frames = [UserStartedSpeakingFrame, UserStoppedSpeakingFrame]
|
||||
await run_test(filter, frames_to_send, expected_returned_frames)
|
||||
await run_test(
|
||||
filter,
|
||||
frames_to_send=frames_to_send,
|
||||
expected_down_frames=expected_returned_frames,
|
||||
)
|
||||
|
||||
|
||||
class TestFrameFilter(unittest.IsolatedAsyncioTestCase):
|
||||
async def test_text_frame(self):
|
||||
filter = FrameFilter(types=(TextFrame, EndTestFrame))
|
||||
filter = FrameFilter(types=(TextFrame,))
|
||||
frames_to_send = [TextFrame(text="Hello Pipecat!")]
|
||||
expected_returned_frames = [TextFrame]
|
||||
await run_test(filter, frames_to_send, expected_returned_frames)
|
||||
await run_test(
|
||||
filter,
|
||||
frames_to_send=frames_to_send,
|
||||
expected_down_frames=expected_returned_frames,
|
||||
)
|
||||
|
||||
async def test_end_frame(self):
|
||||
filter = FrameFilter(types=(EndFrame, EndTestFrame))
|
||||
filter = FrameFilter(types=(EndFrame,))
|
||||
frames_to_send = [EndFrame()]
|
||||
expected_returned_frames = [EndFrame]
|
||||
await run_test(filter, frames_to_send, expected_returned_frames)
|
||||
await run_test(
|
||||
filter,
|
||||
frames_to_send=frames_to_send,
|
||||
expected_down_frames=expected_returned_frames,
|
||||
send_end_frame=False,
|
||||
)
|
||||
|
||||
async def test_system_frame(self):
|
||||
filter = FrameFilter(types=(EndTestFrame,))
|
||||
filter = FrameFilter(types=())
|
||||
frames_to_send = [UserStartedSpeakingFrame()]
|
||||
expected_returned_frames = [UserStartedSpeakingFrame]
|
||||
await run_test(filter, frames_to_send, expected_returned_frames)
|
||||
await run_test(
|
||||
filter,
|
||||
frames_to_send=frames_to_send,
|
||||
expected_down_frames=expected_returned_frames,
|
||||
)
|
||||
|
||||
|
||||
class TestFunctionFilter(unittest.IsolatedAsyncioTestCase):
|
||||
@@ -58,7 +74,11 @@ class TestFunctionFilter(unittest.IsolatedAsyncioTestCase):
|
||||
filter = FunctionFilter(filter=passthrough)
|
||||
frames_to_send = [TextFrame(text="Hello Pipecat!")]
|
||||
expected_returned_frames = [TextFrame]
|
||||
await run_test(filter, frames_to_send, expected_returned_frames)
|
||||
await run_test(
|
||||
filter,
|
||||
frames_to_send=frames_to_send,
|
||||
expected_down_frames=expected_returned_frames,
|
||||
)
|
||||
|
||||
async def test_no_passthrough(self):
|
||||
async def no_passthrough(frame: Frame):
|
||||
@@ -66,14 +86,12 @@ class TestFunctionFilter(unittest.IsolatedAsyncioTestCase):
|
||||
|
||||
filter = FunctionFilter(filter=no_passthrough)
|
||||
frames_to_send = [TextFrame(text="Hello Pipecat!")]
|
||||
expected_returned_frames = [TextFrame]
|
||||
try:
|
||||
await asyncio.wait_for(
|
||||
run_test(filter, frames_to_send, expected_returned_frames), timeout=0.5
|
||||
)
|
||||
assert False
|
||||
except asyncio.TimeoutError:
|
||||
pass
|
||||
expected_returned_frames = []
|
||||
await run_test(
|
||||
filter,
|
||||
frames_to_send=frames_to_send,
|
||||
expected_down_frames=expected_returned_frames,
|
||||
)
|
||||
|
||||
|
||||
class TestWakeCheckFilter(unittest.IsolatedAsyncioTestCase):
|
||||
@@ -81,7 +99,11 @@ class TestWakeCheckFilter(unittest.IsolatedAsyncioTestCase):
|
||||
filter = WakeCheckFilter(wake_phrases=["Hey, Pipecat"])
|
||||
frames_to_send = [TranscriptionFrame(user_id="test", text="Phrase 1", timestamp="")]
|
||||
expected_returned_frames = []
|
||||
await run_test(filter, frames_to_send, expected_returned_frames)
|
||||
await run_test(
|
||||
filter,
|
||||
frames_to_send=frames_to_send,
|
||||
expected_down_frames=expected_returned_frames,
|
||||
)
|
||||
|
||||
async def test_wake_word(self):
|
||||
filter = WakeCheckFilter(wake_phrases=["Hey, Pipecat"])
|
||||
@@ -90,5 +112,9 @@ class TestWakeCheckFilter(unittest.IsolatedAsyncioTestCase):
|
||||
TranscriptionFrame(user_id="test", text="Phrase 1", timestamp=""),
|
||||
]
|
||||
expected_returned_frames = [TranscriptionFrame, TranscriptionFrame]
|
||||
(received_down, _) = await run_test(filter, frames_to_send, expected_returned_frames)
|
||||
(received_down, _) = await run_test(
|
||||
filter,
|
||||
frames_to_send=frames_to_send,
|
||||
expected_down_frames=expected_returned_frames,
|
||||
)
|
||||
assert received_down[-1].text == "Phrase 1"
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
import asyncio
|
||||
import unittest
|
||||
|
||||
from pipecat.frames.frames import EndFrame, HeartbeatFrame, TextFrame
|
||||
from pipecat.frames.frames import EndFrame, HeartbeatFrame, StartFrame, TextFrame
|
||||
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
@@ -22,7 +22,11 @@ class TestPipeline(unittest.IsolatedAsyncioTestCase):
|
||||
|
||||
frames_to_send = [TextFrame(text="Hello from Pipecat!")]
|
||||
expected_returned_frames = [TextFrame]
|
||||
await run_test(pipeline, frames_to_send, expected_returned_frames)
|
||||
await run_test(
|
||||
pipeline,
|
||||
frames_to_send=frames_to_send,
|
||||
expected_down_frames=expected_returned_frames,
|
||||
)
|
||||
|
||||
async def test_pipeline_multiple(self):
|
||||
identity1 = IdentityFilter()
|
||||
@@ -33,7 +37,25 @@ class TestPipeline(unittest.IsolatedAsyncioTestCase):
|
||||
|
||||
frames_to_send = [TextFrame(text="Hello from Pipecat!")]
|
||||
expected_returned_frames = [TextFrame]
|
||||
await run_test(pipeline, frames_to_send, expected_returned_frames)
|
||||
await run_test(
|
||||
pipeline,
|
||||
frames_to_send=frames_to_send,
|
||||
expected_down_frames=expected_returned_frames,
|
||||
)
|
||||
|
||||
async def test_pipeline_start_metadata(self):
|
||||
pipeline = Pipeline([IdentityFilter()])
|
||||
|
||||
frames_to_send = []
|
||||
expected_returned_frames = [StartFrame]
|
||||
(received_down, _) = await run_test(
|
||||
pipeline,
|
||||
frames_to_send=frames_to_send,
|
||||
expected_down_frames=expected_returned_frames,
|
||||
ignore_start=False,
|
||||
start_metadata={"foo": "bar"},
|
||||
)
|
||||
assert "foo" in received_down[-1].metadata
|
||||
|
||||
|
||||
class TestParallelPipeline(unittest.IsolatedAsyncioTestCase):
|
||||
@@ -42,7 +64,11 @@ class TestParallelPipeline(unittest.IsolatedAsyncioTestCase):
|
||||
|
||||
frames_to_send = [TextFrame(text="Hello from Pipecat!")]
|
||||
expected_returned_frames = [TextFrame]
|
||||
await run_test(pipeline, frames_to_send, expected_returned_frames)
|
||||
await run_test(
|
||||
pipeline,
|
||||
frames_to_send=frames_to_send,
|
||||
expected_down_frames=expected_returned_frames,
|
||||
)
|
||||
|
||||
async def test_parallel_multiple(self):
|
||||
"""Should only passthrough one instance of TextFrame."""
|
||||
@@ -50,7 +76,11 @@ class TestParallelPipeline(unittest.IsolatedAsyncioTestCase):
|
||||
|
||||
frames_to_send = [TextFrame(text="Hello from Pipecat!")]
|
||||
expected_returned_frames = [TextFrame]
|
||||
await run_test(pipeline, frames_to_send, expected_returned_frames)
|
||||
await run_test(
|
||||
pipeline,
|
||||
frames_to_send=frames_to_send,
|
||||
expected_down_frames=expected_returned_frames,
|
||||
)
|
||||
|
||||
|
||||
class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
|
||||
@@ -79,7 +109,9 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_heartbeats=True, heartbeats_period_secs=0.2, observers=[heartbeats_observer]
|
||||
enable_heartbeats=True,
|
||||
heartbeats_period_secs=0.2,
|
||||
observers=[heartbeats_observer],
|
||||
),
|
||||
)
|
||||
task.set_event_loop(asyncio.get_event_loop())
|
||||
|
||||
@@ -20,17 +20,19 @@ class TestProtobufFrameSerializer(unittest.IsolatedAsyncioTestCase):
|
||||
|
||||
async def test_roundtrip(self):
|
||||
text_frame = TextFrame(text="hello world")
|
||||
frame = self.serializer.deserialize(self.serializer.serialize(text_frame))
|
||||
frame = await self.serializer.deserialize(await self.serializer.serialize(text_frame))
|
||||
self.assertEqual(text_frame, frame)
|
||||
|
||||
transcription_frame = TranscriptionFrame(
|
||||
text="Hello there!", user_id="123", timestamp="2021-01-01"
|
||||
)
|
||||
frame = self.serializer.deserialize(self.serializer.serialize(transcription_frame))
|
||||
frame = await self.serializer.deserialize(
|
||||
await self.serializer.serialize(transcription_frame)
|
||||
)
|
||||
self.assertEqual(frame, transcription_frame)
|
||||
|
||||
audio_frame = OutputAudioRawFrame(audio=b"1234567890", sample_rate=16000, num_channels=1)
|
||||
frame = self.serializer.deserialize(self.serializer.serialize(audio_frame))
|
||||
frame = await self.serializer.deserialize(await self.serializer.serialize(audio_frame))
|
||||
self.assertEqual(frame.audio, audio_frame.audio)
|
||||
self.assertEqual(frame.sample_rate, audio_frame.sample_rate)
|
||||
self.assertEqual(frame.num_channels, audio_frame.num_channels)
|
||||
|
||||
Reference in New Issue
Block a user