Compare commits
62 Commits
cb/extra-l
...
v0.0.56
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d4b2160f9c | ||
|
|
dd7926aab5 | ||
|
|
070bf66980 | ||
|
|
962fc27dbd | ||
|
|
3d4d6132fc | ||
|
|
a96d9294b7 | ||
|
|
a6e78550d5 | ||
|
|
969de92ad9 | ||
|
|
c4dbe92b30 | ||
|
|
684764fece | ||
|
|
c4be07693f | ||
|
|
c5d5ca8232 | ||
|
|
428e763814 | ||
|
|
0efa2711ff | ||
|
|
4904f52cee | ||
|
|
dbcf14ddb4 | ||
|
|
7c13ec10d9 | ||
|
|
29b9dccc53 | ||
|
|
e8ce826473 | ||
|
|
bbb991dfd8 | ||
|
|
4432e7e4f7 | ||
|
|
ee9cce64b2 | ||
|
|
1ae4f0150d | ||
|
|
4c77c3ed34 | ||
|
|
975b97472a | ||
|
|
c8ccf13bc7 | ||
|
|
ba59736f87 | ||
|
|
bc21a0b817 | ||
|
|
99d3227ff5 | ||
|
|
7730f59635 | ||
|
|
ba31546c32 | ||
|
|
a363d12d1f | ||
|
|
feab9c8fa2 | ||
|
|
61f6669926 | ||
|
|
3be69908d2 | ||
|
|
fcb80ec330 | ||
|
|
c9f5684e2f | ||
|
|
c257fa1573 | ||
|
|
97c55da29f | ||
|
|
49426aa9a1 | ||
|
|
0a333c26da | ||
|
|
75a29424ff | ||
|
|
cd1b429308 | ||
|
|
7f1ae4b8cc | ||
|
|
af9fd811cd | ||
|
|
69f5c9b9d3 | ||
|
|
ab45e481be | ||
|
|
cc54255c41 | ||
|
|
1cdb66f889 | ||
|
|
51a86a509c | ||
|
|
824898f7b7 | ||
|
|
57dadb6359 | ||
|
|
5dcdc68ef5 | ||
|
|
aafb2db620 | ||
|
|
f3f22cf61c | ||
|
|
371c2f3704 | ||
|
|
1f14f62696 | ||
|
|
06449eff2c | ||
|
|
dcfb86583d | ||
|
|
cda34a1320 | ||
|
|
13611fd8e1 | ||
|
|
fc89aad469 |
88
CHANGELOG.md
88
CHANGELOG.md
@@ -5,10 +5,61 @@ All notable changes to **Pipecat** will be documented in this file.
|
||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## [Unreleased]
|
||||
## [0.0.56] - 2025-02-06
|
||||
|
||||
### Changed
|
||||
|
||||
- Improved foundational examples 22b, 22c, and 22d to support function calling.
|
||||
With these base examples, `FunctionCallInProgressFrame` and
|
||||
`FunctionCallResultFrame` will no longer be blocked by the gates.
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed a `TkLocalTransport` and `LocalAudioTransport` issues that was causing
|
||||
errors on cleanup.
|
||||
|
||||
- Fixed an issue that was causing `tests.utils` import to fail because of
|
||||
logging setup.
|
||||
|
||||
- Fixed a `SentryMetrics` issue that was preventing any metrics to be sent to
|
||||
Sentry and also was preventing from metrics frames to be pushed to the pipeline.
|
||||
|
||||
- Fixed an issue in `BaseOutputTransport` where incoming audio would not be
|
||||
resampled to the desired output sample rate.
|
||||
|
||||
- Fixed an issue with the `TwilioFrameSerializer` and `TelnyxFrameSerializer`
|
||||
where `twilio_sample_rate` and `telnyx_sample_rate` were incorrectly
|
||||
initialized to `audio_in_sample_rate`. Those values currently default to 8000
|
||||
and should be set manually from the serializer constructor if a different
|
||||
value is needed.
|
||||
|
||||
### Changed
|
||||
|
||||
- Use `gemini-2.0-flash-001` as the default model for `GoogleLLMSerivce`.
|
||||
|
||||
### Other
|
||||
|
||||
- Added a new `sentry-metrics` example.
|
||||
|
||||
## [0.0.55] - 2025-02-05
|
||||
|
||||
### Added
|
||||
|
||||
- Added a new `start_metadata` field to `PipelineParams`. The provided metadata
|
||||
will be set to the initial `StartFrame` being pushed from the `PipelineTask`.
|
||||
|
||||
- Added new fields to `PipelineParams` to control audio input and output sample
|
||||
rates for the whole pipeline. This allows controlling sample rates from a
|
||||
single place instead of having to specify sample rates in each
|
||||
service. Setting a sample rate to a service is still possible and will
|
||||
override the value from `PipelineParams`.
|
||||
|
||||
- Introduce audio resamplers (`BaseAudioResampler`). This is just a base class
|
||||
to implement audio resamplers. Currently, two implementations are provided
|
||||
`SOXRAudioResampler` and `ResampyResampler`. A new
|
||||
`create_default_resampler()` has been added (replacing the now deprecated
|
||||
`resample_audio()`).
|
||||
|
||||
- It is now possible to specify the asyncio event loop that a `PipelineTask` and
|
||||
all the processors should run on by passing it as a new argument to the
|
||||
`PipelineRunner`. This could allow running pipelines in multiple threads each
|
||||
@@ -41,6 +92,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Changed
|
||||
|
||||
- `GatedOpenAILLMContextAggregator` now require keyword arguments. Also, a new
|
||||
`start_open` argument has been added to set the initial state of the gate.
|
||||
|
||||
- Added `organization` and `project` level authentication to
|
||||
`OpenAILLMService`.
|
||||
|
||||
@@ -56,8 +110,27 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
- `InputDTMFFrame` is now based on `DTMFFrame`. There's also a new
|
||||
`OutputDTMFFrame` frame.
|
||||
|
||||
### Deprecated
|
||||
|
||||
- `resample_audio()` is now deprecated, use `create_default_resampler()`
|
||||
instead.
|
||||
|
||||
### Removed
|
||||
|
||||
- `AudioBufferProcessor.reset_audio_buffers()` has been removed, use
|
||||
`AudioBufferProcessor.start_recording()` and
|
||||
``AudioBufferProcessor.stop_recording()` instead.
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed a `AudioBufferProcessor` that would cause crackling in some recordings.
|
||||
|
||||
- Fixed an issue in `AudioBufferProcessor` where user callback would not be
|
||||
called on task cancellation.
|
||||
|
||||
- Fixed an issue in `AudioBufferProcessor` that would cause wrong silence
|
||||
padding in some cases.
|
||||
|
||||
- Fixed an issue where `ElevenLabsTTSService` messages would return a 1009
|
||||
websocket error by increasing the max message size limit to 16MB.
|
||||
|
||||
@@ -73,6 +146,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Other
|
||||
|
||||
- Improved Unit Test `run_test()` to use `PipelineTask` and
|
||||
`PipelineRunner`. There's now also some control around `StartFrame` and
|
||||
`EndFrame`. The `EndTaskFrame` has been removed since it doesn't seem
|
||||
necessary with this new approach.
|
||||
|
||||
- Updated `twilio-chatbot` with a few new features: use 8000 sample rate and
|
||||
avoid resampling, a new client useful for stress testing and testing locally
|
||||
without the need to make phone calls. Also, added audio recording on both the
|
||||
client and the server to make sure the audio sounds good.
|
||||
|
||||
- Updated examples to use `task.cancel()` to immediately exit the example when a
|
||||
participant leaves or disconnects, instead of pushing an `EndFrame`. Pushing
|
||||
an `EndFrame` causes the bot to run through everything that is internally
|
||||
@@ -1527,6 +1610,9 @@ async def on_connected(processor):
|
||||
|
||||
### Changed
|
||||
|
||||
- `FrameSerializer.serialize()` and `FrameSerializer.deserialize()` are now
|
||||
`async`.
|
||||
|
||||
- `Filter` has been renamed to `FrameFilter` and it's now under
|
||||
`processors/filters`.
|
||||
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
build~=1.2.2
|
||||
grpcio-tools~=1.69.0
|
||||
grpcio-tools~=1.67.1
|
||||
pip-tools~=7.4.1
|
||||
pre-commit~=4.0.1
|
||||
pyright~=1.1.392
|
||||
pytest~=8.3.4
|
||||
pytest-asyncio~=0.25.2
|
||||
ruff~=0.9.1
|
||||
setuptools~=75.8.0
|
||||
setuptools~=70.0.0
|
||||
setuptools_scm~=8.1.0
|
||||
python-dotenv~=1.0.1
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
|
||||
import argparse
|
||||
import os
|
||||
from typing import Optional
|
||||
|
||||
import aiohttp
|
||||
|
||||
@@ -18,7 +19,7 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
|
||||
|
||||
|
||||
async def configure_with_args(
|
||||
aiohttp_session: aiohttp.ClientSession, parser: argparse.ArgumentParser | None = None
|
||||
aiohttp_session: aiohttp.ClientSession, parser: Optional[argparse.ArgumentParser] = None
|
||||
):
|
||||
if not parser:
|
||||
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
|
||||
|
||||
@@ -17,7 +17,7 @@ from runner import configure
|
||||
from pipecat.frames.frames import AudioRawFrame, EndFrame, OutputAudioRawFrame, TTSSpeakFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineTask
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
@@ -31,16 +31,15 @@ logger.add(sys.stderr, level="DEBUG")
|
||||
class SilenceFrame(OutputAudioRawFrame):
|
||||
def __init__(
|
||||
self,
|
||||
audio: bytes = None,
|
||||
sample_rate: int = 16000,
|
||||
num_channels: int = 1,
|
||||
duration: float = 0.1,
|
||||
*,
|
||||
sample_rate: int,
|
||||
duration: float,
|
||||
):
|
||||
# Initialize the parent class with the silent frame's data
|
||||
super().__init__(
|
||||
audio=self.create_silent_audio_frame(sample_rate, num_channels, duration).audio,
|
||||
audio=self.create_silent_audio_frame(sample_rate, 1, duration).audio,
|
||||
sample_rate=sample_rate,
|
||||
num_channels=num_channels,
|
||||
num_channels=1,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
@@ -80,7 +79,10 @@ async def main():
|
||||
return
|
||||
await task.queue_frames(
|
||||
[
|
||||
SilenceFrame(duration=0.5),
|
||||
SilenceFrame(
|
||||
sample_rate=task.params.audio_out_sample_rate,
|
||||
duration=0.5,
|
||||
),
|
||||
TTSSpeakFrame(f"Hello there, how are you doing today ?"),
|
||||
EndFrame(),
|
||||
]
|
||||
|
||||
@@ -65,7 +65,6 @@ async def main():
|
||||
# English
|
||||
#
|
||||
voice_id="cgSgspJ2msm6clMCkdW9",
|
||||
aiohttp_session=session,
|
||||
#
|
||||
# Spanish
|
||||
#
|
||||
@@ -124,6 +123,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
await audio_buffer_processor.start_recording()
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
|
||||
@@ -82,7 +82,6 @@ async def main():
|
||||
# English
|
||||
#
|
||||
voice_id="cgSgspJ2msm6clMCkdW9",
|
||||
aiohttp_session=session,
|
||||
#
|
||||
# Spanish
|
||||
#
|
||||
@@ -109,8 +108,9 @@ async def main():
|
||||
context = OpenAILLMContext(messages)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
# Save audio every 10 seconds.
|
||||
audiobuffer = AudioBufferProcessor(buffer_size=480000)
|
||||
# NOTE: Watch out! This will save all the conversation in memory. You
|
||||
# can pass `buffer_size` to get periodic callbacks.
|
||||
audiobuffer = AudioBufferProcessor()
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
@@ -132,6 +132,7 @@ async def main():
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
await audiobuffer.start_recording()
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
|
||||
@@ -51,7 +51,6 @@ async def main():
|
||||
)
|
||||
|
||||
elevenlabs_tts = ElevenLabsTTSService(
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
|
||||
)
|
||||
|
||||
@@ -37,7 +37,6 @@ async def main():
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
audio_out_sample_rate=24000,
|
||||
transcription_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
|
||||
@@ -38,7 +38,6 @@ async def main():
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
audio_out_sample_rate=24000,
|
||||
transcription_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
|
||||
@@ -40,7 +40,6 @@ async def main():
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
audio_out_sample_rate=24000,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
vad_audio_passthrough=True,
|
||||
|
||||
@@ -216,11 +216,7 @@ async def main():
|
||||
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
|
||||
)
|
||||
|
||||
llm = GoogleLLMService(
|
||||
model="gemini-1.5-flash-latest",
|
||||
# model="gemini-exp-1114",
|
||||
api_key=os.getenv("GOOGLE_API_KEY"),
|
||||
)
|
||||
llm = GoogleLLMService(api_key=os.getenv("GOOGLE_API_KEY"), model="gemini-2.0-flash-001")
|
||||
|
||||
messages = [
|
||||
{
|
||||
|
||||
@@ -48,7 +48,6 @@ async def main():
|
||||
region=os.getenv("AZURE_SPEECH_REGION"),
|
||||
)
|
||||
tts2 = ElevenLabsTTSService(
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id="jBpfuIE2acCO8z3wKNLl",
|
||||
)
|
||||
|
||||
@@ -21,7 +21,7 @@ from pipecat.frames.frames import (
|
||||
)
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineTask
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
@@ -61,7 +61,6 @@ async def main():
|
||||
"Test",
|
||||
DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_in_sample_rate=24000,
|
||||
audio_out_enabled=True,
|
||||
camera_out_enabled=True,
|
||||
camera_out_is_live=True,
|
||||
@@ -78,7 +77,9 @@ async def main():
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
task = PipelineTask(pipeline)
|
||||
task = PipelineTask(
|
||||
pipeline, PipelineParams(audio_in_sample_rate=24000, audio_out_sample_rate=24000)
|
||||
)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
@@ -22,7 +22,7 @@ from pipecat.frames.frames import (
|
||||
)
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineTask
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.transports.base_transport import TransportParams
|
||||
from pipecat.transports.local.tk import TkLocalTransport
|
||||
@@ -62,7 +62,7 @@ async def main():
|
||||
tk_root.title("Local Mirror")
|
||||
|
||||
daily_transport = DailyTransport(
|
||||
room_url, token, "Test", DailyParams(audio_in_enabled=True, audio_in_sample_rate=24000)
|
||||
room_url, token, "Test", DailyParams(audio_in_enabled=True)
|
||||
)
|
||||
|
||||
tk_transport = TkLocalTransport(
|
||||
@@ -82,7 +82,9 @@ async def main():
|
||||
|
||||
pipeline = Pipeline([daily_transport.input(), MirrorProcessor(), tk_transport.output()])
|
||||
|
||||
task = PipelineTask(pipeline)
|
||||
task = PipelineTask(
|
||||
pipeline, PipelineParams(audio_in_sample_rate=24000, audio_out_sample_rate=24000)
|
||||
)
|
||||
|
||||
async def run_tk():
|
||||
while not task.has_finished():
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
from typing import Optional
|
||||
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
@@ -32,7 +33,7 @@ logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
class UserImageRequester(FrameProcessor):
|
||||
def __init__(self, participant_id: str | None = None):
|
||||
def __init__(self, participant_id: Optional[str] = None):
|
||||
super().__init__()
|
||||
self._participant_id = participant_id
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
from typing import Optional
|
||||
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
@@ -32,7 +33,7 @@ logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
class UserImageRequester(FrameProcessor):
|
||||
def __init__(self, participant_id: str | None = None):
|
||||
def __init__(self, participant_id: Optional[str] = None):
|
||||
super().__init__()
|
||||
self._participant_id = participant_id
|
||||
|
||||
@@ -72,9 +73,7 @@ async def main():
|
||||
|
||||
vision_aggregator = VisionImageFrameAggregator()
|
||||
|
||||
google = GoogleLLMService(
|
||||
model="gemini-1.5-flash-latest", api_key=os.getenv("GOOGLE_API_KEY")
|
||||
)
|
||||
google = GoogleLLMService(model="gemini-2.0-flash-001", api_key=os.getenv("GOOGLE_API_KEY"))
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
from typing import Optional
|
||||
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
@@ -32,7 +33,7 @@ logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
class UserImageRequester(FrameProcessor):
|
||||
def __init__(self, participant_id: str | None = None):
|
||||
def __init__(self, participant_id: Optional[str] = None):
|
||||
super().__init__()
|
||||
self._participant_id = participant_id
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
from typing import Optional
|
||||
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
@@ -32,7 +33,7 @@ logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
class UserImageRequester(FrameProcessor):
|
||||
def __init__(self, participant_id: str | None = None):
|
||||
def __init__(self, participant_id: Optional[str] = None):
|
||||
super().__init__()
|
||||
self._participant_id = participant_id
|
||||
|
||||
|
||||
@@ -62,11 +62,7 @@ async def main():
|
||||
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
|
||||
)
|
||||
|
||||
llm = GoogleLLMService(
|
||||
model="gemini-1.5-flash-latest",
|
||||
# model="gemini-exp-1114",
|
||||
api_key=os.getenv("GOOGLE_API_KEY"),
|
||||
)
|
||||
llm = GoogleLLMService(api_key=os.getenv("GOOGLE_API_KEY"), model="gemini-2.0-flash-001")
|
||||
llm.register_function("get_weather", get_weather)
|
||||
llm.register_function("get_image", get_image)
|
||||
|
||||
|
||||
@@ -51,8 +51,6 @@ async def main():
|
||||
out_params=GStreamerPipelineSource.OutputParams(
|
||||
video_width=1280,
|
||||
video_height=720,
|
||||
audio_sample_rate=24000,
|
||||
audio_channels=1,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@@ -80,9 +80,7 @@ async def main():
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_in_sample_rate=24000,
|
||||
audio_out_enabled=True,
|
||||
audio_out_sample_rate=24000,
|
||||
transcription_enabled=False,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.8)),
|
||||
|
||||
@@ -177,9 +177,7 @@ async def main():
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_in_sample_rate=24000,
|
||||
audio_out_enabled=True,
|
||||
audio_out_sample_rate=24000,
|
||||
transcription_enabled=False,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.8)),
|
||||
|
||||
@@ -237,7 +237,7 @@ async def main():
|
||||
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
|
||||
)
|
||||
|
||||
llm = GoogleLLMService(model="gemini-1.5-flash-latest", api_key=os.getenv("GOOGLE_API_KEY"))
|
||||
llm = GoogleLLMService(model="gemini-2.0-flash-001", api_key=os.getenv("GOOGLE_API_KEY"))
|
||||
|
||||
# you can either register a single function for all function calls, or specific functions
|
||||
# llm.register_function(None, fetch_weather_from_api)
|
||||
|
||||
@@ -88,6 +88,10 @@ async def main():
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
PipelineParams(
|
||||
# We just use 16000 because that's what Tavus is expecting and
|
||||
# we avoid resampling.
|
||||
audio_in_sample_rate=16000,
|
||||
audio_out_sample_rate=16000,
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
|
||||
@@ -104,8 +104,11 @@ async def main():
|
||||
)
|
||||
|
||||
# This processor keeps the last context and will let it through once the
|
||||
# notifier is woken up.
|
||||
gated_context_aggregator = GatedOpenAILLMContextAggregator(notifier)
|
||||
# notifier is woken up. We start with the gate open because we send an
|
||||
# initial context frame to start the conversation.
|
||||
gated_context_aggregator = GatedOpenAILLMContextAggregator(
|
||||
notifier=notifier, start_open=True
|
||||
)
|
||||
|
||||
# Notify if the user hasn't said anything.
|
||||
async def user_idle_notifier(frame):
|
||||
|
||||
@@ -12,6 +12,7 @@ import time
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from openai.types.chat import ChatCompletionToolParam
|
||||
from runner import configure
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
@@ -19,6 +20,8 @@ from pipecat.frames.frames import (
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
Frame,
|
||||
FunctionCallInProgressFrame,
|
||||
FunctionCallResultFrame,
|
||||
LLMMessagesFrame,
|
||||
StartFrame,
|
||||
StartInterruptionFrame,
|
||||
@@ -26,6 +29,7 @@ from pipecat.frames.frames import (
|
||||
SystemFrame,
|
||||
TextFrame,
|
||||
TranscriptionFrame,
|
||||
TTSSpeakFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
)
|
||||
@@ -129,9 +133,9 @@ class CompletenessCheck(FrameProcessor):
|
||||
|
||||
|
||||
class OutputGate(FrameProcessor):
|
||||
def __init__(self, notifier: BaseNotifier, **kwargs):
|
||||
def __init__(self, *, notifier: BaseNotifier, start_open: bool = False, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._gate_open = False
|
||||
self._gate_open = start_open
|
||||
self._frames_buffer = []
|
||||
self._notifier = notifier
|
||||
|
||||
@@ -156,6 +160,11 @@ class OutputGate(FrameProcessor):
|
||||
await self.push_frame(frame, direction)
|
||||
return
|
||||
|
||||
# Don't block function call frames
|
||||
if isinstance(frame, (FunctionCallInProgressFrame, FunctionCallResultFrame)):
|
||||
await self.push_frame(frame, direction)
|
||||
return
|
||||
|
||||
# Ignore frames that are not following the direction of this gate.
|
||||
if direction != FrameDirection.DOWNSTREAM:
|
||||
await self.push_frame(frame, direction)
|
||||
@@ -186,6 +195,16 @@ class OutputGate(FrameProcessor):
|
||||
break
|
||||
|
||||
|
||||
async def start_fetch_weather(function_name, llm, context):
|
||||
"""Push a frame to the LLM; this is handy when the LLM response might take a while."""
|
||||
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
|
||||
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
|
||||
|
||||
|
||||
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
|
||||
await result_callback({"conditions": "nice", "temperature": "75"})
|
||||
|
||||
|
||||
async def main():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
(room_url, _) = await configure(session)
|
||||
@@ -216,6 +235,34 @@ async def main():
|
||||
|
||||
# This is the regular LLM.
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
|
||||
# Register a function_name of None to get all functions
|
||||
# sent to the same callback with an additional function_name parameter.
|
||||
llm.register_function(None, fetch_weather_from_api, start_callback=start_fetch_weather)
|
||||
|
||||
tools = [
|
||||
ChatCompletionToolParam(
|
||||
type="function",
|
||||
function={
|
||||
"name": "get_current_weather",
|
||||
"description": "Get the current weather",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"location": {
|
||||
"type": "string",
|
||||
"description": "The city and state, e.g. San Francisco, CA",
|
||||
},
|
||||
"format": {
|
||||
"type": "string",
|
||||
"enum": ["celsius", "fahrenheit"],
|
||||
"description": "The temperature unit to use. Infer this from the users location.",
|
||||
},
|
||||
},
|
||||
"required": ["location", "format"],
|
||||
},
|
||||
},
|
||||
)
|
||||
]
|
||||
|
||||
messages = [
|
||||
{
|
||||
@@ -224,7 +271,7 @@ async def main():
|
||||
},
|
||||
]
|
||||
|
||||
context = OpenAILLMContext(messages)
|
||||
context = OpenAILLMContext(messages, tools)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
# We have instructed the LLM to return 'YES' if it thinks the user
|
||||
@@ -252,7 +299,9 @@ async def main():
|
||||
# sentence, this will wake up the notifier if that happens.
|
||||
user_idle = UserIdleProcessor(callback=user_idle_notifier, timeout=5.0)
|
||||
|
||||
bot_output_gate = OutputGate(notifier=notifier)
|
||||
# We start with the gate open because we send an initial context frame
|
||||
# to start the conversation.
|
||||
bot_output_gate = OutputGate(notifier=notifier, start_open=True)
|
||||
|
||||
async def block_user_stopped_speaking(frame):
|
||||
return not isinstance(frame, UserStoppedSpeakingFrame)
|
||||
@@ -263,6 +312,8 @@ async def main():
|
||||
or isinstance(frame, LLMMessagesFrame)
|
||||
or isinstance(frame, StartInterruptionFrame)
|
||||
or isinstance(frame, StopInterruptionFrame)
|
||||
or isinstance(frame, FunctionCallInProgressFrame)
|
||||
or isinstance(frame, FunctionCallResultFrame)
|
||||
)
|
||||
|
||||
pipeline = Pipeline(
|
||||
|
||||
@@ -12,6 +12,7 @@ import time
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from openai.types.chat import ChatCompletionToolParam
|
||||
from runner import configure
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
@@ -19,6 +20,8 @@ from pipecat.frames.frames import (
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
Frame,
|
||||
FunctionCallInProgressFrame,
|
||||
FunctionCallResultFrame,
|
||||
LLMMessagesFrame,
|
||||
StartFrame,
|
||||
StartInterruptionFrame,
|
||||
@@ -26,6 +29,7 @@ from pipecat.frames.frames import (
|
||||
SystemFrame,
|
||||
TextFrame,
|
||||
TranscriptionFrame,
|
||||
TTSSpeakFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
)
|
||||
@@ -333,9 +337,9 @@ class CompletenessCheck(FrameProcessor):
|
||||
|
||||
|
||||
class OutputGate(FrameProcessor):
|
||||
def __init__(self, notifier: BaseNotifier, **kwargs):
|
||||
def __init__(self, *, notifier: BaseNotifier, start_open: bool = False, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._gate_open = False
|
||||
self._gate_open = start_open
|
||||
self._frames_buffer = []
|
||||
self._notifier = notifier
|
||||
|
||||
@@ -360,6 +364,11 @@ class OutputGate(FrameProcessor):
|
||||
await self.push_frame(frame, direction)
|
||||
return
|
||||
|
||||
# Don't block function call frames
|
||||
if isinstance(frame, (FunctionCallInProgressFrame, FunctionCallResultFrame)):
|
||||
await self.push_frame(frame, direction)
|
||||
return
|
||||
|
||||
# Ignore frames that are not following the direction of this gate.
|
||||
if direction != FrameDirection.DOWNSTREAM:
|
||||
await self.push_frame(frame, direction)
|
||||
@@ -390,6 +399,16 @@ class OutputGate(FrameProcessor):
|
||||
break
|
||||
|
||||
|
||||
async def start_fetch_weather(function_name, llm, context):
|
||||
"""Push a frame to the LLM; this is handy when the LLM response might take a while."""
|
||||
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
|
||||
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
|
||||
|
||||
|
||||
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
|
||||
await result_callback({"conditions": "nice", "temperature": "75"})
|
||||
|
||||
|
||||
async def main():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
(room_url, _) = await configure(session)
|
||||
@@ -426,6 +445,34 @@ async def main():
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
model="gpt-4o",
|
||||
)
|
||||
# Register a function_name of None to get all functions
|
||||
# sent to the same callback with an additional function_name parameter.
|
||||
llm.register_function(None, fetch_weather_from_api, start_callback=start_fetch_weather)
|
||||
|
||||
tools = [
|
||||
ChatCompletionToolParam(
|
||||
type="function",
|
||||
function={
|
||||
"name": "get_current_weather",
|
||||
"description": "Get the current weather",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"location": {
|
||||
"type": "string",
|
||||
"description": "The city and state, e.g. San Francisco, CA",
|
||||
},
|
||||
"format": {
|
||||
"type": "string",
|
||||
"enum": ["celsius", "fahrenheit"],
|
||||
"description": "The temperature unit to use. Infer this from the users location.",
|
||||
},
|
||||
},
|
||||
"required": ["location", "format"],
|
||||
},
|
||||
},
|
||||
)
|
||||
]
|
||||
|
||||
messages = [
|
||||
{
|
||||
@@ -434,7 +481,7 @@ async def main():
|
||||
},
|
||||
]
|
||||
|
||||
context = OpenAILLMContext(messages)
|
||||
context = OpenAILLMContext(messages, tools)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
# We have instructed the LLM to return 'YES' if it thinks the user
|
||||
@@ -461,7 +508,9 @@ async def main():
|
||||
# sentence, this will wake up the notifier if that happens.
|
||||
user_idle = UserIdleProcessor(callback=user_idle_notifier, timeout=5.0)
|
||||
|
||||
bot_output_gate = OutputGate(notifier=notifier)
|
||||
# We start with the gate open because we send an initial context frame
|
||||
# to start the conversation.
|
||||
bot_output_gate = OutputGate(notifier=notifier, start_open=True)
|
||||
|
||||
async def block_user_stopped_speaking(frame):
|
||||
return not isinstance(frame, UserStoppedSpeakingFrame)
|
||||
@@ -472,6 +521,8 @@ async def main():
|
||||
or isinstance(frame, LLMMessagesFrame)
|
||||
or isinstance(frame, StartInterruptionFrame)
|
||||
or isinstance(frame, StopInterruptionFrame)
|
||||
or isinstance(frame, FunctionCallInProgressFrame)
|
||||
or isinstance(frame, FunctionCallResultFrame)
|
||||
)
|
||||
|
||||
pipeline = Pipeline(
|
||||
|
||||
@@ -20,6 +20,8 @@ from pipecat.frames.frames import (
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
Frame,
|
||||
FunctionCallInProgressFrame,
|
||||
FunctionCallResultFrame,
|
||||
InputAudioRawFrame,
|
||||
LLMFullResponseEndFrame,
|
||||
LLMFullResponseStartFrame,
|
||||
@@ -55,13 +57,9 @@ load_dotenv(override=True)
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
# TRANSCRIBER_MODEL = "gemini-1.5-flash-latest"
|
||||
# CLASSIFIER_MODEL = "gemini-1.5-flash-latest"
|
||||
# CONVERSATION_MODEL = "gemini-1.5-flash-latest"
|
||||
|
||||
TRANSCRIBER_MODEL = "gemini-2.0-flash-exp"
|
||||
CLASSIFIER_MODEL = "gemini-2.0-flash-exp"
|
||||
CONVERSATION_MODEL = "gemini-2.0-flash-exp"
|
||||
TRANSCRIBER_MODEL = "gemini-2.0-flash-001"
|
||||
CLASSIFIER_MODEL = "gemini-2.0-flash-001"
|
||||
CONVERSATION_MODEL = "gemini-2.0-flash-001"
|
||||
|
||||
transcriber_system_instruction = """You are an audio transcriber. You are receiving audio from a user. Your job is to
|
||||
transcribe the input audio to text exactly as it was said by the user.
|
||||
@@ -579,6 +577,11 @@ class OutputGate(FrameProcessor):
|
||||
await self.push_frame(frame, direction)
|
||||
return
|
||||
|
||||
# Don't block function call frames
|
||||
if isinstance(frame, (FunctionCallInProgressFrame, FunctionCallResultFrame)):
|
||||
await self.push_frame(frame, direction)
|
||||
return
|
||||
|
||||
# Ignore frames that are not following the direction of this gate.
|
||||
if direction != FrameDirection.DOWNSTREAM:
|
||||
await self.push_frame(frame, direction)
|
||||
@@ -639,7 +642,6 @@ async def main():
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
vad_audio_passthrough=True,
|
||||
audio_in_sample_rate=16000,
|
||||
),
|
||||
)
|
||||
|
||||
@@ -677,12 +679,6 @@ async def main():
|
||||
context = OpenAILLMContext()
|
||||
context_aggregator = conversation_llm.create_context_aggregator(context)
|
||||
|
||||
# We have instructed the LLM to return 'True' if it thinks the user
|
||||
# completed a sentence. So, if it's 'True' we will return true in this
|
||||
# predicate which will wake up the notifier.
|
||||
async def wake_check_filter(frame):
|
||||
return frame.text == "True"
|
||||
|
||||
# This is a notifier that we use to synchronize the two LLMs.
|
||||
notifier = EventNotifier()
|
||||
|
||||
@@ -699,14 +695,6 @@ async def main():
|
||||
async def block_user_stopped_speaking(frame):
|
||||
return not isinstance(frame, UserStoppedSpeakingFrame)
|
||||
|
||||
async def pass_only_llm_trigger_frames(frame):
|
||||
return (
|
||||
isinstance(frame, OpenAILLMContextFrame)
|
||||
or isinstance(frame, LLMMessagesFrame)
|
||||
or isinstance(frame, StartInterruptionFrame)
|
||||
or isinstance(frame, StopInterruptionFrame)
|
||||
)
|
||||
|
||||
conversation_audio_context_assembler = ConversationAudioContextAssembler(context=context)
|
||||
|
||||
user_aggregator_buffer = UserAggregatorBuffer()
|
||||
|
||||
@@ -292,7 +292,7 @@ async def main():
|
||||
|
||||
conversation_llm = GoogleLLMService(
|
||||
name="Conversation",
|
||||
model="gemini-1.5-flash-latest",
|
||||
model="gemini-2.0-flash-001",
|
||||
# model="gemini-exp-1121",
|
||||
api_key=os.getenv("GOOGLE_API_KEY"),
|
||||
# we can give the GoogleLLMService a system instruction to use directly
|
||||
@@ -303,7 +303,7 @@ async def main():
|
||||
|
||||
input_transcription_llm = GoogleLLMService(
|
||||
name="Transcription",
|
||||
model="gemini-1.5-flash-latest",
|
||||
model="gemini-2.0-flash-001",
|
||||
# model="gemini-exp-1121",
|
||||
api_key=os.getenv("GOOGLE_API_KEY"),
|
||||
system_instruction=transcriber_system_message,
|
||||
|
||||
@@ -37,8 +37,6 @@ async def main():
|
||||
token,
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_in_sample_rate=16000,
|
||||
audio_out_sample_rate=24000,
|
||||
audio_out_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_audio_passthrough=True,
|
||||
|
||||
@@ -37,8 +37,6 @@ async def main():
|
||||
token,
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_in_sample_rate=16000,
|
||||
audio_out_sample_rate=24000,
|
||||
audio_out_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_audio_passthrough=True,
|
||||
|
||||
@@ -84,8 +84,6 @@ async def main():
|
||||
token,
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_in_sample_rate=16000,
|
||||
audio_out_sample_rate=24000,
|
||||
audio_out_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_audio_passthrough=True,
|
||||
|
||||
@@ -37,8 +37,6 @@ async def main():
|
||||
token,
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_in_sample_rate=16000,
|
||||
audio_out_sample_rate=24000,
|
||||
audio_out_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_audio_passthrough=True,
|
||||
@@ -47,8 +45,6 @@ async def main():
|
||||
# matter because we can only use the Multimodal Live API's phrase
|
||||
# endpointing, for now.
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)),
|
||||
start_audio_paused=True,
|
||||
start_video_paused=True,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@@ -52,8 +52,6 @@ async def main():
|
||||
token,
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_in_sample_rate=16000,
|
||||
audio_out_sample_rate=24000,
|
||||
audio_out_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_audio_passthrough=True,
|
||||
|
||||
@@ -38,8 +38,6 @@ load_dotenv(override=True)
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
DESIRED_SAMPLE_RATE = 16000
|
||||
|
||||
|
||||
def generate_token(room_name: str, participant_name: str, api_key: str, api_secret: str) -> str:
|
||||
token = api.AccessToken(api_key, api_secret)
|
||||
@@ -114,11 +112,8 @@ async def main():
|
||||
token=token,
|
||||
room_name=room_name,
|
||||
params=LiveKitParams(
|
||||
audio_in_channels=1,
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
audio_in_sample_rate=DESIRED_SAMPLE_RATE,
|
||||
audio_out_sample_rate=DESIRED_SAMPLE_RATE,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
vad_enabled=True,
|
||||
vad_audio_passthrough=True,
|
||||
@@ -128,7 +123,6 @@ async def main():
|
||||
stt = DeepgramSTTService(
|
||||
api_key=os.getenv("DEEPGRAM_API_KEY"),
|
||||
live_options=LiveOptions(
|
||||
sample_rate=DESIRED_SAMPLE_RATE,
|
||||
vad_events=True,
|
||||
),
|
||||
)
|
||||
@@ -138,7 +132,6 @@ async def main():
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
|
||||
sample_rate=DESIRED_SAMPLE_RATE,
|
||||
)
|
||||
|
||||
messages = [
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
|
||||
import argparse
|
||||
import os
|
||||
from typing import Optional
|
||||
|
||||
import aiohttp
|
||||
|
||||
@@ -18,7 +19,7 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
|
||||
|
||||
|
||||
async def configure_with_args(
|
||||
aiohttp_session: aiohttp.ClientSession, parser: argparse.ArgumentParser | None = None
|
||||
aiohttp_session: aiohttp.ClientSession, parser: Optional[argparse.ArgumentParser] = None
|
||||
):
|
||||
if not parser:
|
||||
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
|
||||
|
||||
@@ -106,8 +106,8 @@ class UserImageRequester(FrameProcessor):
|
||||
UserImageRequestFrame(self.participant_id), FrameDirection.UPSTREAM
|
||||
)
|
||||
await self.push_frame(TextFrame("Describe the image in a short sentence."))
|
||||
elif isinstance(frame, UserImageRawFrame):
|
||||
await self.push_frame(frame)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
class TextFilterProcessor(FrameProcessor):
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
|
||||
import argparse
|
||||
import os
|
||||
from typing import Optional
|
||||
|
||||
import aiohttp
|
||||
|
||||
@@ -18,7 +19,7 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
|
||||
|
||||
|
||||
async def configure_with_args(
|
||||
aiohttp_session: aiohttp.ClientSession, parser: argparse.ArgumentParser | None = None
|
||||
aiohttp_session: aiohttp.ClientSession, parser: Optional[argparse.ArgumentParser] = None
|
||||
):
|
||||
if not parser:
|
||||
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
|
||||
|
||||
@@ -2,13 +2,14 @@ import argparse
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
from typing import Optional
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from openai.types.chat import ChatCompletionToolParam
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import EndFrame, EndTaskFrame
|
||||
from pipecat.frames.frames import EndTaskFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
@@ -42,7 +43,7 @@ async def main(
|
||||
callId: str,
|
||||
callDomain: str,
|
||||
detect_voicemail: bool,
|
||||
dialout_number: str | None,
|
||||
dialout_number: Optional[str],
|
||||
):
|
||||
# dialin_settings are only needed if Daily's SIP URI is used
|
||||
# If you are handling this via Twilio, Telnyx, set this to None
|
||||
@@ -99,14 +100,14 @@ async def main(
|
||||
- **ASSUME IT IS A VOICEMAIL. DO NOT WAIT FOR MORE CONFIRMATION.**
|
||||
|
||||
#### **Step 2: Leave a Voicemail Message**
|
||||
- Immediately say:
|
||||
- Immediately say:
|
||||
*"Hello, this is a message for Pipecat example user. This is Chatbot. Please call back on 123-456-7891. Thank you."*
|
||||
- **IMMEDIATELY AFTER LEAVING THE MESSAGE, CALL `terminate_call`.**
|
||||
- **DO NOT SPEAK AFTER CALLING `terminate_call`.**
|
||||
- **FAILURE TO CALL `terminate_call` IMMEDIATELY IS A MISTAKE.**
|
||||
|
||||
#### **Step 3: If Speaking to a Human**
|
||||
- If the call is answered by a human, say:
|
||||
- If the call is answered by a human, say:
|
||||
*"Oh, hello! I'm a friendly chatbot. Is there anything I can help you with?"*
|
||||
- Keep responses **brief and helpful**.
|
||||
- If the user no longer needs assistance, **call `terminate_call` immediately.**
|
||||
|
||||
161
examples/sentry-metrics/.gitignore
vendored
Normal file
161
examples/sentry-metrics/.gitignore
vendored
Normal file
@@ -0,0 +1,161 @@
|
||||
# Byte-compiled / optimized / DLL files
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
*$py.class
|
||||
|
||||
# C extensions
|
||||
*.so
|
||||
|
||||
# Distribution / packaging
|
||||
.Python
|
||||
build/
|
||||
develop-eggs/
|
||||
dist/
|
||||
downloads/
|
||||
eggs/
|
||||
.eggs/
|
||||
lib/
|
||||
lib64/
|
||||
parts/
|
||||
sdist/
|
||||
var/
|
||||
wheels/
|
||||
share/python-wheels/
|
||||
*.egg-info/
|
||||
.installed.cfg
|
||||
*.egg
|
||||
MANIFEST
|
||||
|
||||
# PyInstaller
|
||||
# Usually these files are written by a python script from a template
|
||||
# before PyInstaller builds the exe, so as to inject date/other infos into it.
|
||||
*.manifest
|
||||
*.spec
|
||||
|
||||
# Installer logs
|
||||
pip-log.txt
|
||||
pip-delete-this-directory.txt
|
||||
|
||||
# Unit test / coverage reports
|
||||
htmlcov/
|
||||
.tox/
|
||||
.nox/
|
||||
.coverage
|
||||
.coverage.*
|
||||
.cache
|
||||
nosetests.xml
|
||||
coverage.xml
|
||||
*.cover
|
||||
*.py,cover
|
||||
.hypothesis/
|
||||
.pytest_cache/
|
||||
cover/
|
||||
|
||||
# Translations
|
||||
*.mo
|
||||
*.pot
|
||||
|
||||
# Django stuff:
|
||||
*.log
|
||||
local_settings.py
|
||||
db.sqlite3
|
||||
db.sqlite3-journal
|
||||
|
||||
# Flask stuff:
|
||||
instance/
|
||||
.webassets-cache
|
||||
|
||||
# Scrapy stuff:
|
||||
.scrapy
|
||||
|
||||
# Sphinx documentation
|
||||
docs/_build/
|
||||
|
||||
# PyBuilder
|
||||
.pybuilder/
|
||||
target/
|
||||
|
||||
# Jupyter Notebook
|
||||
.ipynb_checkpoints
|
||||
|
||||
# IPython
|
||||
profile_default/
|
||||
ipython_config.py
|
||||
|
||||
# pyenv
|
||||
# For a library or package, you might want to ignore these files since the code is
|
||||
# intended to run in multiple environments; otherwise, check them in:
|
||||
# .python-version
|
||||
|
||||
# pipenv
|
||||
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
|
||||
# However, in case of collaboration, if having platform-specific dependencies or dependencies
|
||||
# having no cross-platform support, pipenv may install dependencies that don't work, or not
|
||||
# install all needed dependencies.
|
||||
#Pipfile.lock
|
||||
|
||||
# poetry
|
||||
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
|
||||
# This is especially recommended for binary packages to ensure reproducibility, and is more
|
||||
# commonly ignored for libraries.
|
||||
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
|
||||
#poetry.lock
|
||||
|
||||
# pdm
|
||||
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
|
||||
#pdm.lock
|
||||
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
|
||||
# in version control.
|
||||
# https://pdm.fming.dev/#use-with-ide
|
||||
.pdm.toml
|
||||
|
||||
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
|
||||
__pypackages__/
|
||||
|
||||
# Celery stuff
|
||||
celerybeat-schedule
|
||||
celerybeat.pid
|
||||
|
||||
# SageMath parsed files
|
||||
*.sage.py
|
||||
|
||||
# Environments
|
||||
.env
|
||||
.venv
|
||||
env/
|
||||
venv/
|
||||
ENV/
|
||||
env.bak/
|
||||
venv.bak/
|
||||
|
||||
# Spyder project settings
|
||||
.spyderproject
|
||||
.spyproject
|
||||
|
||||
# Rope project settings
|
||||
.ropeproject
|
||||
|
||||
# mkdocs documentation
|
||||
/site
|
||||
|
||||
# mypy
|
||||
.mypy_cache/
|
||||
.dmypy.json
|
||||
dmypy.json
|
||||
|
||||
# Pyre type checker
|
||||
.pyre/
|
||||
|
||||
# pytype static type analyzer
|
||||
.pytype/
|
||||
|
||||
# Cython debug symbols
|
||||
cython_debug/
|
||||
|
||||
# PyCharm
|
||||
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
|
||||
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
|
||||
# and can be added to the global gitignore or merged into this file. For a more nuclear
|
||||
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
|
||||
#.idea/
|
||||
runpod.toml
|
||||
15
examples/sentry-metrics/Dockerfile
Normal file
15
examples/sentry-metrics/Dockerfile
Normal file
@@ -0,0 +1,15 @@
|
||||
FROM python:3.10-bullseye
|
||||
|
||||
RUN mkdir /app
|
||||
RUN mkdir /app/assets
|
||||
RUN mkdir /app/utils
|
||||
COPY *.py /app/
|
||||
COPY requirements.txt /app/
|
||||
|
||||
|
||||
WORKDIR /app
|
||||
RUN pip3 install -r requirements.txt
|
||||
|
||||
EXPOSE 7860
|
||||
|
||||
CMD ["python3", "server.py"]
|
||||
29
examples/sentry-metrics/README.md
Normal file
29
examples/sentry-metrics/README.md
Normal file
@@ -0,0 +1,29 @@
|
||||
# Sentry Metrics
|
||||
|
||||
This app connects you to a chatbot powered by GPT-4. It provides TTFB (Time-To-First-Byte) and processing metrics to Sentry.
|
||||
|
||||
## Get started
|
||||
|
||||
```python
|
||||
python3 -m venv venv
|
||||
source venv/bin/activate
|
||||
pip install -r requirements.txt
|
||||
|
||||
cp env.example .env # and add your credentials
|
||||
|
||||
```
|
||||
|
||||
## Run the server
|
||||
|
||||
```bash
|
||||
python server.py
|
||||
```
|
||||
|
||||
Then, visit `http://localhost:7860/` in your browser to start a chatbot session.
|
||||
|
||||
## Build and test the Docker image
|
||||
|
||||
```
|
||||
docker build -t chatbot .
|
||||
docker run --env-file .env -p 7860:7860 chatbot
|
||||
```
|
||||
112
examples/sentry-metrics/bot.py
Normal file
112
examples/sentry-metrics/bot.py
Normal file
@@ -0,0 +1,112 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
|
||||
import aiohttp
|
||||
import sentry_sdk
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from runner import configure
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.metrics.sentry import SentryMetrics
|
||||
from pipecat.services.elevenlabs import ElevenLabsTTSService
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def main():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
(room_url, token) = await configure(session)
|
||||
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
token,
|
||||
"Chatbot",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
audio_in_enabled=True,
|
||||
camera_out_enabled=False,
|
||||
vad_enabled=True,
|
||||
vad_audio_passthrough=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
transcription_enabled=True,
|
||||
),
|
||||
)
|
||||
|
||||
# Initialize Sentry
|
||||
sentry_sdk.init(
|
||||
dsn="your-project-dsn",
|
||||
traces_sample_rate=1.0,
|
||||
)
|
||||
|
||||
tts = ElevenLabsTTSService(
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id="cgSgspJ2msm6clMCkdW9",
|
||||
metrics=SentryMetrics(),
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
model="gpt-4o",
|
||||
metrics=SentryMetrics(),
|
||||
)
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by introducing yourself.",
|
||||
},
|
||||
]
|
||||
|
||||
context = OpenAILLMContext(messages)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # microphone
|
||||
context_aggregator.user(),
|
||||
llm,
|
||||
tts,
|
||||
transport.output(),
|
||||
context_aggregator.assistant(),
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
PipelineParams(allow_interruptions=True, enable_metrics=True),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
@transport.event_handler("on_participant_left")
|
||||
async def on_participant_left(transport, participant, reason):
|
||||
print(f"Participant left: {participant}")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
4
examples/sentry-metrics/env.example
Normal file
4
examples/sentry-metrics/env.example
Normal file
@@ -0,0 +1,4 @@
|
||||
DAILY_SAMPLE_ROOM_URL=https://yourdomain.daily.co/yourroom # (for joining the bot to the same room repeatedly for local dev)
|
||||
DAILY_API_KEY=7df...
|
||||
OPENAI_API_KEY=sk-PL...
|
||||
ELEVENLABS_API_KEY=aeb...
|
||||
4
examples/sentry-metrics/requirements.txt
Normal file
4
examples/sentry-metrics/requirements.txt
Normal file
@@ -0,0 +1,4 @@
|
||||
python-dotenv
|
||||
fastapi[all]
|
||||
uvicorn
|
||||
pipecat-ai[daily,openai,sentry,silero,elevenlabs]
|
||||
56
examples/sentry-metrics/runner.py
Normal file
56
examples/sentry-metrics/runner.py
Normal file
@@ -0,0 +1,56 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import argparse
|
||||
import os
|
||||
|
||||
import aiohttp
|
||||
|
||||
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper
|
||||
|
||||
|
||||
async def configure(aiohttp_session: aiohttp.ClientSession):
|
||||
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
|
||||
parser.add_argument(
|
||||
"-u", "--url", type=str, required=False, help="URL of the Daily room to join"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-k",
|
||||
"--apikey",
|
||||
type=str,
|
||||
required=False,
|
||||
help="Daily API Key (needed to create an owner token for the room)",
|
||||
)
|
||||
|
||||
args, unknown = parser.parse_known_args()
|
||||
|
||||
url = args.url or os.getenv("DAILY_SAMPLE_ROOM_URL")
|
||||
key = args.apikey or os.getenv("DAILY_API_KEY")
|
||||
|
||||
if not url:
|
||||
raise Exception(
|
||||
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL."
|
||||
)
|
||||
|
||||
if not key:
|
||||
raise Exception(
|
||||
"No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
|
||||
)
|
||||
|
||||
daily_rest_helper = DailyRESTHelper(
|
||||
daily_api_key=key,
|
||||
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
|
||||
aiohttp_session=aiohttp_session,
|
||||
)
|
||||
|
||||
# Create a meeting token for the given room with an expiration 1 hour in
|
||||
# the future.
|
||||
expiry_time: float = 60 * 60
|
||||
|
||||
token = await daily_rest_helper.get_token(url, expiry_time)
|
||||
|
||||
return (url, token)
|
||||
return (url, token)
|
||||
139
examples/sentry-metrics/server.py
Normal file
139
examples/sentry-metrics/server.py
Normal file
@@ -0,0 +1,139 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import subprocess
|
||||
from contextlib import asynccontextmanager
|
||||
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from fastapi import FastAPI, HTTPException, Request
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.responses import JSONResponse, RedirectResponse
|
||||
|
||||
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomParams
|
||||
|
||||
MAX_BOTS_PER_ROOM = 1
|
||||
|
||||
# Bot sub-process dict for status reporting and concurrency control
|
||||
bot_procs = {}
|
||||
|
||||
daily_helpers = {}
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
def cleanup():
|
||||
# Clean up function, just to be extra safe
|
||||
for entry in bot_procs.values():
|
||||
proc = entry[0]
|
||||
proc.terminate()
|
||||
proc.wait()
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
aiohttp_session = aiohttp.ClientSession()
|
||||
daily_helpers["rest"] = DailyRESTHelper(
|
||||
daily_api_key=os.getenv("DAILY_API_KEY", ""),
|
||||
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
|
||||
aiohttp_session=aiohttp_session,
|
||||
)
|
||||
yield
|
||||
await aiohttp_session.close()
|
||||
cleanup()
|
||||
|
||||
|
||||
app = FastAPI(lifespan=lifespan)
|
||||
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=["*"],
|
||||
allow_credentials=True,
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
)
|
||||
|
||||
|
||||
@app.get("/")
|
||||
async def start_agent(request: Request):
|
||||
print(f"!!! Creating room")
|
||||
room = await daily_helpers["rest"].create_room(DailyRoomParams())
|
||||
print(f"!!! Room URL: {room.url}")
|
||||
# Ensure the room property is present
|
||||
if not room.url:
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail="Missing 'room' property in request data. Cannot start agent without a target room!",
|
||||
)
|
||||
|
||||
# Check if there is already an existing process running in this room
|
||||
num_bots_in_room = sum(
|
||||
1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None
|
||||
)
|
||||
if num_bots_in_room >= MAX_BOTS_PER_ROOM:
|
||||
raise HTTPException(status_code=500, detail=f"Max bot limited reach for room: {room.url}")
|
||||
|
||||
# Get the token for the room
|
||||
token = await daily_helpers["rest"].get_token(room.url)
|
||||
|
||||
if not token:
|
||||
raise HTTPException(status_code=500, detail=f"Failed to get token for room: {room.url}")
|
||||
|
||||
# Spawn a new agent, and join the user session
|
||||
# Note: this is mostly for demonstration purposes (refer to 'deployment' in README)
|
||||
try:
|
||||
proc = subprocess.Popen(
|
||||
[f"python3 -m bot -u {room.url} -t {token}"],
|
||||
shell=True,
|
||||
bufsize=1,
|
||||
cwd=os.path.dirname(os.path.abspath(__file__)),
|
||||
)
|
||||
bot_procs[proc.pid] = (proc, room.url)
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}")
|
||||
|
||||
return RedirectResponse(room.url)
|
||||
|
||||
|
||||
@app.get("/status/{pid}")
|
||||
def get_status(pid: int):
|
||||
# Look up the subprocess
|
||||
proc = bot_procs.get(pid)
|
||||
|
||||
# If the subprocess doesn't exist, return an error
|
||||
if not proc:
|
||||
raise HTTPException(status_code=404, detail=f"Bot with process id: {pid} not found")
|
||||
|
||||
# Check the status of the subprocess
|
||||
if proc[0].poll() is None:
|
||||
status = "running"
|
||||
else:
|
||||
status = "finished"
|
||||
|
||||
return JSONResponse({"bot_id": pid, "status": status})
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
|
||||
default_host = os.getenv("HOST", "0.0.0.0")
|
||||
default_port = int(os.getenv("FAST_API_PORT", "7860"))
|
||||
|
||||
parser = argparse.ArgumentParser(description="Daily Storyteller FastAPI server")
|
||||
parser.add_argument("--host", type=str, default=default_host, help="Host address")
|
||||
parser.add_argument("--port", type=int, default=default_port, help="Port number")
|
||||
parser.add_argument("--reload", action="store_true", help="Reload code on change")
|
||||
|
||||
config = parser.parse_args()
|
||||
|
||||
uvicorn.run(
|
||||
"server:app",
|
||||
host=config.host,
|
||||
port=config.port,
|
||||
reload=config.reload,
|
||||
)
|
||||
@@ -121,8 +121,6 @@ async def main():
|
||||
token,
|
||||
"Chatbot",
|
||||
DailyParams(
|
||||
audio_in_sample_rate=16000,
|
||||
audio_out_sample_rate=24000,
|
||||
audio_out_enabled=True,
|
||||
camera_out_enabled=True,
|
||||
camera_out_width=1024,
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
beautifulsoup4==4.12.3
|
||||
pypdf==4.3.1
|
||||
tiktoken==0.7.0
|
||||
pipecat-ai[daily,cartesia,openai,silero]==0.0.40
|
||||
pipecat-ai[daily,cartesia,openai,silero]
|
||||
python-dotenv==1.0.1
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
|
||||
import argparse
|
||||
import os
|
||||
from typing import Optional
|
||||
|
||||
import aiohttp
|
||||
|
||||
@@ -18,7 +19,7 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
|
||||
|
||||
|
||||
async def configure_with_args(
|
||||
aiohttp_session: aiohttp.ClientSession, parser: argparse.ArgumentParser | None = None
|
||||
aiohttp_session: aiohttp.ClientSession, parser: Optional[argparse.ArgumentParser] = None
|
||||
):
|
||||
if not parser:
|
||||
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
|
||||
|
||||
@@ -112,7 +112,6 @@ async def main():
|
||||
token,
|
||||
"studypal",
|
||||
DailyParams(
|
||||
audio_out_sample_rate=44100,
|
||||
audio_out_enabled=True,
|
||||
transcription_enabled=True,
|
||||
vad_enabled=True,
|
||||
@@ -124,7 +123,6 @@ async def main():
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id=os.getenv("CARTESIA_VOICE_ID", "4d2fd738-3b3d-4368-957a-bb4805275bd9"),
|
||||
# British Narration Lady: 4d2fd738-3b3d-4368-957a-bb4805275bd9
|
||||
sample_rate=44100,
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o-mini")
|
||||
@@ -155,7 +153,12 @@ Your task is to help the user understand and learn from this article in 2 senten
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
PipelineParams(
|
||||
audio_out_sample_rate=44100, allow_interruptions=True, enable_metrics=True
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
|
||||
@@ -11,14 +11,13 @@ from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import EndFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.serializers.telnyx import TelnyxFrameSerializer
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.services.deepgram import DeepgramSTTService
|
||||
from pipecat.services.elevenlabs import ElevenLabsTTSService, Language
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.transports.network.fastapi_websocket import (
|
||||
FastAPIWebsocketParams,
|
||||
@@ -31,7 +30,12 @@ logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def run_bot(websocket_client, stream_id, outbound_encoding, inbound_encoding):
|
||||
async def run_bot(
|
||||
websocket_client,
|
||||
stream_id: str,
|
||||
outbound_encoding: str,
|
||||
inbound_encoding: str,
|
||||
):
|
||||
transport = FastAPIWebsocketTransport(
|
||||
websocket=websocket_client,
|
||||
params=FastAPIWebsocketParams(
|
||||
@@ -48,11 +52,9 @@ async def run_bot(websocket_client, stream_id, outbound_encoding, inbound_encodi
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
tts = ElevenLabsTTSService(
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id="CwhRBWXzGAHq8TQ4Fs17",
|
||||
output_format="pcm_24000",
|
||||
params=ElevenLabsTTSService.InputParams(language=Language.EN),
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
|
||||
)
|
||||
|
||||
messages = [
|
||||
@@ -77,7 +79,14 @@ async def run_bot(websocket_client, stream_id, outbound_encoding, inbound_encodi
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
audio_in_sample_rate=8000,
|
||||
audio_out_sample_rate=8000,
|
||||
allow_interruptions=True,
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
@@ -87,7 +96,7 @@ async def run_bot(websocket_client, stream_id, outbound_encoding, inbound_encodi
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
await task.queue_frames([EndFrame()])
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=False)
|
||||
|
||||
|
||||
@@ -107,3 +107,34 @@ The server will start on port 8765. Keep this running while you test with Twilio
|
||||
## Usage
|
||||
|
||||
To start a call, simply make a call to your configured Twilio phone number. The webhook URL will direct the call to your FastAPI application, which will handle it accordingly.
|
||||
|
||||
## Testing
|
||||
|
||||
It is also possible to automatically test the server without making phone calls by using a software client.
|
||||
|
||||
First, update `templates/streams.xml` to point to your server's websocket endpoint. For example:
|
||||
|
||||
```
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<Response>
|
||||
<Connect>
|
||||
<Stream url="ws://localhost:8765/ws"></Stream>
|
||||
</Connect>
|
||||
<Pause length="40"/>
|
||||
</Response>
|
||||
```
|
||||
|
||||
Then, start the server with `-t` to indicate we are testing:
|
||||
|
||||
```sh
|
||||
# Make sure you’re in the project directory and your virtual environment is activated
|
||||
python server.py -t
|
||||
```
|
||||
|
||||
Finally, just point the client to the server's URL:
|
||||
|
||||
```sh
|
||||
python client.py -u http://localhost:8765 -c 2
|
||||
```
|
||||
|
||||
where `-c` allows you to create multiple concurrent clients.
|
||||
|
||||
@@ -4,10 +4,15 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import datetime
|
||||
import io
|
||||
import os
|
||||
import sys
|
||||
import wave
|
||||
|
||||
import aiofiles
|
||||
from dotenv import load_dotenv
|
||||
from fastapi import WebSocket
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
@@ -15,6 +20,7 @@ from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
|
||||
from pipecat.serializers.twilio import TwilioFrameSerializer
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.services.deepgram import DeepgramSTTService
|
||||
@@ -30,10 +36,29 @@ logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def run_bot(websocket_client, stream_sid):
|
||||
async def save_audio(server_name: str, audio: bytes, sample_rate: int, num_channels: int):
|
||||
if len(audio) > 0:
|
||||
filename = (
|
||||
f"{server_name}_recording_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.wav"
|
||||
)
|
||||
with io.BytesIO() as buffer:
|
||||
with wave.open(buffer, "wb") as wf:
|
||||
wf.setsampwidth(2)
|
||||
wf.setnchannels(num_channels)
|
||||
wf.setframerate(sample_rate)
|
||||
wf.writeframes(audio)
|
||||
async with aiofiles.open(filename, "wb") as file:
|
||||
await file.write(buffer.getvalue())
|
||||
logger.info(f"Merged audio saved to {filename}")
|
||||
else:
|
||||
logger.info("No audio data to save")
|
||||
|
||||
|
||||
async def run_bot(websocket_client: WebSocket, stream_sid: str, testing: bool):
|
||||
transport = FastAPIWebsocketTransport(
|
||||
websocket=websocket_client,
|
||||
params=FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
add_wav_header=False,
|
||||
vad_enabled=True,
|
||||
@@ -45,23 +70,28 @@ async def run_bot(websocket_client, stream_sid):
|
||||
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"), audio_passthrough=True)
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
|
||||
push_silence_after_stop=testing,
|
||||
)
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in an audio call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
|
||||
"content": "You are an elementary teacher in an audio call. Your output will be converted to audio so don't include special characters in your answers. Respond to what the student said in a short short sentence.",
|
||||
},
|
||||
]
|
||||
|
||||
context = OpenAILLMContext(messages)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
# NOTE: Watch out! This will save all the conversation in memory. You can
|
||||
# pass `buffer_size` to get periodic callbacks.
|
||||
audiobuffer = AudioBufferProcessor()
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Websocket input from client
|
||||
@@ -70,14 +100,22 @@ async def run_bot(websocket_client, stream_sid):
|
||||
llm, # LLM
|
||||
tts, # Text-To-Speech
|
||||
transport.output(), # Websocket output to client
|
||||
audiobuffer, # Used to buffer the audio in the pipeline
|
||||
context_aggregator.assistant(),
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
audio_in_sample_rate=8000, audio_out_sample_rate=8000, allow_interruptions=True
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
# Start recording.
|
||||
await audiobuffer.start_recording()
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
@@ -86,6 +124,15 @@ async def run_bot(websocket_client, stream_sid):
|
||||
async def on_client_disconnected(transport, client):
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=False)
|
||||
@audiobuffer.event_handler("on_audio_data")
|
||||
async def on_audio_data(buffer, audio, sample_rate, num_channels):
|
||||
server_name = f"server_{websocket_client.client.port}"
|
||||
await save_audio(server_name, audio, sample_rate, num_channels)
|
||||
|
||||
# We use `handle_sigint=False` because `uvicorn` is controlling keyboard
|
||||
# interruptions. We use `force_gc=True` to force garbage collection after
|
||||
# the runner finishes running a task which could be useful for long running
|
||||
# applications with multiple clients connecting.
|
||||
runner = PipelineRunner(handle_sigint=False, force_gc=True)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
199
examples/twilio-chatbot/client.py
Normal file
199
examples/twilio-chatbot/client.py
Normal file
@@ -0,0 +1,199 @@
|
||||
#
|
||||
# Copyright (c) 2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import datetime
|
||||
import io
|
||||
import os
|
||||
import sys
|
||||
import wave
|
||||
import xml.etree.ElementTree as ET
|
||||
from uuid import uuid4
|
||||
|
||||
import aiofiles
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.audio.vad.vad_analyzer import VADParams
|
||||
from pipecat.frames.frames import EndFrame, TransportMessageUrgentFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
|
||||
from pipecat.serializers.twilio import TwilioFrameSerializer
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.services.deepgram import DeepgramSTTService
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.transports.network.websocket_client import (
|
||||
WebsocketClientParams,
|
||||
WebsocketClientTransport,
|
||||
)
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
DEFAULT_CLIENT_DURATION = 30
|
||||
|
||||
|
||||
async def download_twiml(server_url: str) -> str:
|
||||
# TODO(aleix): add error checking.
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(server_url) as response:
|
||||
return await response.text()
|
||||
|
||||
|
||||
def get_stream_url_from_twiml(twiml: str) -> str:
|
||||
root = ET.fromstring(twiml)
|
||||
# TODO(aleix): add error checking.
|
||||
stream_element = root.find(".//Stream") # Finds the first <Stream> element
|
||||
url = stream_element.get("url")
|
||||
return url
|
||||
|
||||
|
||||
async def save_audio(client_name: str, audio: bytes, sample_rate: int, num_channels: int):
|
||||
if len(audio) > 0:
|
||||
filename = (
|
||||
f"{client_name}_recording_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.wav"
|
||||
)
|
||||
with io.BytesIO() as buffer:
|
||||
with wave.open(buffer, "wb") as wf:
|
||||
wf.setsampwidth(2)
|
||||
wf.setnchannels(num_channels)
|
||||
wf.setframerate(sample_rate)
|
||||
wf.writeframes(audio)
|
||||
async with aiofiles.open(filename, "wb") as file:
|
||||
await file.write(buffer.getvalue())
|
||||
logger.info(f"Merged audio saved to {filename}")
|
||||
else:
|
||||
logger.info("No audio data to save")
|
||||
|
||||
|
||||
async def run_client(client_name: str, server_url: str, duration_secs: int):
|
||||
twiml = await download_twiml(server_url)
|
||||
|
||||
stream_url = get_stream_url_from_twiml(twiml)
|
||||
|
||||
stream_sid = str(uuid4())
|
||||
|
||||
transport = WebsocketClientTransport(
|
||||
uri=stream_url,
|
||||
params=WebsocketClientParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
add_wav_header=False,
|
||||
serializer=TwilioFrameSerializer(stream_sid),
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=1.5)),
|
||||
vad_audio_passthrough=True,
|
||||
),
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
|
||||
|
||||
# We let the audio passthrough so we can record the conversation.
|
||||
stt = DeepgramSTTService(
|
||||
api_key=os.getenv("DEEPGRAM_API_KEY"),
|
||||
audio_passthrough=True,
|
||||
)
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="e13cae5c-ec59-4f71-b0a6-266df3c9bb8e", # Madame Mischief
|
||||
push_silence_after_stop=True,
|
||||
)
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are an 8 year old child. A teacher will explain you new concepts you want to know about. Feel free to change topics whnever you want. Once you are taught something you need to keep asking for clarifications if you think someone your age would not understand what you are being taught.",
|
||||
},
|
||||
]
|
||||
|
||||
context = OpenAILLMContext(messages)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
# NOTE: Watch out! This will save all the conversation in memory. You can
|
||||
# pass `buffer_size` to get periodic callbacks.
|
||||
audiobuffer = AudioBufferProcessor()
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Websocket input from server
|
||||
stt, # Speech-To-Text
|
||||
context_aggregator.user(),
|
||||
llm, # LLM
|
||||
tts, # Text-To-Speech
|
||||
transport.output(), # Websocket output to server
|
||||
audiobuffer, # Used to buffer the audio in the pipeline
|
||||
context_aggregator.assistant(),
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
audio_in_sample_rate=8000, audio_out_sample_rate=8000, allow_interruptions=True
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_connected")
|
||||
async def on_connected(transport: WebsocketClientTransport, client):
|
||||
# Start recording.
|
||||
await audiobuffer.start_recording()
|
||||
|
||||
message = TransportMessageUrgentFrame(
|
||||
message={"event": "connected", "protocol": "Call", "version": "1.0.0"}
|
||||
)
|
||||
await transport.output().send_message(message)
|
||||
|
||||
message = TransportMessageUrgentFrame(
|
||||
message={"event": "start", "streamSid": stream_sid, "start": {"streamSid": stream_sid}}
|
||||
)
|
||||
await transport.output().send_message(message)
|
||||
|
||||
@audiobuffer.event_handler("on_audio_data")
|
||||
async def on_audio_data(buffer, audio, sample_rate, num_channels):
|
||||
await save_audio(client_name, audio, sample_rate, num_channels)
|
||||
|
||||
async def end_call():
|
||||
await asyncio.sleep(duration_secs)
|
||||
await task.queue_frame(EndFrame())
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await asyncio.gather(runner.run(task), end_call())
|
||||
|
||||
|
||||
async def main():
|
||||
parser = argparse.ArgumentParser(description="Pipecat Twilio Chatbot Client")
|
||||
parser.add_argument("-u", "--url", type=str, required=True, help="specify the server URL")
|
||||
parser.add_argument(
|
||||
"-c", "--clients", type=int, required=True, help="number of concurrent clients"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-d",
|
||||
"--duration",
|
||||
type=int,
|
||||
default=DEFAULT_CLIENT_DURATION,
|
||||
help=f"duration of each client in seconds (default: {DEFAULT_CLIENT_DURATION})",
|
||||
)
|
||||
args, _ = parser.parse_known_args()
|
||||
|
||||
clients = []
|
||||
for i in range(args.clients):
|
||||
clients.append(asyncio.create_task(run_client(f"client_{i}", args.url, args.duration)))
|
||||
await asyncio.gather(*clients)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
@@ -4,6 +4,7 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import argparse
|
||||
import json
|
||||
|
||||
import uvicorn
|
||||
@@ -38,8 +39,16 @@ async def websocket_endpoint(websocket: WebSocket):
|
||||
print(call_data, flush=True)
|
||||
stream_sid = call_data["start"]["streamSid"]
|
||||
print("WebSocket connection accepted")
|
||||
await run_bot(websocket, stream_sid)
|
||||
await run_bot(websocket, stream_sid, app.state.testing)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Pipecat Twilio Chatbot Server")
|
||||
parser.add_argument(
|
||||
"-t", "--test", action="store_true", default=False, help="set the server in testing mode"
|
||||
)
|
||||
args, _ = parser.parse_known_args()
|
||||
|
||||
app.state.testing = args.test
|
||||
|
||||
uvicorn.run(app, host="0.0.0.0", port=8765)
|
||||
|
||||
@@ -17,6 +17,7 @@ from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.serializers.protobuf import ProtobufFrameSerializer
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.services.deepgram import DeepgramSTTService
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
@@ -80,7 +81,7 @@ class SessionTimeoutHandler:
|
||||
async def main():
|
||||
transport = WebsocketServerTransport(
|
||||
params=WebsocketServerParams(
|
||||
audio_out_sample_rate=16000,
|
||||
serializer=ProtobufFrameSerializer(),
|
||||
audio_out_enabled=True,
|
||||
add_wav_header=True,
|
||||
vad_enabled=True,
|
||||
@@ -97,7 +98,6 @@ async def main():
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
|
||||
sample_rate=16000,
|
||||
)
|
||||
|
||||
messages = [
|
||||
@@ -122,7 +122,12 @@ async def main():
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
audio_in_sample_rate=16000, audio_out_sample_rate=16000, allow_interruptions=True
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
|
||||
@@ -32,6 +32,7 @@ dependencies = [
|
||||
"protobuf~=5.29.3",
|
||||
"pydantic~=2.10.5",
|
||||
"pyloudnorm~=0.1.1",
|
||||
"resampy~=0.4.3",
|
||||
"soxr~=0.5.0"
|
||||
]
|
||||
|
||||
@@ -40,7 +41,7 @@ Source = "https://github.com/pipecat-ai/pipecat"
|
||||
Website = "https://pipecat.ai"
|
||||
|
||||
[project.optional-dependencies]
|
||||
anthropic = [ "anthropic~=0.39.0" ]
|
||||
anthropic = [ "anthropic~=0.45.2" ]
|
||||
assemblyai = [ "assemblyai~=0.36.0" ]
|
||||
aws = [ "boto3~=1.35.99" ]
|
||||
azure = [ "azure-cognitiveservices-speech~=1.42.0", "openai~=1.59.6" ]
|
||||
@@ -69,9 +70,10 @@ moondream = [ "einops~=0.8.0", "timm~=1.0.13", "transformers~=4.48.0" ]
|
||||
nim = [ "openai~=1.59.6" ]
|
||||
noisereduce = [ "noisereduce~=3.0.3" ]
|
||||
openai = [ "openai~=1.59.6", "websockets~=13.1", "python-deepcompare~=2.1.0" ]
|
||||
openpipe = [ "openpipe~=4.43.0" ]
|
||||
openpipe = [ "openpipe~=4.45.0" ]
|
||||
playht = [ "pyht~=0.1.6", "websockets~=13.1" ]
|
||||
riva = [ "nvidia-riva-client~=2.18.0" ]
|
||||
sentry = [ "sentry-sdk~=2.20.0" ]
|
||||
silero = [ "onnxruntime~=1.20.1" ]
|
||||
simli = [ "simli-ai~=0.1.10"]
|
||||
soundfile = [ "soundfile~=0.13.0" ]
|
||||
|
||||
@@ -20,6 +20,22 @@ except ModuleNotFoundError as e:
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
class KrispProcessorManager:
|
||||
"""
|
||||
Ensures that only one KrispAudioProcessor instance exists for the entire program.
|
||||
"""
|
||||
|
||||
_krisp_instance = None
|
||||
|
||||
@classmethod
|
||||
def get_processor(cls, sample_rate: int, sample_type: str, channels: int, model_path: str):
|
||||
if cls._krisp_instance is None:
|
||||
cls._krisp_instance = KrispAudioProcessor(
|
||||
sample_rate, sample_type, channels, model_path
|
||||
)
|
||||
return cls._krisp_instance
|
||||
|
||||
|
||||
class KrispFilter(BaseAudioFilter):
|
||||
def __init__(
|
||||
self, sample_type: str = "PCM_16", channels: int = 1, model_path: str = None
|
||||
@@ -48,7 +64,7 @@ class KrispFilter(BaseAudioFilter):
|
||||
|
||||
async def start(self, sample_rate: int):
|
||||
self._sample_rate = sample_rate
|
||||
self._krisp_processor = KrispAudioProcessor(
|
||||
self._krisp_processor = KrispProcessorManager.get_processor(
|
||||
self._sample_rate, self._sample_type, self._channels, self._model_path
|
||||
)
|
||||
|
||||
|
||||
1
src/pipecat/audio/resamplers/__init__.py
Normal file
1
src/pipecat/audio/resamplers/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
|
||||
30
src/pipecat/audio/resamplers/base_audio_resampler.py
Normal file
30
src/pipecat/audio/resamplers/base_audio_resampler.py
Normal file
@@ -0,0 +1,30 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
|
||||
class BaseAudioResampler(ABC):
|
||||
"""Abstract base class for audio resampling. This class defines an
|
||||
interface for audio resampling implementations.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
async def resample(self, audio: bytes, in_rate: int, out_rate: int) -> bytes:
|
||||
"""
|
||||
Resamples the given audio data to a different sample rate.
|
||||
|
||||
This is an abstract method that must be implemented in subclasses.
|
||||
|
||||
Parameters:
|
||||
audio (bytes): The audio data to be resampled, represented as a byte string.
|
||||
in_rate (int): The original sample rate of the audio data (in Hz).
|
||||
out_rate (int): The desired sample rate for the resampled audio data (in Hz).
|
||||
|
||||
Returns:
|
||||
bytes: The resampled audio data as a byte string.
|
||||
"""
|
||||
pass
|
||||
25
src/pipecat/audio/resamplers/resampy_resampler.py
Normal file
25
src/pipecat/audio/resamplers/resampy_resampler.py
Normal file
@@ -0,0 +1,25 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import numpy as np
|
||||
import resampy
|
||||
|
||||
from pipecat.audio.resamplers.base_audio_resampler import BaseAudioResampler
|
||||
|
||||
|
||||
class ResampyResampler(BaseAudioResampler):
|
||||
"""Audio resampler implementation using the resampy library."""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
pass
|
||||
|
||||
async def resample(self, audio: bytes, in_rate: int, out_rate: int) -> bytes:
|
||||
if in_rate == out_rate:
|
||||
return audio
|
||||
audio_data = np.frombuffer(audio, dtype=np.int16)
|
||||
resampled_audio = resampy.resample(audio_data, in_rate, out_rate, filter="kaiser_fast")
|
||||
result = resampled_audio.astype(np.int16).tobytes()
|
||||
return result
|
||||
25
src/pipecat/audio/resamplers/soxr_resampler.py
Normal file
25
src/pipecat/audio/resamplers/soxr_resampler.py
Normal file
@@ -0,0 +1,25 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import numpy as np
|
||||
import soxr
|
||||
|
||||
from pipecat.audio.resamplers.base_audio_resampler import BaseAudioResampler
|
||||
|
||||
|
||||
class SOXRAudioResampler(BaseAudioResampler):
|
||||
"""Audio resampler implementation using the SoX resampler library."""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
pass
|
||||
|
||||
async def resample(self, audio: bytes, in_rate: int, out_rate: int) -> bytes:
|
||||
if in_rate == out_rate:
|
||||
return audio
|
||||
audio_data = np.frombuffer(audio, dtype=np.int16)
|
||||
resampled_audio = soxr.resample(audio_data, in_rate, out_rate, quality="VHQ")
|
||||
result = resampled_audio.astype(np.int16).tobytes()
|
||||
return result
|
||||
@@ -10,8 +10,24 @@ import numpy as np
|
||||
import pyloudnorm as pyln
|
||||
import soxr
|
||||
|
||||
from pipecat.audio.resamplers.base_audio_resampler import BaseAudioResampler
|
||||
from pipecat.audio.resamplers.soxr_resampler import SOXRAudioResampler
|
||||
|
||||
|
||||
def create_default_resampler(**kwargs) -> BaseAudioResampler:
|
||||
return SOXRAudioResampler(**kwargs)
|
||||
|
||||
|
||||
def resample_audio(audio: bytes, original_rate: int, target_rate: int) -> bytes:
|
||||
import warnings
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"'resample_audio()' is deprecated, use 'create_default_resampler()' instead.",
|
||||
DeprecationWarning,
|
||||
)
|
||||
|
||||
if original_rate == target_rate:
|
||||
return audio
|
||||
audio_data = np.frombuffer(audio, dtype=np.int16)
|
||||
@@ -75,41 +91,45 @@ def exp_smoothing(value: float, prev_value: float, factor: float) -> float:
|
||||
return prev_value + factor * (value - prev_value)
|
||||
|
||||
|
||||
def ulaw_to_pcm(ulaw_bytes: bytes, in_sample_rate: int, out_sample_rate: int):
|
||||
async def ulaw_to_pcm(
|
||||
ulaw_bytes: bytes, in_rate: int, out_rate: int, resampler: BaseAudioResampler
|
||||
):
|
||||
# Convert μ-law to PCM
|
||||
in_pcm_bytes = audioop.ulaw2lin(ulaw_bytes, 2)
|
||||
|
||||
# Resample
|
||||
out_pcm_bytes = resample_audio(in_pcm_bytes, in_sample_rate, out_sample_rate)
|
||||
out_pcm_bytes = await resampler.resample(in_pcm_bytes, in_rate, out_rate)
|
||||
|
||||
return out_pcm_bytes
|
||||
|
||||
|
||||
def pcm_to_ulaw(pcm_bytes: bytes, in_sample_rate: int, out_sample_rate: int):
|
||||
async def pcm_to_ulaw(pcm_bytes: bytes, in_rate: int, out_rate: int, resampler: BaseAudioResampler):
|
||||
# Resample
|
||||
in_pcm_bytes = resample_audio(pcm_bytes, in_sample_rate, out_sample_rate)
|
||||
in_pcm_bytes = await resampler.resample(pcm_bytes, in_rate, out_rate)
|
||||
|
||||
# Convert PCM to μ-law
|
||||
ulaw_bytes = audioop.lin2ulaw(in_pcm_bytes, 2)
|
||||
out_ulaw_bytes = audioop.lin2ulaw(in_pcm_bytes, 2)
|
||||
|
||||
return ulaw_bytes
|
||||
return out_ulaw_bytes
|
||||
|
||||
|
||||
def alaw_to_pcm(alaw_bytes: bytes, in_sample_rate: int, out_sample_rate: int) -> bytes:
|
||||
async def alaw_to_pcm(
|
||||
alaw_bytes: bytes, in_rate: int, out_rate: int, resampler: BaseAudioResampler
|
||||
) -> bytes:
|
||||
# Convert a-law to PCM
|
||||
in_pcm_bytes = audioop.alaw2lin(alaw_bytes, 2)
|
||||
|
||||
# Resample
|
||||
out_pcm_bytes = resample_audio(in_pcm_bytes, in_sample_rate, out_sample_rate)
|
||||
out_pcm_bytes = await resampler.resample(in_pcm_bytes, in_rate, out_rate)
|
||||
|
||||
return out_pcm_bytes
|
||||
|
||||
|
||||
def pcm_to_alaw(pcm_bytes: bytes, in_sample_rate: int, out_sample_rate: int):
|
||||
async def pcm_to_alaw(pcm_bytes: bytes, in_rate: int, out_rate: int, resampler: BaseAudioResampler):
|
||||
# Resample
|
||||
in_pcm_bytes = resample_audio(pcm_bytes, in_sample_rate, out_sample_rate)
|
||||
in_pcm_bytes = await resampler.resample(pcm_bytes, in_rate, out_rate)
|
||||
|
||||
# Convert PCM to μ-law
|
||||
alaw_bytes = audioop.lin2alaw(in_pcm_bytes, 2)
|
||||
out_alaw_bytes = audioop.lin2alaw(in_pcm_bytes, 2)
|
||||
|
||||
return alaw_bytes
|
||||
return out_alaw_bytes
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
#
|
||||
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
import numpy as np
|
||||
from loguru import logger
|
||||
@@ -104,11 +105,8 @@ class SileroOnnxModel:
|
||||
|
||||
|
||||
class SileroVADAnalyzer(VADAnalyzer):
|
||||
def __init__(self, *, sample_rate: int = 16000, params: VADParams = VADParams()):
|
||||
super().__init__(sample_rate=sample_rate, num_channels=1, params=params)
|
||||
|
||||
if sample_rate != 16000 and sample_rate != 8000:
|
||||
raise ValueError("Silero VAD sample rate needs to be 16000 or 8000")
|
||||
def __init__(self, *, sample_rate: Optional[int] = None, params: VADParams = VADParams()):
|
||||
super().__init__(sample_rate=sample_rate, params=params)
|
||||
|
||||
logger.debug("Loading Silero VAD model...")
|
||||
|
||||
@@ -138,6 +136,12 @@ class SileroVADAnalyzer(VADAnalyzer):
|
||||
# VADAnalyzer
|
||||
#
|
||||
|
||||
def set_sample_rate(self, sample_rate: int):
|
||||
if sample_rate != 16000 and sample_rate != 8000:
|
||||
raise ValueError("Silero VAD sample rate needs to be 16000 or 8000")
|
||||
|
||||
super().set_sample_rate(sample_rate)
|
||||
|
||||
def num_frames_required(self) -> int:
|
||||
return 512 if self.sample_rate == 16000 else 256
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
|
||||
from abc import abstractmethod
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel
|
||||
@@ -33,11 +34,11 @@ class VADParams(BaseModel):
|
||||
|
||||
|
||||
class VADAnalyzer:
|
||||
def __init__(self, *, sample_rate: int, num_channels: int, params: VADParams):
|
||||
self._sample_rate = sample_rate
|
||||
self._num_channels = num_channels
|
||||
|
||||
self.set_params(params)
|
||||
def __init__(self, *, sample_rate: Optional[int] = None, params: VADParams):
|
||||
self._init_sample_rate = sample_rate
|
||||
self._sample_rate = 0
|
||||
self._params = params
|
||||
self._num_channels = 1
|
||||
|
||||
self._vad_buffer = b""
|
||||
|
||||
@@ -65,13 +66,17 @@ class VADAnalyzer:
|
||||
def voice_confidence(self, buffer) -> float:
|
||||
pass
|
||||
|
||||
def set_sample_rate(self, sample_rate: int):
|
||||
self._sample_rate = self._init_sample_rate or sample_rate
|
||||
self.set_params(self._params)
|
||||
|
||||
def set_params(self, params: VADParams):
|
||||
logger.info(f"Setting VAD params to: {params}")
|
||||
self._params = params
|
||||
self._vad_frames = self.num_frames_required()
|
||||
self._vad_frames_num_bytes = self._vad_frames * self._num_channels * 2
|
||||
|
||||
vad_frames_per_sec = self._vad_frames / self._sample_rate
|
||||
vad_frames_per_sec = self._vad_frames / self.sample_rate
|
||||
|
||||
self._vad_start_frames = round(self._params.start_secs / vad_frames_per_sec)
|
||||
self._vad_stop_frames = round(self._params.stop_secs / vad_frames_per_sec)
|
||||
@@ -80,7 +85,7 @@ class VADAnalyzer:
|
||||
self._vad_state: VADState = VADState.QUIET
|
||||
|
||||
def _get_smoothed_volume(self, audio: bytes) -> float:
|
||||
volume = calculate_audio_volume(audio, self._sample_rate)
|
||||
volume = calculate_audio_volume(audio, self.sample_rate)
|
||||
return exp_smoothing(volume, self._prev_volume, self._smoothing_factor)
|
||||
|
||||
def analyze_audio(self, buffer) -> VADState:
|
||||
|
||||
@@ -6,7 +6,18 @@
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from typing import TYPE_CHECKING, Any, Awaitable, Callable, List, Literal, Mapping, Optional, Tuple
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
Awaitable,
|
||||
Callable,
|
||||
Dict,
|
||||
List,
|
||||
Literal,
|
||||
Mapping,
|
||||
Optional,
|
||||
Tuple,
|
||||
)
|
||||
|
||||
from pipecat.audio.vad.vad_analyzer import VADParams
|
||||
from pipecat.clocks.base_clock import BaseClock
|
||||
@@ -37,7 +48,7 @@ class KeypadEntry(str, Enum):
|
||||
STAR = "*"
|
||||
|
||||
|
||||
def format_pts(pts: int | None):
|
||||
def format_pts(pts: Optional[int]):
|
||||
return nanoseconds_to_str(pts) if pts else None
|
||||
|
||||
|
||||
@@ -48,13 +59,13 @@ class Frame:
|
||||
id: int = field(init=False)
|
||||
name: str = field(init=False)
|
||||
pts: Optional[int] = field(init=False)
|
||||
metadata: dict = field(init=False)
|
||||
metadata: Dict[str, Any] = field(init=False)
|
||||
|
||||
def __post_init__(self):
|
||||
self.id: int = obj_id()
|
||||
self.name: str = f"{self.__class__.__name__}#{obj_count(self)}"
|
||||
self.pts: Optional[int] = None
|
||||
self.metadata: dict = {}
|
||||
self.metadata: Dict[str, Any] = {}
|
||||
|
||||
def __str__(self):
|
||||
return self.name
|
||||
@@ -115,7 +126,7 @@ class ImageRawFrame:
|
||||
|
||||
image: bytes
|
||||
size: Tuple[int, int]
|
||||
format: str | None
|
||||
format: Optional[str]
|
||||
|
||||
|
||||
#
|
||||
@@ -165,7 +176,7 @@ class URLImageRawFrame(OutputImageRawFrame):
|
||||
|
||||
"""
|
||||
|
||||
url: str | None
|
||||
url: Optional[str]
|
||||
|
||||
def __str__(self):
|
||||
pts = format_pts(self.pts)
|
||||
@@ -224,7 +235,7 @@ class TranscriptionFrame(TextFrame):
|
||||
|
||||
user_id: str
|
||||
timestamp: str
|
||||
language: Language | None = None
|
||||
language: Optional[Language] = None
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.name}(user: {self.user_id}, text: [{self.text}], language: {self.language}, timestamp: {self.timestamp})"
|
||||
@@ -239,7 +250,7 @@ class InterimTranscriptionFrame(TextFrame):
|
||||
text: str
|
||||
user_id: str
|
||||
timestamp: str
|
||||
language: Language | None = None
|
||||
language: Optional[Language] = None
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.name}(user: {self.user_id}, text: [{self.text}], language: {self.language}, timestamp: {self.timestamp})"
|
||||
@@ -261,7 +272,7 @@ class TranscriptionMessage:
|
||||
|
||||
role: Literal["user", "assistant"]
|
||||
content: str
|
||||
timestamp: str | None = None
|
||||
timestamp: Optional[str] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -428,11 +439,13 @@ class StartFrame(SystemFrame):
|
||||
|
||||
clock: BaseClock
|
||||
task_manager: TaskManager
|
||||
audio_in_sample_rate: int = 16000
|
||||
audio_out_sample_rate: int = 24000
|
||||
allow_interruptions: bool = False
|
||||
enable_metrics: bool = False
|
||||
enable_usage_metrics: bool = False
|
||||
report_only_initial_ttfb: bool = False
|
||||
observer: Optional["BaseObserver"] = None
|
||||
report_only_initial_ttfb: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -661,7 +674,7 @@ class UserImageRawFrame(InputImageRawFrame):
|
||||
class VisionImageRawFrame(InputImageRawFrame):
|
||||
"""An image with an associated text to ask for a description of it."""
|
||||
|
||||
text: str | None
|
||||
text: Optional[str]
|
||||
|
||||
def __str__(self):
|
||||
pts = format_pts(self.pts)
|
||||
|
||||
@@ -19,7 +19,7 @@ class PipelineRunner:
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
name: str | None = None,
|
||||
name: Optional[str] = None,
|
||||
handle_sigint: bool = True,
|
||||
force_gc: bool = False,
|
||||
loop: Optional[asyncio.AbstractEventLoop] = None,
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
#
|
||||
|
||||
import asyncio
|
||||
from typing import AsyncIterable, Iterable, List
|
||||
from typing import Any, AsyncIterable, Dict, Iterable, List
|
||||
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel, ConfigDict
|
||||
@@ -38,24 +38,48 @@ HEARTBEAT_MONITOR_SECONDS = HEARTBEAT_SECONDS * 5
|
||||
|
||||
|
||||
class PipelineParams(BaseModel):
|
||||
"""Configuration parameters for pipeline execution.
|
||||
|
||||
Attributes:
|
||||
allow_interruptions: Whether to allow pipeline interruptions.
|
||||
audio_in_sample_rate: Input audio sample rate in Hz.
|
||||
audio_out_sample_rate: Output audio sample rate in Hz.
|
||||
enable_heartbeats: Whether to enable heartbeat monitoring.
|
||||
enable_metrics: Whether to enable metrics collection.
|
||||
enable_usage_metrics: Whether to enable usage metrics.
|
||||
heartbeats_period_secs: Period between heartbeats in seconds.
|
||||
observers: List of observers for monitoring pipeline execution.
|
||||
report_only_initial_ttfb: Whether to report only initial time to first byte.
|
||||
send_initial_empty_metrics: Whether to send initial empty metrics.
|
||||
start_metadata: Additional metadata for pipeline start.
|
||||
"""
|
||||
|
||||
model_config = ConfigDict(arbitrary_types_allowed=True)
|
||||
|
||||
allow_interruptions: bool = False
|
||||
audio_in_sample_rate: int = 16000
|
||||
audio_out_sample_rate: int = 24000
|
||||
enable_heartbeats: bool = False
|
||||
enable_metrics: bool = False
|
||||
enable_usage_metrics: bool = False
|
||||
send_initial_empty_metrics: bool = True
|
||||
report_only_initial_ttfb: bool = False
|
||||
observers: List[BaseObserver] = []
|
||||
heartbeats_period_secs: float = HEARTBEAT_SECONDS
|
||||
observers: List[BaseObserver] = []
|
||||
report_only_initial_ttfb: bool = False
|
||||
send_initial_empty_metrics: bool = True
|
||||
start_metadata: Dict[str, Any] = {}
|
||||
|
||||
|
||||
class PipelineTaskSource(FrameProcessor):
|
||||
"""This is the source processor that is linked at the beginning of the
|
||||
"""Source processor for pipeline tasks that handles frame routing.
|
||||
|
||||
This is the source processor that is linked at the beginning of the
|
||||
pipeline given to the pipeline task. It allows us to easily push frames
|
||||
downstream to the pipeline and also receive upstream frames coming from the
|
||||
pipeline.
|
||||
|
||||
Args:
|
||||
up_queue: Queue for upstream frame processing.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, up_queue: asyncio.Queue, **kwargs):
|
||||
@@ -73,10 +97,14 @@ class PipelineTaskSource(FrameProcessor):
|
||||
|
||||
|
||||
class PipelineTaskSink(FrameProcessor):
|
||||
"""This is the sink processor that is linked at the end of the pipeline
|
||||
"""Sink processor for pipeline tasks that handles final frame processing.
|
||||
|
||||
This is the sink processor that is linked at the end of the pipeline
|
||||
given to the pipeline task. It allows us to receive downstream frames and
|
||||
act on them, for example, waiting to receive an EndFrame.
|
||||
|
||||
Args:
|
||||
down_queue: Queue for downstream frame processing.
|
||||
"""
|
||||
|
||||
def __init__(self, down_queue: asyncio.Queue, **kwargs):
|
||||
@@ -89,6 +117,14 @@ class PipelineTaskSink(FrameProcessor):
|
||||
|
||||
|
||||
class PipelineTask(BaseTask):
|
||||
"""Manages the execution of a pipeline, handling frame processing and task lifecycle.
|
||||
|
||||
Args:
|
||||
pipeline: The pipeline to execute.
|
||||
params: Configuration parameters for the pipeline.
|
||||
clock: Clock implementation for timing operations.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
pipeline: BasePipeline,
|
||||
@@ -136,6 +172,11 @@ class PipelineTask(BaseTask):
|
||||
"""Returns the name of this task."""
|
||||
return self._name
|
||||
|
||||
@property
|
||||
def params(self) -> PipelineParams:
|
||||
"""Returns the pipeline parameters of this task."""
|
||||
return self._params
|
||||
|
||||
def set_event_loop(self, loop: asyncio.AbstractEventLoop):
|
||||
self._task_manager.set_event_loop(loop)
|
||||
|
||||
@@ -155,9 +196,7 @@ class PipelineTask(BaseTask):
|
||||
await self.queue_frame(EndFrame())
|
||||
|
||||
async def cancel(self):
|
||||
"""
|
||||
Stops the running pipeline immediately.
|
||||
"""
|
||||
"""Stops the running pipeline immediately."""
|
||||
logger.debug(f"Canceling pipeline task {self}")
|
||||
# Make sure everything is cleaned up downstream. This is sent
|
||||
# out-of-band from the main streaming task which is what we want since
|
||||
@@ -167,9 +206,7 @@ class PipelineTask(BaseTask):
|
||||
await self._task_manager.cancel_task(self._process_push_task)
|
||||
|
||||
async def run(self):
|
||||
"""
|
||||
Starts running the given pipeline.
|
||||
"""
|
||||
"""Starts and manages the pipeline execution until completion or cancellation."""
|
||||
if self.has_finished():
|
||||
return
|
||||
try:
|
||||
@@ -187,14 +224,18 @@ class PipelineTask(BaseTask):
|
||||
self._finished = True
|
||||
|
||||
async def queue_frame(self, frame: Frame):
|
||||
"""
|
||||
Queue a frame to be pushed down the pipeline.
|
||||
"""Queue a single frame to be pushed down the pipeline.
|
||||
|
||||
Args:
|
||||
frame: The frame to be processed.
|
||||
"""
|
||||
await self._push_queue.put(frame)
|
||||
|
||||
async def queue_frames(self, frames: Iterable[Frame] | AsyncIterable[Frame]):
|
||||
"""
|
||||
Queues multiple frames to be pushed down the pipeline.
|
||||
"""Queues multiple frames to be pushed down the pipeline.
|
||||
|
||||
Args:
|
||||
frames: An iterable or async iterable of frames to be processed.
|
||||
"""
|
||||
if isinstance(frames, AsyncIterable):
|
||||
async for frame in frames:
|
||||
@@ -271,11 +312,14 @@ class PipelineTask(BaseTask):
|
||||
clock=self._clock,
|
||||
task_manager=self._task_manager,
|
||||
allow_interruptions=self._params.allow_interruptions,
|
||||
audio_in_sample_rate=self._params.audio_in_sample_rate,
|
||||
audio_out_sample_rate=self._params.audio_out_sample_rate,
|
||||
enable_metrics=self._params.enable_metrics,
|
||||
enable_usage_metrics=self._params.enable_usage_metrics,
|
||||
report_only_initial_ttfb=self._params.report_only_initial_ttfb,
|
||||
observer=self._observer,
|
||||
report_only_initial_ttfb=self._params.report_only_initial_ttfb,
|
||||
)
|
||||
start_frame.metadata = self._params.start_metadata
|
||||
await self._source.queue_frame(start_frame, FrameDirection.DOWNSTREAM)
|
||||
|
||||
if self._params.enable_metrics and self._params.send_initial_empty_metrics:
|
||||
@@ -337,9 +381,7 @@ class PipelineTask(BaseTask):
|
||||
self._down_queue.task_done()
|
||||
|
||||
async def _heartbeat_push_handler(self):
|
||||
"""
|
||||
This tasks pushes a heartbeat frame every heartbeat period.
|
||||
"""
|
||||
"""This tasks pushes a heartbeat frame every heartbeat period."""
|
||||
while True:
|
||||
# Don't use `queue_frame()` because if an EndFrame is queued the
|
||||
# task will just stop waiting for the pipeline to finish not
|
||||
|
||||
@@ -16,9 +16,10 @@ class GatedOpenAILLMContextAggregator(FrameProcessor):
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, notifier: BaseNotifier, **kwargs):
|
||||
def __init__(self, *, notifier: BaseNotifier, start_open: bool = False, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._notifier = notifier
|
||||
self._start_open = start_open
|
||||
self._last_context_frame = None
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
@@ -31,7 +32,11 @@ class GatedOpenAILLMContextAggregator(FrameProcessor):
|
||||
await self._stop()
|
||||
await self.push_frame(frame)
|
||||
elif isinstance(frame, OpenAILLMContextFrame):
|
||||
self._last_context_frame = frame
|
||||
if self._start_open:
|
||||
self._start_open = False
|
||||
await self.push_frame(frame, direction)
|
||||
else:
|
||||
self._last_context_frame = frame
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
from typing import List, Type
|
||||
from typing import List, Optional, Type
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
Frame,
|
||||
@@ -37,7 +37,7 @@ class LLMResponseAggregator(FrameProcessor):
|
||||
start_frame,
|
||||
end_frame,
|
||||
accumulator_frame: Type[TextFrame],
|
||||
interim_accumulator_frame: Type[TextFrame] | None = None,
|
||||
interim_accumulator_frame: Optional[Type[TextFrame]] = None,
|
||||
handle_interruptions: bool = False,
|
||||
expect_stripped_words: bool = True, # if True, need to add spaces between words
|
||||
):
|
||||
|
||||
@@ -51,7 +51,7 @@ class CustomEncoder(json.JSONEncoder):
|
||||
class OpenAILLMContext:
|
||||
def __init__(
|
||||
self,
|
||||
messages: List[ChatCompletionMessageParam] | None = None,
|
||||
messages: Optional[List[ChatCompletionMessageParam]] = None,
|
||||
tools: List[ChatCompletionToolParam] | NotGiven = NOT_GIVEN,
|
||||
tool_choice: ChatCompletionToolChoiceOptionParam | NotGiven = NOT_GIVEN,
|
||||
):
|
||||
|
||||
@@ -4,6 +4,8 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
from typing import Optional
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
Frame,
|
||||
InterimTranscriptionFrame,
|
||||
@@ -50,7 +52,7 @@ class ResponseAggregator(FrameProcessor):
|
||||
start_frame,
|
||||
end_frame,
|
||||
accumulator_frame: TextFrame,
|
||||
interim_accumulator_frame: TextFrame | None = None,
|
||||
interim_accumulator_frame: Optional[TextFrame] = None,
|
||||
):
|
||||
super().__init__()
|
||||
|
||||
|
||||
@@ -30,7 +30,7 @@ class AsyncGeneratorProcessor(FrameProcessor):
|
||||
if isinstance(frame, (CancelFrame, EndFrame)):
|
||||
await self._data_queue.put(None)
|
||||
else:
|
||||
data = self._serializer.serialize(frame)
|
||||
data = await self._serializer.serialize(frame)
|
||||
if data:
|
||||
await self._data_queue.put(data)
|
||||
|
||||
|
||||
@@ -4,12 +4,18 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
from pipecat.audio.utils import interleave_stereo_audio, mix_audio, resample_audio
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
from pipecat.audio.utils import create_default_resampler, interleave_stereo_audio, mix_audio
|
||||
from pipecat.frames.frames import (
|
||||
AudioRawFrame,
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
Frame,
|
||||
InputAudioRawFrame,
|
||||
OutputAudioRawFrame,
|
||||
StartFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
|
||||
@@ -29,16 +35,29 @@ class AudioBufferProcessor(FrameProcessor):
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self, *, sample_rate: int = 24000, num_channels: int = 1, buffer_size: int = 0, **kwargs
|
||||
self,
|
||||
*,
|
||||
sample_rate: Optional[int] = None,
|
||||
num_channels: int = 1,
|
||||
buffer_size: int = 0,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
self._sample_rate = sample_rate
|
||||
self._init_sample_rate = sample_rate
|
||||
self._sample_rate = 0
|
||||
self._num_channels = num_channels
|
||||
self._buffer_size = buffer_size
|
||||
|
||||
self._user_audio_buffer = bytearray()
|
||||
self._bot_audio_buffer = bytearray()
|
||||
|
||||
self._last_user_frame_at = 0
|
||||
self._last_bot_frame_at = 0
|
||||
|
||||
self._recording = False
|
||||
|
||||
self._resampler = create_default_resampler()
|
||||
|
||||
self._register_event_handler("on_audio_data")
|
||||
|
||||
@property
|
||||
@@ -64,43 +83,83 @@ class AudioBufferProcessor(FrameProcessor):
|
||||
else:
|
||||
return b""
|
||||
|
||||
def reset_audio_buffers(self):
|
||||
self._user_audio_buffer = bytearray()
|
||||
self._bot_audio_buffer = bytearray()
|
||||
async def start_recording(self):
|
||||
self._recording = True
|
||||
self._reset_recording()
|
||||
|
||||
async def stop_recording(self):
|
||||
await self._call_on_audio_data_handler()
|
||||
self._recording = False
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
# Include all audio from the user.
|
||||
if isinstance(frame, InputAudioRawFrame):
|
||||
resampled = resample_audio(frame.audio, frame.sample_rate, self._sample_rate)
|
||||
# Update output sample rate if necessary.
|
||||
if isinstance(frame, StartFrame):
|
||||
self._update_sample_rate(frame)
|
||||
|
||||
if self._recording and isinstance(frame, InputAudioRawFrame):
|
||||
# Add silence if we need to.
|
||||
silence = self._compute_silence(self._last_user_frame_at)
|
||||
self._user_audio_buffer.extend(silence)
|
||||
# Add user audio.
|
||||
resampled = await self._resample_audio(frame)
|
||||
self._user_audio_buffer.extend(resampled)
|
||||
# Sync the bot's buffer to the user's buffer by adding silence if needed
|
||||
if len(self._user_audio_buffer) > len(self._bot_audio_buffer):
|
||||
silence = b"\x00" * len(resampled)
|
||||
self._bot_audio_buffer.extend(silence)
|
||||
# If the bot is speaking, include all audio from the bot.
|
||||
elif isinstance(frame, OutputAudioRawFrame):
|
||||
resampled = resample_audio(frame.audio, frame.sample_rate, self._sample_rate)
|
||||
# Save time of frame so we can compute silence.
|
||||
self._last_user_frame_at = time.time()
|
||||
elif self._recording and isinstance(frame, OutputAudioRawFrame):
|
||||
# Add silence if we need to.
|
||||
silence = self._compute_silence(self._last_bot_frame_at)
|
||||
self._bot_audio_buffer.extend(silence)
|
||||
# Add bot audio.
|
||||
resampled = await self._resample_audio(frame)
|
||||
self._bot_audio_buffer.extend(resampled)
|
||||
# Save time of frame so we can compute silence.
|
||||
self._last_bot_frame_at = time.time()
|
||||
|
||||
if self._buffer_size > 0 and len(self._user_audio_buffer) > self._buffer_size:
|
||||
await self._call_on_audio_data_handler()
|
||||
|
||||
if isinstance(frame, EndFrame):
|
||||
await self._call_on_audio_data_handler()
|
||||
if isinstance(frame, (CancelFrame, EndFrame)):
|
||||
await self.stop_recording()
|
||||
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
def _update_sample_rate(self, frame: StartFrame):
|
||||
self._sample_rate = self._init_sample_rate or frame.audio_out_sample_rate
|
||||
|
||||
async def _call_on_audio_data_handler(self):
|
||||
if not self.has_audio():
|
||||
if not self.has_audio() or not self._recording:
|
||||
return
|
||||
|
||||
merged_audio = self.merge_audio_buffers()
|
||||
await self._call_event_handler(
|
||||
"on_audio_data", merged_audio, self._sample_rate, self._num_channels
|
||||
)
|
||||
self.reset_audio_buffers()
|
||||
self._reset_audio_buffers()
|
||||
|
||||
def _buffer_has_audio(self, buffer: bytearray) -> bool:
|
||||
return buffer is not None and len(buffer) > 0
|
||||
|
||||
def _reset_recording(self):
|
||||
self._reset_audio_buffers()
|
||||
self._last_user_frame_at = time.time()
|
||||
self._last_bot_frame_at = time.time()
|
||||
|
||||
def _reset_audio_buffers(self):
|
||||
self._user_audio_buffer = bytearray()
|
||||
self._bot_audio_buffer = bytearray()
|
||||
|
||||
async def _resample_audio(self, frame: AudioRawFrame) -> bytes:
|
||||
return await self._resampler.resample(frame.audio, frame.sample_rate, self._sample_rate)
|
||||
|
||||
def _compute_silence(self, from_time: float) -> bytes:
|
||||
quiet_time = time.time() - from_time
|
||||
# We should get audio frames very frequently. We pick 100ms because
|
||||
# that's big enough, but it could be even a bit slower since we usually
|
||||
# do 20ms audio frames.
|
||||
if from_time == 0 or quiet_time < 0.1:
|
||||
return b""
|
||||
num_bytes = int(quiet_time * self._sample_rate) * 2
|
||||
silence = b"\x00" * num_bytes
|
||||
return silence
|
||||
|
||||
@@ -4,6 +4,8 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
from typing import Optional
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
@@ -11,6 +13,7 @@ from pipecat.audio.vad.vad_analyzer import VADParams, VADState
|
||||
from pipecat.frames.frames import (
|
||||
AudioRawFrame,
|
||||
Frame,
|
||||
StartFrame,
|
||||
StartInterruptionFrame,
|
||||
StopInterruptionFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
@@ -23,7 +26,7 @@ class SileroVAD(FrameProcessor):
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
sample_rate: int = 16000,
|
||||
sample_rate: Optional[int] = None,
|
||||
vad_params: VADParams = VADParams(),
|
||||
audio_passthrough: bool = False,
|
||||
):
|
||||
@@ -41,6 +44,9 @@ class SileroVAD(FrameProcessor):
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, StartFrame):
|
||||
self._vad_analyzer.set_sample_rate(frame.audio_in_sample_rate)
|
||||
|
||||
if isinstance(frame, AudioRawFrame):
|
||||
await self._analyze_audio(frame)
|
||||
if self._audio_passthrough:
|
||||
|
||||
@@ -245,6 +245,9 @@ class FrameProcessor:
|
||||
await self.push_frame(error, FrameDirection.UPSTREAM)
|
||||
|
||||
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
|
||||
if not self._check_ready(frame):
|
||||
return
|
||||
|
||||
if isinstance(frame, SystemFrame):
|
||||
await self.__internal_push_frame(frame, direction)
|
||||
else:
|
||||
@@ -319,6 +322,16 @@ class FrameProcessor:
|
||||
await self.push_error(ErrorFrame(str(e)))
|
||||
raise
|
||||
|
||||
def _check_ready(self, frame: Frame):
|
||||
# If we are trying to push a frame but we still have no clock, it means
|
||||
# we didn't process a StartFrame.
|
||||
if not self._clock:
|
||||
logger.error(
|
||||
f"{self} not properly initialized, missing super().process_frame(frame, direction)?"
|
||||
)
|
||||
return False
|
||||
return True
|
||||
|
||||
def __create_input_task(self):
|
||||
if not self.__input_frame_task:
|
||||
self.__should_block_frames = False
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
from typing import Union
|
||||
from typing import Optional, Union
|
||||
|
||||
from loguru import logger
|
||||
|
||||
@@ -30,7 +30,7 @@ class LangchainProcessor(FrameProcessor):
|
||||
super().__init__()
|
||||
self._chain = chain
|
||||
self._transcript_key = transcript_key
|
||||
self._participant_id: str | None = None
|
||||
self._participant_id: Optional[str] = None
|
||||
|
||||
def set_participant_id(self, participant_id: str):
|
||||
self._participant_id = participant_id
|
||||
|
||||
@@ -753,7 +753,7 @@ class RTVIProcessor(FrameProcessor):
|
||||
super().__init__(**kwargs)
|
||||
self._config = config
|
||||
|
||||
self._pipeline: FrameProcessor | None = None
|
||||
self._pipeline: Optional[FrameProcessor] = None
|
||||
|
||||
self._bot_ready = False
|
||||
self._client_ready = False
|
||||
@@ -999,7 +999,7 @@ class RTVIProcessor(FrameProcessor):
|
||||
)
|
||||
await self.push_frame(frame)
|
||||
|
||||
async def _handle_action(self, request_id: str | None, data: RTVIActionRun):
|
||||
async def _handle_action(self, request_id: Optional[str], data: RTVIActionRun):
|
||||
action_id = self._action_id(data.service, data.action)
|
||||
if action_id not in self._registered_actions:
|
||||
await self._send_error_response(request_id, f"Action {action_id} not registered")
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
#
|
||||
|
||||
import asyncio
|
||||
from typing import Optional
|
||||
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel
|
||||
@@ -38,7 +39,7 @@ class GStreamerPipelineSource(FrameProcessor):
|
||||
class OutputParams(BaseModel):
|
||||
video_width: int = 1280
|
||||
video_height: int = 720
|
||||
audio_sample_rate: int = 24000
|
||||
audio_sample_rate: Optional[int] = None
|
||||
audio_channels: int = 1
|
||||
clock_sync: bool = True
|
||||
|
||||
@@ -46,6 +47,7 @@ class GStreamerPipelineSource(FrameProcessor):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
self._out_params = out_params
|
||||
self._sample_rate = 0
|
||||
|
||||
Gst.init()
|
||||
|
||||
@@ -90,6 +92,7 @@ class GStreamerPipelineSource(FrameProcessor):
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
async def _start(self, frame: StartFrame):
|
||||
self._sample_rate = self._out_params.audio_sample_rate or frame.audio_out_sample_rate
|
||||
self._player.set_state(Gst.State.PLAYING)
|
||||
|
||||
async def _stop(self, frame: EndFrame):
|
||||
@@ -122,7 +125,7 @@ class GStreamerPipelineSource(FrameProcessor):
|
||||
audioresample = Gst.ElementFactory.make("audioresample", None)
|
||||
audiocapsfilter = Gst.ElementFactory.make("capsfilter", None)
|
||||
audiocaps = Gst.Caps.from_string(
|
||||
f"audio/x-raw,format=S16LE,rate={self._out_params.audio_sample_rate},channels={self._out_params.audio_channels},layout=interleaved"
|
||||
f"audio/x-raw,format=S16LE,rate={self._sample_rate},channels={self._out_params.audio_channels},layout=interleaved"
|
||||
)
|
||||
audiocapsfilter.set_property("caps", audiocaps)
|
||||
appsink_audio = Gst.ElementFactory.make("appsink", None)
|
||||
@@ -188,7 +191,7 @@ class GStreamerPipelineSource(FrameProcessor):
|
||||
(_, info) = buffer.map(Gst.MapFlags.READ)
|
||||
frame = OutputAudioRawFrame(
|
||||
audio=info.data,
|
||||
sample_rate=self._out_params.audio_sample_rate,
|
||||
sample_rate=self._sample_rate,
|
||||
num_channels=self._out_params.audio_channels,
|
||||
)
|
||||
asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop())
|
||||
|
||||
@@ -4,19 +4,14 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import time
|
||||
|
||||
from loguru import logger
|
||||
|
||||
try:
|
||||
import sentry_sdk
|
||||
|
||||
sentry_available = sentry_sdk.is_initialized()
|
||||
if not sentry_available:
|
||||
logger.warning("Sentry SDK not initialized. Sentry features will be disabled.")
|
||||
except ImportError:
|
||||
sentry_available = False
|
||||
logger.warning("Sentry SDK not installed. Sentry features will be disabled.")
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error("In order to use Sentry, you need to `pip install pipecat-ai[sentry]`.")
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics
|
||||
|
||||
@@ -24,41 +19,44 @@ from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMet
|
||||
class SentryMetrics(FrameProcessorMetrics):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._ttfb_metrics_span = None
|
||||
self._processing_metrics_span = None
|
||||
self._ttfb_metrics_tx = None
|
||||
self._processing_metrics_tx = None
|
||||
self._sentry_available = sentry_sdk.is_initialized()
|
||||
if not self._sentry_available:
|
||||
logger.warning("Sentry SDK not initialized. Sentry features will be disabled.")
|
||||
|
||||
async def start_ttfb_metrics(self, report_only_initial_ttfb):
|
||||
if self._should_report_ttfb:
|
||||
self._start_ttfb_time = time.time()
|
||||
if sentry_available:
|
||||
self._ttfb_metrics_span = sentry_sdk.start_span(
|
||||
op="ttfb",
|
||||
description=f"TTFB for {self._processor_name()}",
|
||||
start_timestamp=self._start_ttfb_time,
|
||||
)
|
||||
logger.debug(
|
||||
f"Sentry Span ID: {self._ttfb_metrics_span.span_id} Description: {self._ttfb_metrics_span.description} started."
|
||||
)
|
||||
self._should_report_ttfb = not report_only_initial_ttfb
|
||||
await super().start_ttfb_metrics(report_only_initial_ttfb)
|
||||
|
||||
async def stop_ttfb_metrics(self):
|
||||
stop_time = time.time()
|
||||
if sentry_available:
|
||||
self._ttfb_metrics_span.finish(end_timestamp=stop_time)
|
||||
|
||||
async def start_processing_metrics(self):
|
||||
self._start_processing_time = time.time()
|
||||
if sentry_available:
|
||||
self._processing_metrics_span = sentry_sdk.start_span(
|
||||
op="processing",
|
||||
description=f"Processing for {self._processor_name()}",
|
||||
start_timestamp=self._start_processing_time,
|
||||
if self._should_report_ttfb and self._sentry_available:
|
||||
self._ttfb_metrics_tx = sentry_sdk.start_transaction(
|
||||
op="ttfb",
|
||||
name=f"TTFB for {self._processor_name()}",
|
||||
)
|
||||
logger.debug(
|
||||
f"Sentry Span ID: {self._processing_metrics_span.span_id} Description: {self._processing_metrics_span.description} started."
|
||||
f"Sentry transaction started (ID: {self._ttfb_metrics_tx.span_id} Name: {self._ttfb_metrics_tx.name})"
|
||||
)
|
||||
|
||||
async def stop_ttfb_metrics(self):
|
||||
await super().stop_ttfb_metrics()
|
||||
|
||||
if self._sentry_available and self._ttfb_metrics_tx:
|
||||
self._ttfb_metrics_tx.finish()
|
||||
|
||||
async def start_processing_metrics(self):
|
||||
await super().start_processing_metrics()
|
||||
|
||||
if self._sentry_available:
|
||||
self._processing_metrics_tx = sentry_sdk.start_transaction(
|
||||
op="processing",
|
||||
name=f"Processing for {self._processor_name()}",
|
||||
)
|
||||
logger.debug(
|
||||
f"Sentry transaction started (ID: {self._processing_metrics_tx.span_id} Name: {self._processing_metrics_tx.name})"
|
||||
)
|
||||
|
||||
async def stop_processing_metrics(self):
|
||||
stop_time = time.time()
|
||||
if sentry_available:
|
||||
self._processing_metrics_span.finish(end_timestamp=stop_time)
|
||||
await super().stop_processing_metrics()
|
||||
|
||||
if self._sentry_available and self._processing_metrics_tx:
|
||||
self._processing_metrics_tx.finish()
|
||||
|
||||
@@ -87,7 +87,7 @@ class AssistantTranscriptProcessor(BaseTranscriptProcessor):
|
||||
"""Initialize processor with aggregation state."""
|
||||
super().__init__(**kwargs)
|
||||
self._current_text_parts: List[str] = []
|
||||
self._aggregation_start_time: Optional[str] | None = None
|
||||
self._aggregation_start_time: Optional[str] = None
|
||||
|
||||
async def _emit_aggregated_text(self):
|
||||
"""Emit aggregated text as a transcript message."""
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from enum import Enum
|
||||
|
||||
from pipecat.frames.frames import Frame
|
||||
from pipecat.frames.frames import Frame, StartFrame
|
||||
|
||||
|
||||
class FrameSerializerType(Enum):
|
||||
@@ -21,10 +21,13 @@ class FrameSerializer(ABC):
|
||||
def type(self) -> FrameSerializerType:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def serialize(self, frame: Frame) -> str | bytes | None:
|
||||
async def setup(self, frame: StartFrame):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def deserialize(self, data: str | bytes) -> Frame | None:
|
||||
async def serialize(self, frame: Frame) -> str | bytes | None:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def deserialize(self, data: str | bytes) -> Frame | None:
|
||||
pass
|
||||
|
||||
@@ -25,7 +25,7 @@ class LivekitFrameSerializer(FrameSerializer):
|
||||
def type(self) -> FrameSerializerType:
|
||||
return FrameSerializerType.BINARY
|
||||
|
||||
def serialize(self, frame: Frame) -> str | bytes | None:
|
||||
async def serialize(self, frame: Frame) -> str | bytes | None:
|
||||
if not isinstance(frame, OutputAudioRawFrame):
|
||||
return None
|
||||
audio_frame = AudioFrame(
|
||||
@@ -36,7 +36,7 @@ class LivekitFrameSerializer(FrameSerializer):
|
||||
)
|
||||
return pickle.dumps(audio_frame)
|
||||
|
||||
def deserialize(self, data: str | bytes) -> Frame | None:
|
||||
async def deserialize(self, data: str | bytes) -> Frame | None:
|
||||
audio_frame: AudioFrame = pickle.loads(data)["frame"]
|
||||
return InputAudioRawFrame(
|
||||
audio=bytes(audio_frame.data),
|
||||
|
||||
@@ -41,7 +41,7 @@ class ProtobufFrameSerializer(FrameSerializer):
|
||||
def type(self) -> FrameSerializerType:
|
||||
return FrameSerializerType.BINARY
|
||||
|
||||
def serialize(self, frame: Frame) -> str | bytes | None:
|
||||
async def serialize(self, frame: Frame) -> str | bytes | None:
|
||||
proto_frame = frame_protos.Frame()
|
||||
if type(frame) not in self.SERIALIZABLE_TYPES:
|
||||
logger.warning(f"Frame type {type(frame)} is not serializable")
|
||||
@@ -57,26 +57,7 @@ class ProtobufFrameSerializer(FrameSerializer):
|
||||
|
||||
return proto_frame.SerializeToString()
|
||||
|
||||
def deserialize(self, data: str | bytes) -> Frame | None:
|
||||
"""Returns a Frame object from a Frame protobuf.
|
||||
|
||||
Used to convert frames
|
||||
passed over the wire as protobufs to Frame objects used in pipelines
|
||||
and frame processors.
|
||||
|
||||
>>> serializer = ProtobufFrameSerializer()
|
||||
>>> serializer.deserialize(
|
||||
... serializer.serialize(OutputAudioFrame(data=b'1234567890')))
|
||||
InputAudioFrame(data=b'1234567890')
|
||||
|
||||
>>> serializer.deserialize(
|
||||
... serializer.serialize(TextFrame(text='hello world')))
|
||||
TextFrame(text='hello world')
|
||||
|
||||
>>> serializer.deserialize(serializer.serialize(TranscriptionFrame(
|
||||
... text="Hello there!", participantId="123", timestamp="2021-01-01")))
|
||||
TranscriptionFrame(text='Hello there!', participantId='123', timestamp='2021-01-01')
|
||||
"""
|
||||
async def deserialize(self, data: str | bytes) -> Frame | None:
|
||||
proto = frame_protos.Frame.FromString(data)
|
||||
which = proto.WhichOneof("frame")
|
||||
if which not in self.DESERIALIZABLE_FIELDS:
|
||||
|
||||
@@ -6,16 +6,24 @@
|
||||
|
||||
import base64
|
||||
import json
|
||||
from typing import Optional
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from pipecat.audio.utils import alaw_to_pcm, pcm_to_alaw, pcm_to_ulaw, ulaw_to_pcm
|
||||
from pipecat.audio.utils import (
|
||||
alaw_to_pcm,
|
||||
create_default_resampler,
|
||||
pcm_to_alaw,
|
||||
pcm_to_ulaw,
|
||||
ulaw_to_pcm,
|
||||
)
|
||||
from pipecat.frames.frames import (
|
||||
AudioRawFrame,
|
||||
Frame,
|
||||
InputAudioRawFrame,
|
||||
InputDTMFFrame,
|
||||
KeypadEntry,
|
||||
StartFrame,
|
||||
StartInterruptionFrame,
|
||||
)
|
||||
from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializerType
|
||||
@@ -23,8 +31,8 @@ from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializer
|
||||
|
||||
class TelnyxFrameSerializer(FrameSerializer):
|
||||
class InputParams(BaseModel):
|
||||
telnyx_sample_rate: int = 8000
|
||||
sample_rate: int = 16000
|
||||
telnyx_sample_rate: int = 8000 # Default Telnyx rate (8kHz)
|
||||
sample_rate: Optional[int] = None # Pipeline input rate
|
||||
inbound_encoding: str = "PCMU"
|
||||
outbound_encoding: str = "PCMU"
|
||||
|
||||
@@ -40,21 +48,30 @@ class TelnyxFrameSerializer(FrameSerializer):
|
||||
params.inbound_encoding = inbound_encoding
|
||||
self._params = params
|
||||
|
||||
self._telnyx_sample_rate = self._params.telnyx_sample_rate
|
||||
self._sample_rate = 0 # Pipeline input rate
|
||||
|
||||
self._resampler = create_default_resampler()
|
||||
|
||||
@property
|
||||
def type(self) -> FrameSerializerType:
|
||||
return FrameSerializerType.TEXT
|
||||
|
||||
def serialize(self, frame: Frame) -> str | bytes | None:
|
||||
async def setup(self, frame: StartFrame):
|
||||
self._sample_rate = self._params.sample_rate or frame.audio_in_sample_rate
|
||||
|
||||
async def serialize(self, frame: Frame) -> str | bytes | None:
|
||||
if isinstance(frame, AudioRawFrame):
|
||||
data = frame.audio
|
||||
|
||||
# Output: Convert PCM at frame's rate to 8kHz encoded for Telnyx
|
||||
if self._params.inbound_encoding == "PCMU":
|
||||
serialized_data = pcm_to_ulaw(
|
||||
data, frame.sample_rate, self._params.telnyx_sample_rate
|
||||
serialized_data = await pcm_to_ulaw(
|
||||
data, frame.sample_rate, self._telnyx_sample_rate, self._resampler
|
||||
)
|
||||
elif self._params.inbound_encoding == "PCMA":
|
||||
serialized_data = pcm_to_alaw(
|
||||
data, frame.sample_rate, self._params.telnyx_sample_rate
|
||||
serialized_data = await pcm_to_alaw(
|
||||
data, frame.sample_rate, self._telnyx_sample_rate, self._resampler
|
||||
)
|
||||
else:
|
||||
raise ValueError(f"Unsupported encoding: {self._params.inbound_encoding}")
|
||||
@@ -71,26 +88,33 @@ class TelnyxFrameSerializer(FrameSerializer):
|
||||
answer = {"event": "clear"}
|
||||
return json.dumps(answer)
|
||||
|
||||
def deserialize(self, data: str | bytes) -> Frame | None:
|
||||
async def deserialize(self, data: str | bytes) -> Frame | None:
|
||||
message = json.loads(data)
|
||||
|
||||
if message["event"] == "media":
|
||||
payload_base64 = message["media"]["payload"]
|
||||
payload = base64.b64decode(payload_base64)
|
||||
|
||||
# Input: Convert Telnyx's 8kHz encoded audio to PCM at pipeline input rate
|
||||
if self._params.outbound_encoding == "PCMU":
|
||||
deserialized_data = ulaw_to_pcm(
|
||||
payload, self._params.telnyx_sample_rate, self._params.sample_rate
|
||||
deserialized_data = await ulaw_to_pcm(
|
||||
payload,
|
||||
self._telnyx_sample_rate,
|
||||
self._sample_rate,
|
||||
self._resampler,
|
||||
)
|
||||
elif self._params.outbound_encoding == "PCMA":
|
||||
deserialized_data = alaw_to_pcm(
|
||||
payload, self._params.telnyx_sample_rate, self._params.sample_rate
|
||||
deserialized_data = await alaw_to_pcm(
|
||||
payload,
|
||||
self._telnyx_sample_rate,
|
||||
self._sample_rate,
|
||||
self._resampler,
|
||||
)
|
||||
else:
|
||||
raise ValueError(f"Unsupported encoding: {self._params.outbound_encoding}")
|
||||
|
||||
audio_frame = InputAudioRawFrame(
|
||||
audio=deserialized_data, num_channels=1, sample_rate=self._params.sample_rate
|
||||
audio=deserialized_data, num_channels=1, sample_rate=self._sample_rate
|
||||
)
|
||||
return audio_frame
|
||||
elif message["event"] == "dtmf":
|
||||
|
||||
@@ -6,16 +6,18 @@
|
||||
|
||||
import base64
|
||||
import json
|
||||
from typing import Optional
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from pipecat.audio.utils import pcm_to_ulaw, ulaw_to_pcm
|
||||
from pipecat.audio.utils import create_default_resampler, pcm_to_ulaw, ulaw_to_pcm
|
||||
from pipecat.frames.frames import (
|
||||
AudioRawFrame,
|
||||
Frame,
|
||||
InputAudioRawFrame,
|
||||
InputDTMFFrame,
|
||||
KeypadEntry,
|
||||
StartFrame,
|
||||
StartInterruptionFrame,
|
||||
TransportMessageFrame,
|
||||
TransportMessageUrgentFrame,
|
||||
@@ -25,25 +27,36 @@ from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializer
|
||||
|
||||
class TwilioFrameSerializer(FrameSerializer):
|
||||
class InputParams(BaseModel):
|
||||
twilio_sample_rate: int = 8000
|
||||
sample_rate: int = 16000
|
||||
twilio_sample_rate: int = 8000 # Default Twilio rate (8kHz)
|
||||
sample_rate: Optional[int] = None # Pipeline input rate
|
||||
|
||||
def __init__(self, stream_sid: str, params: InputParams = InputParams()):
|
||||
self._stream_sid = stream_sid
|
||||
self._params = params
|
||||
|
||||
self._twilio_sample_rate = self._params.twilio_sample_rate
|
||||
self._sample_rate = 0 # Pipeline input rate
|
||||
|
||||
self._resampler = create_default_resampler()
|
||||
|
||||
@property
|
||||
def type(self) -> FrameSerializerType:
|
||||
return FrameSerializerType.TEXT
|
||||
|
||||
def serialize(self, frame: Frame) -> str | bytes | None:
|
||||
async def setup(self, frame: StartFrame):
|
||||
self._sample_rate = self._params.sample_rate or frame.audio_in_sample_rate
|
||||
|
||||
async def serialize(self, frame: Frame) -> str | bytes | None:
|
||||
if isinstance(frame, StartInterruptionFrame):
|
||||
answer = {"event": "clear", "streamSid": self._stream_sid}
|
||||
return json.dumps(answer)
|
||||
elif isinstance(frame, AudioRawFrame):
|
||||
data = frame.audio
|
||||
|
||||
serialized_data = pcm_to_ulaw(data, frame.sample_rate, self._params.twilio_sample_rate)
|
||||
# Output: Convert PCM at frame's rate to 8kHz μ-law for Twilio
|
||||
serialized_data = await pcm_to_ulaw(
|
||||
data, frame.sample_rate, self._twilio_sample_rate, self._resampler
|
||||
)
|
||||
payload = base64.b64encode(serialized_data).decode("utf-8")
|
||||
answer = {
|
||||
"event": "media",
|
||||
@@ -55,18 +68,19 @@ class TwilioFrameSerializer(FrameSerializer):
|
||||
elif isinstance(frame, (TransportMessageFrame, TransportMessageUrgentFrame)):
|
||||
return json.dumps(frame.message)
|
||||
|
||||
def deserialize(self, data: str | bytes) -> Frame | None:
|
||||
async def deserialize(self, data: str | bytes) -> Frame | None:
|
||||
message = json.loads(data)
|
||||
|
||||
if message["event"] == "media":
|
||||
payload_base64 = message["media"]["payload"]
|
||||
payload = base64.b64decode(payload_base64)
|
||||
|
||||
deserialized_data = ulaw_to_pcm(
|
||||
payload, self._params.twilio_sample_rate, self._params.sample_rate
|
||||
# Input: Convert Twilio's 8kHz μ-law to PCM at pipeline input rate
|
||||
deserialized_data = await ulaw_to_pcm(
|
||||
payload, self._twilio_sample_rate, self._sample_rate, self._resampler
|
||||
)
|
||||
audio_frame = InputAudioRawFrame(
|
||||
audio=deserialized_data, num_channels=1, sample_rate=self._params.sample_rate
|
||||
audio=deserialized_data, num_channels=1, sample_rate=self._sample_rate
|
||||
)
|
||||
return audio_frame
|
||||
elif message["event"] == "dtmf":
|
||||
|
||||
@@ -140,7 +140,7 @@ class LLMService(AIService):
|
||||
self._start_callbacks = {}
|
||||
|
||||
# TODO-CB: callback function type
|
||||
def register_function(self, function_name: str | None, callback, start_callback=None):
|
||||
def register_function(self, function_name: Optional[str], callback, start_callback=None):
|
||||
# Registering a function with the function_name set to None will run that callback
|
||||
# for all functions
|
||||
self._callbacks[function_name] = callback
|
||||
@@ -148,7 +148,7 @@ class LLMService(AIService):
|
||||
if start_callback:
|
||||
self._start_callbacks[function_name] = start_callback
|
||||
|
||||
def unregister_function(self, function_name: str | None):
|
||||
def unregister_function(self, function_name: Optional[str]):
|
||||
del self._callbacks[function_name]
|
||||
if self._start_callbacks[function_name]:
|
||||
del self._start_callbacks[function_name]
|
||||
@@ -190,7 +190,7 @@ class LLMService(AIService):
|
||||
elif None in self._start_callbacks.keys():
|
||||
return await self._start_callbacks[None](function_name, self, context)
|
||||
|
||||
async def request_image_frame(self, user_id: str, *, text_content: str | None = None):
|
||||
async def request_image_frame(self, user_id: str, *, text_content: Optional[str] = None):
|
||||
await self.push_frame(
|
||||
UserImageRequestFrame(user_id=user_id, context=text_content), FrameDirection.UPSTREAM
|
||||
)
|
||||
@@ -213,7 +213,7 @@ class TTSService(AIService):
|
||||
# if push_silence_after_stop is True, send this amount of audio silence
|
||||
silence_time_s: float = 2.0,
|
||||
# TTS output sample rate
|
||||
sample_rate: int = 24000,
|
||||
sample_rate: Optional[int] = None,
|
||||
text_filter: Optional[BaseTextFilter] = None,
|
||||
**kwargs,
|
||||
):
|
||||
@@ -224,7 +224,8 @@ class TTSService(AIService):
|
||||
self._stop_frame_timeout_s: float = stop_frame_timeout_s
|
||||
self._push_silence_after_stop: bool = push_silence_after_stop
|
||||
self._silence_time_s: float = silence_time_s
|
||||
self._sample_rate: int = sample_rate
|
||||
self._init_sample_rate = sample_rate
|
||||
self._sample_rate = 0
|
||||
self._voice_id: str = ""
|
||||
self._settings: Dict[str, Any] = {}
|
||||
self._text_filter: Optional[BaseTextFilter] = text_filter
|
||||
@@ -248,16 +249,20 @@ class TTSService(AIService):
|
||||
async def flush_audio(self):
|
||||
pass
|
||||
|
||||
def language_to_service_language(self, language: Language) -> str | None:
|
||||
return Language(language)
|
||||
|
||||
# Converts the text to audio.
|
||||
@abstractmethod
|
||||
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
|
||||
pass
|
||||
|
||||
def language_to_service_language(self, language: Language) -> Optional[str]:
|
||||
return Language(language)
|
||||
|
||||
async def update_setting(self, key: str, value: Any):
|
||||
pass
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
self._sample_rate = self._init_sample_rate or frame.audio_out_sample_rate
|
||||
if self._push_stop_frames:
|
||||
self._stop_frame_task = self.create_task(self._stop_frame_handler())
|
||||
|
||||
@@ -347,7 +352,7 @@ class TTSService(AIService):
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
async def _process_text_frame(self, frame: TextFrame):
|
||||
text: str | None = None
|
||||
text: Optional[str] = None
|
||||
if not self._aggregate_sentences:
|
||||
text = frame.text
|
||||
else:
|
||||
@@ -467,9 +472,17 @@ class WordTTSService(TTSService):
|
||||
class STTService(AIService):
|
||||
"""STTService is a base class for speech-to-text services."""
|
||||
|
||||
def __init__(self, audio_passthrough=False, **kwargs):
|
||||
def __init__(
|
||||
self,
|
||||
audio_passthrough=False,
|
||||
# STT input sample rate
|
||||
sample_rate: Optional[int] = None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
self._audio_passthrough = audio_passthrough
|
||||
self._init_sample_rate = sample_rate
|
||||
self._sample_rate = 0
|
||||
self._settings: Dict[str, Any] = {}
|
||||
self._muted: bool = False
|
||||
|
||||
@@ -478,6 +491,10 @@ class STTService(AIService):
|
||||
"""Returns whether the STT service is currently muted."""
|
||||
return self._muted
|
||||
|
||||
@property
|
||||
def sample_rate(self) -> int:
|
||||
return self._sample_rate
|
||||
|
||||
@abstractmethod
|
||||
async def set_model(self, model: str):
|
||||
self.set_model_name(model)
|
||||
@@ -491,6 +508,10 @@ class STTService(AIService):
|
||||
"""Returns transcript as a string"""
|
||||
pass
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
self._sample_rate = self._init_sample_rate or frame.audio_in_sample_rate
|
||||
|
||||
async def _update_settings(self, settings: Mapping[str, Any]):
|
||||
logger.info(f"Updating STT settings: {self._settings}")
|
||||
for key, value in settings.items():
|
||||
@@ -540,17 +561,15 @@ class SegmentedSTTService(STTService):
|
||||
min_volume: float = 0.6,
|
||||
max_silence_secs: float = 0.3,
|
||||
max_buffer_secs: float = 1.5,
|
||||
sample_rate: int = 24000,
|
||||
num_channels: int = 1,
|
||||
sample_rate: Optional[int] = None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
super().__init__(sample_rate=sample_rate, **kwargs)
|
||||
self._min_volume = min_volume
|
||||
self._max_silence_secs = max_silence_secs
|
||||
self._max_buffer_secs = max_buffer_secs
|
||||
self._sample_rate = sample_rate
|
||||
self._num_channels = num_channels
|
||||
(self._content, self._wave) = self._new_wave()
|
||||
self._content = None
|
||||
self._wave = None
|
||||
self._silence_num_frames = 0
|
||||
# Volume exponential smoothing
|
||||
self._smoothing_factor = 0.2
|
||||
@@ -569,8 +588,8 @@ class SegmentedSTTService(STTService):
|
||||
|
||||
# If buffer is not empty and we have enough data or there's been a long
|
||||
# silence, transcribe the audio gathered so far.
|
||||
silence_secs = self._silence_num_frames / self._sample_rate
|
||||
buffer_secs = self._wave.getnframes() / self._sample_rate
|
||||
silence_secs = self._silence_num_frames / self.sample_rate
|
||||
buffer_secs = self._wave.getnframes() / self.sample_rate
|
||||
if self._content.tell() > 0 and (
|
||||
buffer_secs > self._max_buffer_secs or silence_secs > self._max_silence_secs
|
||||
):
|
||||
@@ -580,18 +599,24 @@ class SegmentedSTTService(STTService):
|
||||
await self.process_generator(self.run_stt(self._content.read()))
|
||||
(self._content, self._wave) = self._new_wave()
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
(self._content, self._wave) = self._new_wave()
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
await super().stop(frame)
|
||||
self._wave.close()
|
||||
|
||||
async def cancel(self, frame: CancelFrame):
|
||||
await super().cancel(frame)
|
||||
self._wave.close()
|
||||
|
||||
def _new_wave(self):
|
||||
content = io.BytesIO()
|
||||
ww = wave.open(content, "wb")
|
||||
ww.setsampwidth(2)
|
||||
ww.setnchannels(self._num_channels)
|
||||
ww.setframerate(self._sample_rate)
|
||||
ww.setnchannels(1)
|
||||
ww.setframerate(self.sample_rate)
|
||||
return (content, ww)
|
||||
|
||||
def _get_smoothed_volume(self, frame: AudioRawFrame) -> float:
|
||||
|
||||
@@ -326,9 +326,9 @@ class AnthropicLLMService(LLMService):
|
||||
class AnthropicLLMContext(OpenAILLMContext):
|
||||
def __init__(
|
||||
self,
|
||||
messages: list[dict] | None = None,
|
||||
tools: list[dict] | None = None,
|
||||
tool_choice: dict | None = None,
|
||||
messages: Optional[List[dict]] = None,
|
||||
tools: Optional[List[dict]] = None,
|
||||
tool_choice: Optional[dict] = None,
|
||||
*,
|
||||
system: Union[str, NotGiven] = NOT_GIVEN,
|
||||
):
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
#
|
||||
|
||||
import asyncio
|
||||
from typing import AsyncGenerator
|
||||
from typing import AsyncGenerator, Optional
|
||||
|
||||
from loguru import logger
|
||||
|
||||
@@ -38,20 +38,17 @@ class AssemblyAISTTService(STTService):
|
||||
self,
|
||||
*,
|
||||
api_key: str,
|
||||
sample_rate: int = 16000,
|
||||
sample_rate: Optional[int] = None,
|
||||
encoding: AudioEncoding = AudioEncoding("pcm_s16le"),
|
||||
language=Language.EN, # Only English is supported for Realtime
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
super().__init__(sample_rate=sample_rate, **kwargs)
|
||||
|
||||
aai.settings.api_key = api_key
|
||||
self._transcriber: aai.RealtimeTranscriber | None = None
|
||||
# Store reference to the main event loop for use in callback functions
|
||||
self._loop = asyncio.get_event_loop()
|
||||
self._transcriber: Optional[aai.RealtimeTranscriber] = None
|
||||
|
||||
self._settings = {
|
||||
"sample_rate": sample_rate,
|
||||
"encoding": encoding,
|
||||
"language": language,
|
||||
}
|
||||
@@ -121,7 +118,7 @@ class AssemblyAISTTService(STTService):
|
||||
|
||||
# Schedule the coroutine to run in the main event loop
|
||||
# This is necessary because this callback runs in a different thread
|
||||
asyncio.run_coroutine_threadsafe(self.push_frame(frame), self._loop)
|
||||
asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop())
|
||||
|
||||
def on_error(error: aai.RealtimeError):
|
||||
"""Callback for handling errors from AssemblyAI.
|
||||
@@ -131,14 +128,16 @@ class AssemblyAISTTService(STTService):
|
||||
"""
|
||||
logger.error(f"{self}: An error occurred: {error}")
|
||||
# Schedule the coroutine to run in the main event loop
|
||||
asyncio.run_coroutine_threadsafe(self.push_frame(ErrorFrame(str(error))), self._loop)
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self.push_frame(ErrorFrame(str(error))), self.get_event_loop()
|
||||
)
|
||||
|
||||
def on_close():
|
||||
"""Callback for when the connection to AssemblyAI is closed."""
|
||||
logger.info(f"{self}: Disconnected from AssemblyAI")
|
||||
|
||||
self._transcriber = aai.RealtimeTranscriber(
|
||||
sample_rate=self._settings["sample_rate"],
|
||||
sample_rate=self.sample_rate,
|
||||
encoding=self._settings["encoding"],
|
||||
on_data=on_data,
|
||||
on_error=on_error,
|
||||
|
||||
@@ -10,7 +10,7 @@ from typing import AsyncGenerator, Optional
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel
|
||||
|
||||
from pipecat.audio.utils import resample_audio
|
||||
from pipecat.audio.utils import create_default_resampler
|
||||
from pipecat.frames.frames import (
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
@@ -32,7 +32,7 @@ except ModuleNotFoundError as e:
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
def language_to_aws_language(language: Language) -> str | None:
|
||||
def language_to_aws_language(language: Language) -> Optional[str]:
|
||||
language_map = {
|
||||
# Arabic
|
||||
Language.AR: "arb",
|
||||
@@ -124,7 +124,7 @@ class PollyTTSService(TTSService):
|
||||
aws_session_token: Optional[str] = None,
|
||||
region: Optional[str] = None,
|
||||
voice_id: str = "Joanna",
|
||||
sample_rate: int = 24000,
|
||||
sample_rate: Optional[int] = None,
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
):
|
||||
@@ -138,7 +138,6 @@ class PollyTTSService(TTSService):
|
||||
region_name=region,
|
||||
)
|
||||
self._settings = {
|
||||
"sample_rate": sample_rate,
|
||||
"engine": params.engine,
|
||||
"language": self.language_to_service_language(params.language)
|
||||
if params.language
|
||||
@@ -148,12 +147,14 @@ class PollyTTSService(TTSService):
|
||||
"volume": params.volume,
|
||||
}
|
||||
|
||||
self._resampler = create_default_resampler()
|
||||
|
||||
self.set_voice(voice_id)
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
return True
|
||||
|
||||
def language_to_service_language(self, language: Language) -> str | None:
|
||||
def language_to_service_language(self, language: Language) -> Optional[str]:
|
||||
return language_to_aws_language(language)
|
||||
|
||||
def _construct_ssml(self, text: str) -> str:
|
||||
@@ -193,8 +194,7 @@ class PollyTTSService(TTSService):
|
||||
response = self._polly_client.synthesize_speech(**args)
|
||||
if "AudioStream" in response:
|
||||
audio_data = response["AudioStream"].read()
|
||||
resampled = resample_audio(audio_data, 16000, self._settings["sample_rate"])
|
||||
return resampled
|
||||
return audio_data
|
||||
return None
|
||||
|
||||
logger.debug(f"Generating TTS: [{text}]")
|
||||
@@ -225,6 +225,8 @@ class PollyTTSService(TTSService):
|
||||
yield None
|
||||
return
|
||||
|
||||
audio_data = await self._resampler.resample(audio_data, 16000, self.sample_rate)
|
||||
|
||||
await self.start_tts_usage_metrics(text)
|
||||
|
||||
yield TTSStartedFrame()
|
||||
@@ -234,7 +236,7 @@ class PollyTTSService(TTSService):
|
||||
chunk = audio_data[i : i + chunk_size]
|
||||
if len(chunk) > 0:
|
||||
await self.stop_ttfb_metrics()
|
||||
frame = TTSAudioRawFrame(chunk, self._settings["sample_rate"], 1)
|
||||
frame = TTSAudioRawFrame(chunk, self.sample_rate, 1)
|
||||
yield frame
|
||||
|
||||
yield TTSStoppedFrame()
|
||||
|
||||
@@ -57,7 +57,7 @@ except ModuleNotFoundError as e:
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
def language_to_azure_language(language: Language) -> str | None:
|
||||
def language_to_azure_language(language: Language) -> Optional[str]:
|
||||
language_map = {
|
||||
# Afrikaans
|
||||
Language.AF: "af-ZA",
|
||||
@@ -450,14 +450,13 @@ class AzureBaseTTSService(TTSService):
|
||||
api_key: str,
|
||||
region: str,
|
||||
voice="en-US-SaraNeural",
|
||||
sample_rate: int = 24000,
|
||||
sample_rate: Optional[int] = None,
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(sample_rate=sample_rate, **kwargs)
|
||||
|
||||
self._settings = {
|
||||
"sample_rate": sample_rate,
|
||||
"emphasis": params.emphasis,
|
||||
"language": self.language_to_service_language(params.language)
|
||||
if params.language
|
||||
@@ -478,7 +477,7 @@ class AzureBaseTTSService(TTSService):
|
||||
def can_generate_metrics(self) -> bool:
|
||||
return True
|
||||
|
||||
def language_to_service_language(self, language: Language) -> str | None:
|
||||
def language_to_service_language(self, language: Language) -> Optional[str]:
|
||||
return language_to_azure_language(language)
|
||||
|
||||
def _construct_ssml(self, text: str) -> str:
|
||||
@@ -530,25 +529,32 @@ class AzureBaseTTSService(TTSService):
|
||||
class AzureTTSService(AzureBaseTTSService):
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._speech_config = None
|
||||
self._speech_synthesizer = None
|
||||
self._audio_queue = asyncio.Queue()
|
||||
|
||||
speech_config = SpeechConfig(
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
# Now self.sample_rate is properly initialized
|
||||
self._speech_config = SpeechConfig(
|
||||
subscription=self._api_key,
|
||||
region=self._region,
|
||||
speech_recognition_language=self._settings["language"],
|
||||
)
|
||||
speech_config.set_speech_synthesis_output_format(
|
||||
sample_rate_to_output_format(self._settings["sample_rate"])
|
||||
self._speech_config.set_speech_synthesis_output_format(
|
||||
sample_rate_to_output_format(self.sample_rate)
|
||||
)
|
||||
speech_config.set_service_property(
|
||||
self._speech_config.set_service_property(
|
||||
"synthesizer.synthesis.connection.synthesisConnectionImpl",
|
||||
"websocket",
|
||||
ServicePropertyChannel.UriQueryParameter,
|
||||
)
|
||||
|
||||
self._speech_synthesizer = SpeechSynthesizer(speech_config=speech_config, audio_config=None)
|
||||
self._speech_synthesizer = SpeechSynthesizer(
|
||||
speech_config=self._speech_config, audio_config=None
|
||||
)
|
||||
|
||||
# Set up event handlers
|
||||
self._audio_queue = asyncio.Queue()
|
||||
self._speech_synthesizer.synthesizing.connect(self._handle_synthesizing)
|
||||
self._speech_synthesizer.synthesis_completed.connect(self._handle_completed)
|
||||
self._speech_synthesizer.synthesis_canceled.connect(self._handle_canceled)
|
||||
@@ -591,7 +597,7 @@ class AzureTTSService(AzureBaseTTSService):
|
||||
|
||||
yield TTSAudioRawFrame(
|
||||
audio=chunk,
|
||||
sample_rate=self._settings["sample_rate"],
|
||||
sample_rate=self.sample_rate,
|
||||
num_channels=1,
|
||||
)
|
||||
|
||||
@@ -605,17 +611,22 @@ class AzureTTSService(AzureBaseTTSService):
|
||||
class AzureHttpTTSService(AzureBaseTTSService):
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._speech_config = None
|
||||
self._speech_synthesizer = None
|
||||
|
||||
speech_config = SpeechConfig(
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
self._speech_config = SpeechConfig(
|
||||
subscription=self._api_key,
|
||||
region=self._region,
|
||||
speech_recognition_language=self._settings["language"],
|
||||
)
|
||||
speech_config.set_speech_synthesis_output_format(
|
||||
sample_rate_to_output_format(self._settings["sample_rate"])
|
||||
self._speech_config.set_speech_synthesis_output_format(
|
||||
sample_rate_to_output_format(self.sample_rate)
|
||||
)
|
||||
self._speech_synthesizer = SpeechSynthesizer(
|
||||
speech_config=self._speech_config, audio_config=None
|
||||
)
|
||||
|
||||
self._speech_synthesizer = SpeechSynthesizer(speech_config=speech_config, audio_config=None)
|
||||
|
||||
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
|
||||
logger.debug(f"Generating TTS: [{text}]")
|
||||
@@ -633,7 +644,7 @@ class AzureHttpTTSService(AzureBaseTTSService):
|
||||
# Azure always sends a 44-byte header. Strip it off.
|
||||
yield TTSAudioRawFrame(
|
||||
audio=result.audio_data[44:],
|
||||
sample_rate=self._settings["sample_rate"],
|
||||
sample_rate=self.sample_rate,
|
||||
num_channels=1,
|
||||
)
|
||||
yield TTSStoppedFrame()
|
||||
@@ -650,24 +661,14 @@ class AzureSTTService(STTService):
|
||||
*,
|
||||
api_key: str,
|
||||
region: str,
|
||||
language=Language.EN_US,
|
||||
sample_rate=24000,
|
||||
channels=1,
|
||||
language: Language = Language.EN_US,
|
||||
sample_rate: Optional[int] = None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
super().__init__(sample_rate=sample_rate, **kwargs)
|
||||
|
||||
speech_config = SpeechConfig(subscription=api_key, region=region)
|
||||
speech_config.speech_recognition_language = language
|
||||
|
||||
stream_format = AudioStreamFormat(samples_per_second=sample_rate, channels=channels)
|
||||
self._audio_stream = PushAudioInputStream(stream_format)
|
||||
|
||||
audio_config = AudioConfig(stream=self._audio_stream)
|
||||
self._speech_recognizer = SpeechRecognizer(
|
||||
speech_config=speech_config, audio_config=audio_config
|
||||
)
|
||||
self._speech_recognizer.recognized.connect(self._on_handle_recognized)
|
||||
self._speech_config = SpeechConfig(subscription=api_key, region=region)
|
||||
self._speech_config.speech_recognition_language = language
|
||||
|
||||
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
|
||||
await self.start_processing_metrics()
|
||||
@@ -677,6 +678,16 @@ class AzureSTTService(STTService):
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
|
||||
stream_format = AudioStreamFormat(samples_per_second=self.sample_rate, channels=1)
|
||||
self._audio_stream = PushAudioInputStream(stream_format)
|
||||
|
||||
audio_config = AudioConfig(stream=self._audio_stream)
|
||||
|
||||
self._speech_recognizer = SpeechRecognizer(
|
||||
speech_config=self._speech_config, audio_config=audio_config
|
||||
)
|
||||
self._speech_recognizer.recognized.connect(self._on_handle_recognized)
|
||||
self._speech_recognizer.start_continuous_recognition_async()
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
|
||||
@@ -9,7 +9,7 @@ import os
|
||||
import uuid
|
||||
import wave
|
||||
from datetime import datetime
|
||||
from typing import Dict, List, Tuple
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
import aiohttp
|
||||
from loguru import logger
|
||||
@@ -69,7 +69,7 @@ class CanonicalMetricsService(AIService):
|
||||
api_url: str = "https://voiceapp.canonical.chat/api/v1",
|
||||
assistant_speaks_first: bool = True,
|
||||
output_dir: str = "recordings",
|
||||
context: OpenAILLMContext | None = None,
|
||||
context: Optional[OpenAILLMContext] = None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
@@ -117,7 +117,6 @@ class CanonicalMetricsService(AIService):
|
||||
try:
|
||||
await self._multipart_upload(filename)
|
||||
await aiofiles.os.remove(filename)
|
||||
audio_buffer_processor.reset_audio_buffers()
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
except Exception as e:
|
||||
|
||||
@@ -43,7 +43,7 @@ except ModuleNotFoundError as e:
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
def language_to_cartesia_language(language: Language) -> str | None:
|
||||
def language_to_cartesia_language(language: Language) -> Optional[str]:
|
||||
BASE_LANGUAGES = {
|
||||
Language.DE: "de",
|
||||
Language.EN: "en",
|
||||
@@ -89,7 +89,7 @@ class CartesiaTTSService(WordTTSService, WebsocketService):
|
||||
cartesia_version: str = "2024-06-10",
|
||||
url: str = "wss://api.cartesia.ai/tts/websocket",
|
||||
model: str = "sonic",
|
||||
sample_rate: int = 24000,
|
||||
sample_rate: Optional[int] = None,
|
||||
encoding: str = "pcm_s16le",
|
||||
container: str = "raw",
|
||||
params: InputParams = InputParams(),
|
||||
@@ -121,7 +121,7 @@ class CartesiaTTSService(WordTTSService, WebsocketService):
|
||||
"output_format": {
|
||||
"container": container,
|
||||
"encoding": encoding,
|
||||
"sample_rate": sample_rate,
|
||||
"sample_rate": 0,
|
||||
},
|
||||
"language": self.language_to_service_language(params.language)
|
||||
if params.language
|
||||
@@ -143,7 +143,7 @@ class CartesiaTTSService(WordTTSService, WebsocketService):
|
||||
await super().set_model(model)
|
||||
logger.info(f"Switching TTS model to: [{model}]")
|
||||
|
||||
def language_to_service_language(self, language: Language) -> str | None:
|
||||
def language_to_service_language(self, language: Language) -> Optional[str]:
|
||||
return language_to_cartesia_language(language)
|
||||
|
||||
def _build_msg(
|
||||
@@ -174,6 +174,7 @@ class CartesiaTTSService(WordTTSService, WebsocketService):
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
self._settings["output_format"]["sample_rate"] = self.sample_rate
|
||||
await self._connect()
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
@@ -262,7 +263,7 @@ class CartesiaTTSService(WordTTSService, WebsocketService):
|
||||
self.start_word_timestamps()
|
||||
frame = TTSAudioRawFrame(
|
||||
audio=base64.b64decode(msg["data"]),
|
||||
sample_rate=self._settings["output_format"]["sample_rate"],
|
||||
sample_rate=self.sample_rate,
|
||||
num_channels=1,
|
||||
)
|
||||
await self.push_frame(frame)
|
||||
@@ -328,7 +329,7 @@ class CartesiaHttpTTSService(TTSService):
|
||||
voice_id: str,
|
||||
model: str = "sonic",
|
||||
base_url: str = "https://api.cartesia.ai",
|
||||
sample_rate: int = 24000,
|
||||
sample_rate: Optional[int] = None,
|
||||
encoding: str = "pcm_s16le",
|
||||
container: str = "raw",
|
||||
params: InputParams = InputParams(),
|
||||
@@ -341,7 +342,7 @@ class CartesiaHttpTTSService(TTSService):
|
||||
"output_format": {
|
||||
"container": container,
|
||||
"encoding": encoding,
|
||||
"sample_rate": sample_rate,
|
||||
"sample_rate": 0,
|
||||
},
|
||||
"language": self.language_to_service_language(params.language)
|
||||
if params.language
|
||||
@@ -357,9 +358,13 @@ class CartesiaHttpTTSService(TTSService):
|
||||
def can_generate_metrics(self) -> bool:
|
||||
return True
|
||||
|
||||
def language_to_service_language(self, language: Language) -> str | None:
|
||||
def language_to_service_language(self, language: Language) -> Optional[str]:
|
||||
return language_to_cartesia_language(language)
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
self._settings["output_format"]["sample_rate"] = self.sample_rate
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
await super().stop(frame)
|
||||
await self._client.close()
|
||||
@@ -394,9 +399,7 @@ class CartesiaHttpTTSService(TTSService):
|
||||
)
|
||||
|
||||
frame = TTSAudioRawFrame(
|
||||
audio=output["audio"],
|
||||
sample_rate=self._settings["output_format"]["sample_rate"],
|
||||
num_channels=1,
|
||||
audio=output["audio"], sample_rate=self.sample_rate, num_channels=1
|
||||
)
|
||||
yield frame
|
||||
except Exception as e:
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
#
|
||||
|
||||
import asyncio
|
||||
from typing import AsyncGenerator
|
||||
from typing import AsyncGenerator, Optional
|
||||
|
||||
from loguru import logger
|
||||
|
||||
@@ -53,14 +53,13 @@ class DeepgramTTSService(TTSService):
|
||||
*,
|
||||
api_key: str,
|
||||
voice: str = "aura-helios-en",
|
||||
sample_rate: int = 24000,
|
||||
sample_rate: Optional[int] = None,
|
||||
encoding: str = "linear16",
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(sample_rate=sample_rate, **kwargs)
|
||||
|
||||
self._settings = {
|
||||
"sample_rate": sample_rate,
|
||||
"encoding": encoding,
|
||||
}
|
||||
self.set_voice(voice)
|
||||
@@ -75,7 +74,7 @@ class DeepgramTTSService(TTSService):
|
||||
options = SpeakOptions(
|
||||
model=self._voice_id,
|
||||
encoding=self._settings["encoding"],
|
||||
sample_rate=self._settings["sample_rate"],
|
||||
sample_rate=self.sample_rate,
|
||||
container="none",
|
||||
)
|
||||
|
||||
@@ -103,9 +102,7 @@ class DeepgramTTSService(TTSService):
|
||||
chunk = audio_buffer.read(chunk_size)
|
||||
if not chunk:
|
||||
break
|
||||
frame = TTSAudioRawFrame(
|
||||
audio=chunk, sample_rate=self._settings["sample_rate"], num_channels=1
|
||||
)
|
||||
frame = TTSAudioRawFrame(audio=chunk, sample_rate=self.sample_rate, num_channels=1)
|
||||
yield frame
|
||||
|
||||
yield TTSStoppedFrame()
|
||||
@@ -121,15 +118,16 @@ class DeepgramSTTService(STTService):
|
||||
*,
|
||||
api_key: str,
|
||||
url: str = "",
|
||||
live_options: LiveOptions = None,
|
||||
sample_rate: Optional[int] = None,
|
||||
live_options: Optional[LiveOptions] = None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
super().__init__(sample_rate=sample_rate, **kwargs)
|
||||
|
||||
default_options = LiveOptions(
|
||||
encoding="linear16",
|
||||
language=Language.EN,
|
||||
model="nova-2-general",
|
||||
sample_rate=16000,
|
||||
channels=1,
|
||||
interim_results=True,
|
||||
smart_format=True,
|
||||
@@ -187,6 +185,7 @@ class DeepgramSTTService(STTService):
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
self._settings["sample_rate"] = self.sample_rate
|
||||
await self._connect()
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
|
||||
@@ -55,7 +55,7 @@ ELEVENLABS_MULTILINGUAL_MODELS = {
|
||||
}
|
||||
|
||||
|
||||
def language_to_elevenlabs_language(language: Language) -> str | None:
|
||||
def language_to_elevenlabs_language(language: Language) -> Optional[str]:
|
||||
BASE_LANGUAGES = {
|
||||
Language.AR: "ar",
|
||||
Language.BG: "bg",
|
||||
@@ -104,17 +104,20 @@ def language_to_elevenlabs_language(language: Language) -> str | None:
|
||||
return result
|
||||
|
||||
|
||||
def sample_rate_from_output_format(output_format: str) -> int:
|
||||
match output_format:
|
||||
case "pcm_16000":
|
||||
return 16000
|
||||
case "pcm_22050":
|
||||
return 22050
|
||||
case "pcm_24000":
|
||||
return 24000
|
||||
case "pcm_44100":
|
||||
return 44100
|
||||
return 16000
|
||||
def output_format_from_sample_rate(sample_rate: int) -> str:
|
||||
match sample_rate:
|
||||
case 16000:
|
||||
return "pcm_16000"
|
||||
case 22050:
|
||||
return "pcm_22050"
|
||||
case 24000:
|
||||
return "pcm_24000"
|
||||
case 44100:
|
||||
return "pcm_44100"
|
||||
logger.warning(
|
||||
f"ElevenLabsTTSService: No output format available for {sample_rate} sample rate"
|
||||
)
|
||||
return "pcm_16000"
|
||||
|
||||
|
||||
def calculate_word_times(
|
||||
@@ -165,7 +168,7 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
|
||||
voice_id: str,
|
||||
model: str = "eleven_flash_v2_5",
|
||||
url: str = "wss://api.elevenlabs.io",
|
||||
output_format: ElevenLabsOutputFormat = "pcm_24000",
|
||||
sample_rate: Optional[int] = None,
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
):
|
||||
@@ -189,7 +192,7 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
|
||||
push_text_frames=False,
|
||||
push_stop_frames=True,
|
||||
stop_frame_timeout_s=2.0,
|
||||
sample_rate=sample_rate_from_output_format(output_format),
|
||||
sample_rate=sample_rate,
|
||||
**kwargs,
|
||||
)
|
||||
WebsocketService.__init__(self)
|
||||
@@ -197,11 +200,9 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
|
||||
self._api_key = api_key
|
||||
self._url = url
|
||||
self._settings = {
|
||||
"sample_rate": sample_rate_from_output_format(output_format),
|
||||
"language": self.language_to_service_language(params.language)
|
||||
if params.language
|
||||
else None,
|
||||
"output_format": output_format,
|
||||
"optimize_streaming_latency": params.optimize_streaming_latency,
|
||||
"stability": params.stability,
|
||||
"similarity_boost": params.similarity_boost,
|
||||
@@ -211,6 +212,7 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
|
||||
}
|
||||
self.set_model_name(model)
|
||||
self.set_voice(voice_id)
|
||||
self._output_format = "" # initialized in start()
|
||||
self._voice_settings = self._set_voice_settings()
|
||||
|
||||
# Indicates if we have sent TTSStartedFrame. It will reset to False when
|
||||
@@ -221,7 +223,7 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
|
||||
def can_generate_metrics(self) -> bool:
|
||||
return True
|
||||
|
||||
def language_to_service_language(self, language: Language) -> str | None:
|
||||
def language_to_service_language(self, language: Language) -> Optional[str]:
|
||||
return language_to_elevenlabs_language(language)
|
||||
|
||||
def _set_voice_settings(self):
|
||||
@@ -254,7 +256,7 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
|
||||
await self._disconnect()
|
||||
await self._connect()
|
||||
|
||||
async def _update_settings(self, settings: Dict[str, Any]):
|
||||
async def _update_settings(self, settings: Mapping[str, Any]):
|
||||
prev_voice = self._voice_id
|
||||
await super()._update_settings(settings)
|
||||
if not prev_voice == self._voice_id:
|
||||
@@ -264,6 +266,7 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
self._output_format = output_format_from_sample_rate(self.sample_rate)
|
||||
await self._connect()
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
@@ -322,7 +325,7 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
|
||||
|
||||
voice_id = self._voice_id
|
||||
model = self.model_name
|
||||
output_format = self._settings["output_format"]
|
||||
output_format = self._output_format
|
||||
url = f"{self._url}/v1/text-to-speech/{voice_id}/stream-input?model_id={model}&output_format={output_format}&auto_mode={self._settings['auto_mode']}"
|
||||
|
||||
if self._settings["optimize_streaming_latency"]:
|
||||
@@ -375,7 +378,7 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
|
||||
self.start_word_timestamps()
|
||||
|
||||
audio = base64.b64decode(msg["audio"])
|
||||
frame = TTSAudioRawFrame(audio, self._settings["sample_rate"], 1)
|
||||
frame = TTSAudioRawFrame(audio, self.sample_rate, 1)
|
||||
await self.push_frame(frame)
|
||||
if msg.get("alignment"):
|
||||
word_times = calculate_word_times(msg["alignment"], self._cumulative_time)
|
||||
@@ -428,7 +431,7 @@ class ElevenLabsHttpTTSService(TTSService):
|
||||
aiohttp_session: aiohttp ClientSession
|
||||
model: Model ID (default: "eleven_flash_v2_5" for low latency)
|
||||
base_url: API base URL
|
||||
output_format: Audio output format (PCM)
|
||||
sample_rate: Output sample rate
|
||||
params: Additional parameters for voice configuration
|
||||
"""
|
||||
|
||||
@@ -448,24 +451,21 @@ class ElevenLabsHttpTTSService(TTSService):
|
||||
aiohttp_session: aiohttp.ClientSession,
|
||||
model: str = "eleven_flash_v2_5",
|
||||
base_url: str = "https://api.elevenlabs.io",
|
||||
output_format: ElevenLabsOutputFormat = "pcm_24000",
|
||||
sample_rate: Optional[int] = None,
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(sample_rate=sample_rate_from_output_format(output_format), **kwargs)
|
||||
super().__init__(sample_rate=sample_rate, **kwargs)
|
||||
|
||||
self._api_key = api_key
|
||||
self._base_url = base_url
|
||||
self._output_format = output_format
|
||||
self._params = params
|
||||
self._session = aiohttp_session
|
||||
|
||||
self._settings = {
|
||||
"sample_rate": sample_rate_from_output_format(output_format),
|
||||
"language": self.language_to_service_language(params.language)
|
||||
if params.language
|
||||
else None,
|
||||
"output_format": output_format,
|
||||
"optimize_streaming_latency": params.optimize_streaming_latency,
|
||||
"stability": params.stability,
|
||||
"similarity_boost": params.similarity_boost,
|
||||
@@ -474,6 +474,7 @@ class ElevenLabsHttpTTSService(TTSService):
|
||||
}
|
||||
self.set_model_name(model)
|
||||
self.set_voice(voice_id)
|
||||
self._output_format = "" # initialized in start()
|
||||
self._voice_settings = self._set_voice_settings()
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
@@ -508,6 +509,10 @@ class ElevenLabsHttpTTSService(TTSService):
|
||||
|
||||
return voice_settings or None
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
self._output_format = output_format_from_sample_rate(self.sample_rate)
|
||||
|
||||
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
|
||||
"""Generate speech from text using ElevenLabs streaming API.
|
||||
|
||||
@@ -570,7 +575,7 @@ class ElevenLabsHttpTTSService(TTSService):
|
||||
async for chunk in response.content:
|
||||
if chunk:
|
||||
await self.stop_ttfb_metrics()
|
||||
yield TTSAudioRawFrame(chunk, self._settings["sample_rate"], 1)
|
||||
yield TTSAudioRawFrame(chunk, self.sample_rate, 1)
|
||||
|
||||
yield TTSStoppedFrame()
|
||||
|
||||
|
||||
@@ -42,7 +42,7 @@ class FalImageGenService(ImageGenService):
|
||||
params: InputParams,
|
||||
aiohttp_session: aiohttp.ClientSession,
|
||||
model: str = "fal-ai/fast-sdxl",
|
||||
key: str | None = None,
|
||||
key: Optional[str] = None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
@@ -56,7 +56,7 @@ class FishAudioTTSService(TTSService, WebsocketService):
|
||||
api_key: str,
|
||||
model: str, # This is the reference_id
|
||||
output_format: FishAudioOutputFormat = "pcm",
|
||||
sample_rate: int = 24000,
|
||||
sample_rate: Optional[int] = None,
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
):
|
||||
@@ -70,7 +70,7 @@ class FishAudioTTSService(TTSService, WebsocketService):
|
||||
self._started = False
|
||||
|
||||
self._settings = {
|
||||
"sample_rate": sample_rate,
|
||||
"sample_rate": 0,
|
||||
"latency": params.latency,
|
||||
"format": output_format,
|
||||
"prosody": {
|
||||
@@ -92,6 +92,7 @@ class FishAudioTTSService(TTSService, WebsocketService):
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
self._settings["sample_rate"] = self.sample_rate
|
||||
await self._connect()
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
@@ -157,9 +158,7 @@ class FishAudioTTSService(TTSService, WebsocketService):
|
||||
audio_data = msg.get("audio")
|
||||
# Only process larger chunks to remove msgpack overhead
|
||||
if audio_data and len(audio_data) > 1024:
|
||||
frame = TTSAudioRawFrame(
|
||||
audio_data, self._settings["sample_rate"], 1
|
||||
)
|
||||
frame = TTSAudioRawFrame(audio_data, self.sample_rate, 1)
|
||||
await self.push_frame(frame)
|
||||
await self.stop_ttfb_metrics()
|
||||
continue
|
||||
|
||||
@@ -48,7 +48,7 @@ class AudioInputMessage(BaseModel):
|
||||
realtimeInput: RealtimeInput
|
||||
|
||||
@classmethod
|
||||
def from_raw_audio(cls, raw_audio: bytes, sample_rate=16000) -> "AudioInputMessage":
|
||||
def from_raw_audio(cls, raw_audio: bytes, sample_rate: int) -> "AudioInputMessage":
|
||||
data = base64.b64encode(raw_audio).decode("utf-8")
|
||||
return cls(
|
||||
realtimeInput=RealtimeInput(
|
||||
|
||||
@@ -203,6 +203,8 @@ class GeminiMultimodalLiveLLMService(LLMService):
|
||||
self._bot_audio_buffer = bytearray()
|
||||
self._bot_text_buffer = ""
|
||||
|
||||
self._sample_rate = 24000
|
||||
|
||||
self._settings = {
|
||||
"frequency_penalty": params.frequency_penalty,
|
||||
"max_tokens": params.max_tokens,
|
||||
@@ -521,7 +523,7 @@ class GeminiMultimodalLiveLLMService(LLMService):
|
||||
if self._audio_input_paused:
|
||||
return
|
||||
# Send all audio to Gemini
|
||||
evt = events.AudioInputMessage.from_raw_audio(frame.audio)
|
||||
evt = events.AudioInputMessage.from_raw_audio(frame.audio, frame.sample_rate)
|
||||
await self.send_client_event(evt)
|
||||
# Manage a buffer of audio to use for transcription
|
||||
audio = frame.audio
|
||||
@@ -650,7 +652,7 @@ class GeminiMultimodalLiveLLMService(LLMService):
|
||||
inline_data = part.inlineData
|
||||
if not inline_data:
|
||||
return
|
||||
if inline_data.mimeType != "audio/pcm;rate=24000":
|
||||
if inline_data.mimeType != f"audio/pcm;rate={self._sample_rate}":
|
||||
logger.warning(f"Unrecognized server_content format {inline_data.mimeType}")
|
||||
return
|
||||
|
||||
@@ -665,7 +667,7 @@ class GeminiMultimodalLiveLLMService(LLMService):
|
||||
self._bot_audio_buffer.extend(audio)
|
||||
frame = TTSAudioRawFrame(
|
||||
audio=audio,
|
||||
sample_rate=24000,
|
||||
sample_rate=self._sample_rate,
|
||||
num_channels=1,
|
||||
)
|
||||
await self.push_frame(frame)
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user