Compare commits

62 Commits

Author SHA1 Message Date
Aleix Conchillo Flaqué
d4b2160f9c Merge pull request #1161 from pipecat-ai/aleix/prepare-0.0.56
update CHANGELOG for 0.0.56
2025-02-06 13:50:04 -08:00
Aleix Conchillo Flaqué
dd7926aab5 update CHANGELOG for 0.0.56 2025-02-06 13:45:13 -08:00
Aleix Conchillo Flaqué
070bf66980 transports: fix local transports audio cleanup 2025-02-06 13:45:13 -08:00
Aleix Conchillo Flaqué
962fc27dbd Merge pull request #1160 from pipecat-ai/aleix/fix-unit-test-logging
tests: remove logger from tests.utils
2025-02-06 13:26:37 -08:00
Mark Backman
3d4d6132fc Merge pull request #1158 from pipecat-ai/mb/update-22c
Update foundation examples 22b, 22c, and 22d to be ready for function…
2025-02-06 16:25:05 -05:00
Aleix Conchillo Flaqué
a96d9294b7 tests: remove logger from tests.utils 2025-02-06 13:18:28 -08:00
Aleix Conchillo Flaqué
a6e78550d5 Merge pull request #1156 from pipecat-ai/aleix/prefer-optional
prefer Optional over to "| None"
2025-02-06 13:08:48 -08:00
Mark Backman
969de92ad9 Update foundation examples 22b, 22c, and 22d to be ready for function calling 2025-02-06 15:36:16 -05:00
Aleix Conchillo Flaqué
c4dbe92b30 prefer Optional over to "| None" 2025-02-06 11:11:37 -08:00
Aleix Conchillo Flaqué
684764fece Merge pull request #1155 from pipecat-ai/aleix/sentry-fixes-and-example
sentry fixes and example
2025-02-06 11:09:31 -08:00
Aleix Conchillo Flaqué
c4be07693f examples: added sentry-metrics example 2025-02-06 10:46:04 -08:00
Aleix Conchillo Flaqué
c5d5ca8232 SentryMetrics: use transactions and call parent methods 2025-02-06 10:44:38 -08:00
Mark Backman
428e763814 Merge pull request #1149 from pipecat-ai/mb/update-google-default-llm-model
Use gemini-2.0-flash-001 as the default model for GoogleLLMService
2025-02-06 12:41:13 -05:00
Mark Backman
0efa2711ff Merge pull request #1152 from pipecat-ai/mb/docstrings
Add docstrings for PipelineTask and related classes/functions
2025-02-06 12:30:12 -05:00
Mark Backman
4904f52cee Use gemini-2.0-flash-001 as the default model for GoogleLLMService 2025-02-06 12:29:15 -05:00
Aleix Conchillo Flaqué
dbcf14ddb4 Merge pull request #1154 from pipecat-ai/aleix/twilio-telnyx-sample-rates
serializers: don't update twilio/telnyx sample rates
2025-02-06 09:27:42 -08:00
Aleix Conchillo Flaqué
7c13ec10d9 examples: cleanup ElevenLabsTTSService constructor arguments 2025-02-06 09:25:52 -08:00
Aleix Conchillo Flaqué
29b9dccc53 serializers: don't update twilio/telnyx sample rates 2025-02-06 09:25:52 -08:00
Aleix Conchillo Flaqué
e8ce826473 Merge pull request #1151 from pipecat-ai/aleix/base-output-transport-resample
BaseOutputTransport: resample incoming audio if needed
2025-02-06 09:25:07 -08:00
Aleix Conchillo Flaqué
bbb991dfd8 Merge pull request #1153 from pipecat-ai/aleix/base-input-transport-show-vad
BaseInputTransport: show VAD results when interruptions not allowed
2025-02-06 09:24:12 -08:00
Mark Backman
4432e7e4f7 Add docstrings for PipelineTask and related classes/functions 2025-02-06 11:04:54 -05:00
Aleix Conchillo Flaqué
ee9cce64b2 BaseInputTransport: show VAD results when interruptions not allowed 2025-02-06 07:40:03 -08:00
Aleix Conchillo Flaqué
1ae4f0150d BaseOutputTransport: resample incoming audio if needed 2025-02-06 07:37:43 -08:00
Mark Backman
4c77c3ed34 Merge pull request #1148 from pipecat-ai/mb/fix-twilio-serializer
Fix sample rate handling in Twilio and Telnyx serializers
2025-02-06 10:25:13 -05:00
Aleix Conchillo Flaqué
975b97472a Merge pull request #1144 from pipecat-ai/aleix/frame-processor-missing-init-warning
FrameProcessor: add an error about missing super().process_frame(...)
2025-02-06 07:18:35 -08:00
Mark Backman
c8ccf13bc7 fix: Use audio_in_sample_rate to deserialize data for TelnyxFrameSerializer 2025-02-06 09:59:21 -05:00
Mark Backman
ba59736f87 fix: Use audio_in_sample_rate to deserialize data for TwilioFrameSerializer 2025-02-06 09:55:15 -05:00
Aleix Conchillo Flaqué
bc21a0b817 FrameProcessor: add an error about missing super().process_frame(...) 2025-02-05 18:33:03 -08:00
Aleix Conchillo Flaqué
99d3227ff5 Merge pull request #1126 from pipecat-ai/aleix/prepare-0.0.55
update CHANGELOG for 0.0.55
2025-02-05 11:32:39 -08:00
Aleix Conchillo Flaqué
7730f59635 update CHANGELOG for 0.0.55 2025-02-05 11:30:40 -08:00
Aleix Conchillo Flaqué
ba31546c32 Merge pull request #1139 from pipecat-ai/aleix/task-start-metadata
pipeline task start metadata and unit test improvements
2025-02-05 10:51:51 -08:00
Aleix Conchillo Flaqué
a363d12d1f dev-requirements: fix conflicts because of nvidia-riva-client 2025-02-05 10:34:46 -08:00
Aleix Conchillo Flaqué
feab9c8fa2 tests: run_test() now uses PipelineTask 2025-02-05 10:34:38 -08:00
Aleix Conchillo Flaqué
61f6669926 task: allow passing StartFrame metadata via start_metadata param 2025-02-05 10:34:38 -08:00
Aleix Conchillo Flaqué
3be69908d2 Merge pull request #1131 from pipecat-ai/aleix/global-audio-sample-rates
introduce PipelineParams audio input/output sample rates
2025-02-05 08:11:25 -08:00
Aleix Conchillo Flaqué
fcb80ec330 playht: don't set sample_rate in _settings 2025-02-05 07:46:24 -08:00
Mark Backman
c9f5684e2f OpenAITTSService: Add warning about changing sample_rate 2025-02-05 10:13:46 -05:00
Mark Backman
c257fa1573 AzureTTSService, AzureHttpTTSService: add start() method 2025-02-05 10:05:19 -05:00
Mark Backman
97c55da29f PlayHTHttpTTSService: add start() method to set sample_rate 2025-02-05 09:54:41 -05:00
Aleix Conchillo Flaqué
49426aa9a1 transport(websocket): improve exception logging 2025-02-04 23:50:45 -08:00
Aleix Conchillo Flaqué
0a333c26da services(elevenlabs): warn if sample rate not supported 2025-02-04 23:50:21 -08:00
Aleix Conchillo Flaqué
75a29424ff examples(telnyx-chatbot): use cartesia so we can use 8khz 2025-02-04 23:49:50 -08:00
Filipi da Silva Fuchter
cd1b429308 Merge pull request #1133 from pipecat-ai/fixing_krisp_issue
Fixing the issue in Krisp when trying to create more than one
2025-02-04 20:44:29 -03:00
Filipi Fuchter
7f1ae4b8cc Fixing the issue in Krisp when trying to create more than one filter in the same process. 2025-02-04 20:10:56 -03:00
Aleix Conchillo Flaqué
af9fd811cd examples(moondream-chatbot): fix UserImageRequester 2025-02-04 14:37:53 -08:00
Aleix Conchillo Flaqué
69f5c9b9d3 update anthropic and openpipe versions 2025-02-04 14:37:36 -08:00
Aleix Conchillo Flaqué
ab45e481be introduce PipelineParams audio input/output sample rates 2025-02-04 14:12:56 -08:00
Aleix Conchillo Flaqué
cc54255c41 Merge pull request #1125 from pipecat-ai/aleix/twilio-chatbot-improvements 2025-02-03 11:10:33 -08:00
Aleix Conchillo Flaqué
1cdb66f889 examples(twilio-chatbot): create sample rate variable 2025-02-03 10:58:06 -08:00
Aleix Conchillo Flaqué
51a86a509c examples: multiple twilio-chatbot improvements 2025-02-03 10:36:24 -08:00
Aleix Conchillo Flaqué
824898f7b7 Merge pull request #1121 from pipecat-ai/aleix/audio-resamplers
introduce audio resamplers
2025-02-03 10:32:55 -08:00
Aleix Conchillo Flaqué
57dadb6359 audio(utils): some variable renames 2025-02-03 09:33:04 -08:00
Aleix Conchillo Flaqué
5dcdc68ef5 examples: fix 22 series initial gate state 2025-02-03 09:16:58 -08:00
Aleix Conchillo Flaqué
aafb2db620 GatedOpenAILLMContextAggregator: use keyword argument and add start_open 2025-02-03 09:16:44 -08:00
Aleix Conchillo Flaqué
f3f22cf61c AudioBufferProcessor: add start_recording()/stop_recording() 2025-02-01 11:06:58 -08:00
Aleix Conchillo Flaqué
371c2f3704 canonical: do not reset audio buffers 2025-02-01 11:06:58 -08:00
Aleix Conchillo Flaqué
1f14f62696 AudioBufferProcessor: fix audio buffer silence computation 2025-02-01 11:06:58 -08:00
Aleix Conchillo Flaqué
06449eff2c BaseAudioResampler: make resample() async 2025-02-01 11:06:58 -08:00
Aleix Conchillo Flaqué
dcfb86583d serializers: serialize()/deserialize() are now async 2025-02-01 11:06:58 -08:00
Aleix Conchillo Flaqué
cda34a1320 AudioBufferProcessor: fix user/bot audio buffers silence padding 2025-02-01 11:06:58 -08:00
Aleix Conchillo Flaqué
13611fd8e1 AudioBufferProcessor: call callback on CancelFrame 2025-02-01 11:06:58 -08:00
Aleix Conchillo Flaqué
fc89aad469 introduce audio resamplers 2025-02-01 11:06:55 -08:00
127 changed files with 2318 additions and 825 deletions

View File

@@ -5,10 +5,61 @@ All notable changes to **Pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
## [0.0.56] - 2025-02-06
### Changed
- Improved foundational examples 22b, 22c, and 22d to support function calling.
With these base examples, `FunctionCallInProgressFrame` and
`FunctionCallResultFrame` will no longer be blocked by the gates.
### Fixed
- Fixed `TkLocalTransport` and `LocalAudioTransport` issues that were causing
errors on cleanup.
- Fixed an issue that was causing `tests.utils` import to fail because of
logging setup.
- Fixed a `SentryMetrics` issue that was preventing any metrics from being sent
to Sentry and was also preventing metrics frames from being pushed to the pipeline.
- Fixed an issue in `BaseOutputTransport` where incoming audio would not be
resampled to the desired output sample rate.
- Fixed an issue with the `TwilioFrameSerializer` and `TelnyxFrameSerializer`
where `twilio_sample_rate` and `telnyx_sample_rate` were incorrectly
initialized to `audio_in_sample_rate`. Those values currently default to 8000
and should be set manually from the serializer constructor if a different
value is needed.
### Changed
- Use `gemini-2.0-flash-001` as the default model for `GoogleLLMService`.
### Other
- Added a new `sentry-metrics` example.
## [0.0.55] - 2025-02-05
### Added
- Added a new `start_metadata` field to `PipelineParams`. The provided metadata
will be set to the initial `StartFrame` being pushed from the `PipelineTask`.
- Added new fields to `PipelineParams` to control audio input and output sample
rates for the whole pipeline. This allows controlling sample rates from a
single place instead of having to specify sample rates in each
service. Setting a sample rate to a service is still possible and will
override the value from `PipelineParams`.
- Introduce audio resamplers (`BaseAudioResampler`). This is just a base class
to implement audio resamplers. Currently, two implementations are provided:
`SOXRAudioResampler` and `ResampyResampler`. A new
`create_default_resampler()` has been added (replacing the now deprecated
`resample_audio()`).
- It is now possible to specify the asyncio event loop that a `PipelineTask` and
all the processors should run on by passing it as a new argument to the
`PipelineRunner`. This could allow running pipelines in multiple threads each
@@ -41,6 +92,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- `GatedOpenAILLMContextAggregator` now requires keyword arguments. Also, a new
`start_open` argument has been added to set the initial state of the gate.
- Added `organization` and `project` level authentication to
`OpenAILLMService`.
@@ -56,8 +110,27 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `InputDTMFFrame` is now based on `DTMFFrame`. There's also a new
`OutputDTMFFrame` frame.
### Deprecated
- `resample_audio()` is now deprecated, use `create_default_resampler()`
instead.
### Removed
- `AudioBufferProcessor.reset_audio_buffers()` has been removed, use
`AudioBufferProcessor.start_recording()` and
`AudioBufferProcessor.stop_recording()` instead.
### Fixed
- Fixed an `AudioBufferProcessor` issue that would cause crackling in some recordings.
- Fixed an issue in `AudioBufferProcessor` where user callback would not be
called on task cancellation.
- Fixed an issue in `AudioBufferProcessor` that would cause wrong silence
padding in some cases.
- Fixed an issue where `ElevenLabsTTSService` messages would return a 1009
websocket error by increasing the max message size limit to 16MB.
@@ -73,6 +146,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Other
- Improved Unit Test `run_test()` to use `PipelineTask` and
`PipelineRunner`. There's now also some control around `StartFrame` and
`EndFrame`. The `EndTaskFrame` has been removed since it doesn't seem
necessary with this new approach.
- Updated `twilio-chatbot` with a few new features: use 8000 sample rate and
avoid resampling, a new client useful for stress testing and testing locally
without the need to make phone calls. Also, added audio recording on both the
client and the server to make sure the audio sounds good.
- Updated examples to use `task.cancel()` to immediately exit the example when a
participant leaves or disconnects, instead of pushing an `EndFrame`. Pushing
an `EndFrame` causes the bot to run through everything that is internally
@@ -1527,6 +1610,9 @@ async def on_connected(processor):
### Changed
- `FrameSerializer.serialize()` and `FrameSerializer.deserialize()` are now
`async`.
- `Filter` has been renamed to `FrameFilter` and it's now under
`processors/filters`.
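
The 0.0.55 entry on `PipelineParams` sample rates says a rate set on an individual service still overrides the pipeline-wide value. A minimal sketch of that precedence rule follows. It does not use Pipecat itself: `PipelineRates` and `effective_sample_rate` are hypothetical names, and the default rates are placeholder values.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class PipelineRates:
    """Hypothetical stand-in for pipeline-wide sample-rate settings."""

    # Placeholder defaults, chosen only for illustration.
    audio_in_sample_rate: int = 16000
    audio_out_sample_rate: int = 24000


def effective_sample_rate(service_rate: Optional[int], pipeline_rate: int) -> int:
    """Return the service's own rate when set, otherwise the pipeline-wide rate."""
    return service_rate if service_rate is not None else pipeline_rate


rates = PipelineRates(audio_in_sample_rate=8000, audio_out_sample_rate=8000)

# A service with no explicit rate inherits the pipeline value.
inherited = effective_sample_rate(None, rates.audio_out_sample_rate)

# A service with an explicit rate overrides the pipeline value.
overridden = effective_sample_rate(24000, rates.audio_out_sample_rate)
```

With this rule, a telephony pipeline can be set to 8000 Hz once, and only a service that needs a different rate has to state one.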

View File

@@ -1,11 +1,11 @@
build~=1.2.2
grpcio-tools~=1.69.0
grpcio-tools~=1.67.1
pip-tools~=7.4.1
pre-commit~=4.0.1
pyright~=1.1.392
pytest~=8.3.4
pytest-asyncio~=0.25.2
ruff~=0.9.1
setuptools~=75.8.0
setuptools~=70.0.0
setuptools_scm~=8.1.0
python-dotenv~=1.0.1

View File

@@ -6,6 +6,7 @@
import argparse
import os
from typing import Optional
import aiohttp
@@ -18,7 +19,7 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
async def configure_with_args(
aiohttp_session: aiohttp.ClientSession, parser: argparse.ArgumentParser | None = None
aiohttp_session: aiohttp.ClientSession, parser: Optional[argparse.ArgumentParser] = None
):
if not parser:
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
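
`Optional[X]` and `X | None` describe the same type, but the `|` form on types only evaluates at runtime on Python 3.10 and later (PEP 604). Compatibility with older interpreters is a plausible motivation for this change, although the commit message does not state one. A small sketch of the pattern, using a hypothetical `build_parser` helper:

```python
import argparse
from typing import Optional


def build_parser(parser: Optional[argparse.ArgumentParser] = None) -> argparse.ArgumentParser:
    """Return the given parser, or create a default one when none is passed."""
    if parser is None:
        parser = argparse.ArgumentParser(description="Example bot")
    return parser


# No argument: a default parser is created.
default_parser = build_parser()

# Explicit argument: the caller's parser is returned unchanged.
custom = argparse.ArgumentParser(description="Custom")
same_parser = build_parser(custom)
```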

View File

@@ -17,7 +17,7 @@ from runner import configure
from pipecat.frames.frames import AudioRawFrame, EndFrame, OutputAudioRawFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -31,16 +31,15 @@ logger.add(sys.stderr, level="DEBUG")
class SilenceFrame(OutputAudioRawFrame):
def __init__(
self,
audio: bytes = None,
sample_rate: int = 16000,
num_channels: int = 1,
duration: float = 0.1,
*,
sample_rate: int,
duration: float,
):
# Initialize the parent class with the silent frame's data
super().__init__(
audio=self.create_silent_audio_frame(sample_rate, num_channels, duration).audio,
audio=self.create_silent_audio_frame(sample_rate, 1, duration).audio,
sample_rate=sample_rate,
num_channels=num_channels,
num_channels=1,
)
@staticmethod
@@ -80,7 +79,10 @@ async def main():
return
await task.queue_frames(
[
SilenceFrame(duration=0.5),
SilenceFrame(
sample_rate=task.params.audio_out_sample_rate,
duration=0.5,
),
TTSSpeakFrame(f"Hello there, how are you doing today ?"),
EndFrame(),
]
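
The `SilenceFrame` change above makes `sample_rate` and `duration` required keyword arguments, so the amount of silence always matches the pipeline's output rate. The sketch below shows the underlying arithmetic. It assumes signed 16-bit PCM samples, which the diff does not state, and `silent_pcm` is a hypothetical helper rather than Pipecat's `create_silent_audio_frame`.

```python
def silent_pcm(sample_rate: int, num_channels: int, duration: float) -> bytes:
    """Return `duration` seconds of silence as raw PCM bytes."""
    bytes_per_sample = 2  # assumption: signed 16-bit samples
    num_samples = int(sample_rate * duration)
    return b"\x00" * (num_samples * num_channels * bytes_per_sample)


# Half a second of mono silence at 24 kHz.
half_second = silent_pcm(sample_rate=24000, num_channels=1, duration=0.5)
```

If the frame were built with a hard-coded 16000 Hz while the transport ran at 24000 Hz, the same bytes would play back for a shorter time than intended, which is why the rate is now read from `task.params.audio_out_sample_rate`.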

View File

@@ -65,7 +65,6 @@ async def main():
# English
#
voice_id="cgSgspJ2msm6clMCkdW9",
aiohttp_session=session,
#
# Spanish
#
@@ -124,6 +123,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await audio_buffer_processor.start_recording()
await transport.capture_participant_transcription(participant["id"])
await task.queue_frames([context_aggregator.user().get_context_frame()])

View File

@@ -82,7 +82,6 @@ async def main():
# English
#
voice_id="cgSgspJ2msm6clMCkdW9",
aiohttp_session=session,
#
# Spanish
#
@@ -109,8 +108,9 @@ async def main():
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
# Save audio every 10 seconds.
audiobuffer = AudioBufferProcessor(buffer_size=480000)
# NOTE: Watch out! This will save all the conversation in memory. You
# can pass `buffer_size` to get periodic callbacks.
audiobuffer = AudioBufferProcessor()
pipeline = Pipeline(
[
@@ -132,6 +132,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await audiobuffer.start_recording()
await transport.capture_participant_transcription(participant["id"])
await task.queue_frames([context_aggregator.user().get_context_frame()])
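
The removed line used `buffer_size=480000` with the comment "Save audio every 10 seconds". That number matches 10 seconds of 24 kHz mono 16-bit audio measured in bytes. This is an inference from the arithmetic, not something the diff states. A sketch with a hypothetical `buffer_size_for` helper:

```python
def buffer_size_for(seconds: float, sample_rate: int, num_channels: int = 1) -> int:
    """Return the number of bytes needed to hold `seconds` of PCM audio."""
    bytes_per_sample = 2  # assumption: signed 16-bit samples
    return int(seconds * sample_rate * num_channels * bytes_per_sample)


# Ten seconds of mono audio at 24 kHz.
ten_seconds = buffer_size_for(10, 24000)
```

At a different pipeline sample rate the same byte count covers a different duration, so a value like this is best derived from the rate rather than hard-coded.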

View File

@@ -51,7 +51,6 @@ async def main():
)
elevenlabs_tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)

View File

@@ -37,7 +37,6 @@ async def main():
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=24000,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),

View File

@@ -38,7 +38,6 @@ async def main():
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=24000,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),

View File

@@ -40,7 +40,6 @@ async def main():
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=24000,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,

View File

@@ -216,11 +216,7 @@ async def main():
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = GoogleLLMService(
model="gemini-1.5-flash-latest",
# model="gemini-exp-1114",
api_key=os.getenv("GOOGLE_API_KEY"),
)
llm = GoogleLLMService(api_key=os.getenv("GOOGLE_API_KEY"), model="gemini-2.0-flash-001")
messages = [
{

View File

@@ -48,7 +48,6 @@ async def main():
region=os.getenv("AZURE_SPEECH_REGION"),
)
tts2 = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id="jBpfuIE2acCO8z3wKNLl",
)

View File

@@ -21,7 +21,7 @@ from pipecat.frames.frames import (
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -61,7 +61,6 @@ async def main():
"Test",
DailyParams(
audio_in_enabled=True,
audio_in_sample_rate=24000,
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_is_live=True,
@@ -78,7 +77,9 @@ async def main():
runner = PipelineRunner()
task = PipelineTask(pipeline)
task = PipelineTask(
pipeline, PipelineParams(audio_in_sample_rate=24000, audio_out_sample_rate=24000)
)
await runner.run(task)

View File

@@ -22,7 +22,7 @@ from pipecat.frames.frames import (
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.local.tk import TkLocalTransport
@@ -62,7 +62,7 @@ async def main():
tk_root.title("Local Mirror")
daily_transport = DailyTransport(
room_url, token, "Test", DailyParams(audio_in_enabled=True, audio_in_sample_rate=24000)
room_url, token, "Test", DailyParams(audio_in_enabled=True)
)
tk_transport = TkLocalTransport(
@@ -82,7 +82,9 @@ async def main():
pipeline = Pipeline([daily_transport.input(), MirrorProcessor(), tk_transport.output()])
task = PipelineTask(pipeline)
task = PipelineTask(
pipeline, PipelineParams(audio_in_sample_rate=24000, audio_out_sample_rate=24000)
)
async def run_tk():
while not task.has_finished():

View File

@@ -7,6 +7,7 @@
import asyncio
import os
import sys
from typing import Optional
import aiohttp
from dotenv import load_dotenv
@@ -32,7 +33,7 @@ logger.add(sys.stderr, level="DEBUG")
class UserImageRequester(FrameProcessor):
def __init__(self, participant_id: str | None = None):
def __init__(self, participant_id: Optional[str] = None):
super().__init__()
self._participant_id = participant_id

View File

@@ -7,6 +7,7 @@
import asyncio
import os
import sys
from typing import Optional
import aiohttp
from dotenv import load_dotenv
@@ -32,7 +33,7 @@ logger.add(sys.stderr, level="DEBUG")
class UserImageRequester(FrameProcessor):
def __init__(self, participant_id: str | None = None):
def __init__(self, participant_id: Optional[str] = None):
super().__init__()
self._participant_id = participant_id
@@ -72,9 +73,7 @@ async def main():
vision_aggregator = VisionImageFrameAggregator()
google = GoogleLLMService(
model="gemini-1.5-flash-latest", api_key=os.getenv("GOOGLE_API_KEY")
)
google = GoogleLLMService(model="gemini-2.0-flash-001", api_key=os.getenv("GOOGLE_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),

View File

@@ -7,6 +7,7 @@
import asyncio
import os
import sys
from typing import Optional
import aiohttp
from dotenv import load_dotenv
@@ -32,7 +33,7 @@ logger.add(sys.stderr, level="DEBUG")
class UserImageRequester(FrameProcessor):
def __init__(self, participant_id: str | None = None):
def __init__(self, participant_id: Optional[str] = None):
super().__init__()
self._participant_id = participant_id

View File

@@ -7,6 +7,7 @@
import asyncio
import os
import sys
from typing import Optional
import aiohttp
from dotenv import load_dotenv
@@ -32,7 +33,7 @@ logger.add(sys.stderr, level="DEBUG")
class UserImageRequester(FrameProcessor):
def __init__(self, participant_id: str | None = None):
def __init__(self, participant_id: Optional[str] = None):
super().__init__()
self._participant_id = participant_id

View File

@@ -62,11 +62,7 @@ async def main():
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = GoogleLLMService(
model="gemini-1.5-flash-latest",
# model="gemini-exp-1114",
api_key=os.getenv("GOOGLE_API_KEY"),
)
llm = GoogleLLMService(api_key=os.getenv("GOOGLE_API_KEY"), model="gemini-2.0-flash-001")
llm.register_function("get_weather", get_weather)
llm.register_function("get_image", get_image)

View File

@@ -51,8 +51,6 @@ async def main():
out_params=GStreamerPipelineSource.OutputParams(
video_width=1280,
video_height=720,
audio_sample_rate=24000,
audio_channels=1,
),
)

View File

@@ -80,9 +80,7 @@ async def main():
"Respond bot",
DailyParams(
audio_in_enabled=True,
audio_in_sample_rate=24000,
audio_out_enabled=True,
audio_out_sample_rate=24000,
transcription_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.8)),

View File

@@ -177,9 +177,7 @@ async def main():
"Respond bot",
DailyParams(
audio_in_enabled=True,
audio_in_sample_rate=24000,
audio_out_enabled=True,
audio_out_sample_rate=24000,
transcription_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.8)),

View File

@@ -237,7 +237,7 @@ async def main():
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = GoogleLLMService(model="gemini-1.5-flash-latest", api_key=os.getenv("GOOGLE_API_KEY"))
llm = GoogleLLMService(model="gemini-2.0-flash-001", api_key=os.getenv("GOOGLE_API_KEY"))
# you can either register a single function for all function calls, or specific functions
# llm.register_function(None, fetch_weather_from_api)

View File

@@ -88,6 +88,10 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
# We just use 16000 because that's what Tavus is expecting and
# we avoid resampling.
audio_in_sample_rate=16000,
audio_out_sample_rate=16000,
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -104,8 +104,11 @@ async def main():
)
# This processor keeps the last context and will let it through once the
# notifier is woken up.
gated_context_aggregator = GatedOpenAILLMContextAggregator(notifier)
# notifier is woken up. We start with the gate open because we send an
# initial context frame to start the conversation.
gated_context_aggregator = GatedOpenAILLMContextAggregator(
notifier=notifier, start_open=True
)
# Notify if the user hasn't said anything.
async def user_idle_notifier(frame):

View File

@@ -12,6 +12,7 @@ import time
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from openai.types.chat import ChatCompletionToolParam
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
@@ -19,6 +20,8 @@ from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
FunctionCallInProgressFrame,
FunctionCallResultFrame,
LLMMessagesFrame,
StartFrame,
StartInterruptionFrame,
@@ -26,6 +29,7 @@ from pipecat.frames.frames import (
SystemFrame,
TextFrame,
TranscriptionFrame,
TTSSpeakFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
@@ -129,9 +133,9 @@ class CompletenessCheck(FrameProcessor):
class OutputGate(FrameProcessor):
def __init__(self, notifier: BaseNotifier, **kwargs):
def __init__(self, *, notifier: BaseNotifier, start_open: bool = False, **kwargs):
super().__init__(**kwargs)
self._gate_open = False
self._gate_open = start_open
self._frames_buffer = []
self._notifier = notifier
@@ -156,6 +160,11 @@ class OutputGate(FrameProcessor):
await self.push_frame(frame, direction)
return
# Don't block function call frames
if isinstance(frame, (FunctionCallInProgressFrame, FunctionCallResultFrame)):
await self.push_frame(frame, direction)
return
# Ignore frames that are not following the direction of this gate.
if direction != FrameDirection.DOWNSTREAM:
await self.push_frame(frame, direction)
@@ -186,6 +195,16 @@ class OutputGate(FrameProcessor):
break
async def start_fetch_weather(function_name, llm, context):
"""Push a frame to the LLM; this is handy when the LLM response might take a while."""
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
await result_callback({"conditions": "nice", "temperature": "75"})
async def main():
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
@@ -216,6 +235,34 @@ async def main():
# This is the regular LLM.
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
# Register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.
llm.register_function(None, fetch_weather_from_api, start_callback=start_fetch_weather)
tools = [
ChatCompletionToolParam(
type="function",
function={
"name": "get_current_weather",
"description": "Get the current weather",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": ["location", "format"],
},
},
)
]
messages = [
{
@@ -224,7 +271,7 @@ async def main():
},
]
context = OpenAILLMContext(messages)
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
# We have instructed the LLM to return 'YES' if it thinks the user
@@ -252,7 +299,9 @@ async def main():
# sentence, this will wake up the notifier if that happens.
user_idle = UserIdleProcessor(callback=user_idle_notifier, timeout=5.0)
bot_output_gate = OutputGate(notifier=notifier)
# We start with the gate open because we send an initial context frame
# to start the conversation.
bot_output_gate = OutputGate(notifier=notifier, start_open=True)
async def block_user_stopped_speaking(frame):
return not isinstance(frame, UserStoppedSpeakingFrame)
@@ -263,6 +312,8 @@ async def main():
or isinstance(frame, LLMMessagesFrame)
or isinstance(frame, StartInterruptionFrame)
or isinstance(frame, StopInterruptionFrame)
or isinstance(frame, FunctionCallInProgressFrame)
or isinstance(frame, FunctionCallResultFrame)
)
pipeline = Pipeline(
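
The `OutputGate` changes in this file add a `start_open` flag and let function-call frames through even while the gate is closed. The simplified, synchronous sketch below shows that behaviour without Pipecat. `SimpleGate`, `FunctionCallFrame` and `TextFrame` are hypothetical stand-ins, and the real processor is asynchronous and handles more frame types.

```python
from typing import Callable, List


class FunctionCallFrame:
    """Hypothetical stand-in for frames that must never be held by the gate."""


class TextFrame:
    """Hypothetical stand-in for ordinary output frames."""

    def __init__(self, text: str):
        self.text = text


class SimpleGate:
    def __init__(self, *, push: Callable[[object], None], start_open: bool = False):
        self._push = push
        self._gate_open = start_open
        self._frames_buffer: List[object] = []

    def process(self, frame: object) -> None:
        # Function-call frames bypass the gate entirely.
        if isinstance(frame, FunctionCallFrame):
            self._push(frame)
            return
        if self._gate_open:
            self._push(frame)
        else:
            self._frames_buffer.append(frame)

    def open(self) -> None:
        # Flush everything that was held while the gate was closed.
        self._gate_open = True
        for frame in self._frames_buffer:
            self._push(frame)
        self._frames_buffer = []


delivered: List[object] = []
gate = SimpleGate(push=delivered.append, start_open=False)
gate.process(TextFrame("held"))  # buffered, gate is closed
call = FunctionCallFrame()
gate.process(call)               # passes straight through
gate.open()                      # buffered text frame is released
```

Starting with the gate open matters in the examples because an initial context frame is sent before any notifier has fired.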

View File

@@ -12,6 +12,7 @@ import time
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from openai.types.chat import ChatCompletionToolParam
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
@@ -19,6 +20,8 @@ from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
FunctionCallInProgressFrame,
FunctionCallResultFrame,
LLMMessagesFrame,
StartFrame,
StartInterruptionFrame,
@@ -26,6 +29,7 @@ from pipecat.frames.frames import (
SystemFrame,
TextFrame,
TranscriptionFrame,
TTSSpeakFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
@@ -333,9 +337,9 @@ class CompletenessCheck(FrameProcessor):
class OutputGate(FrameProcessor):
def __init__(self, notifier: BaseNotifier, **kwargs):
def __init__(self, *, notifier: BaseNotifier, start_open: bool = False, **kwargs):
super().__init__(**kwargs)
self._gate_open = False
self._gate_open = start_open
self._frames_buffer = []
self._notifier = notifier
@@ -360,6 +364,11 @@ class OutputGate(FrameProcessor):
await self.push_frame(frame, direction)
return
# Don't block function call frames
if isinstance(frame, (FunctionCallInProgressFrame, FunctionCallResultFrame)):
await self.push_frame(frame, direction)
return
# Ignore frames that are not following the direction of this gate.
if direction != FrameDirection.DOWNSTREAM:
await self.push_frame(frame, direction)
@@ -390,6 +399,16 @@ class OutputGate(FrameProcessor):
break
async def start_fetch_weather(function_name, llm, context):
"""Push a frame to the LLM; this is handy when the LLM response might take a while."""
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
await result_callback({"conditions": "nice", "temperature": "75"})
async def main():
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
@@ -426,6 +445,34 @@ async def main():
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o",
)
# Register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.
llm.register_function(None, fetch_weather_from_api, start_callback=start_fetch_weather)
tools = [
ChatCompletionToolParam(
type="function",
function={
"name": "get_current_weather",
"description": "Get the current weather",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": ["location", "format"],
},
},
)
]
messages = [
{
@@ -434,7 +481,7 @@ async def main():
},
]
context = OpenAILLMContext(messages)
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
# We have instructed the LLM to return 'YES' if it thinks the user
@@ -461,7 +508,9 @@ async def main():
# sentence, this will wake up the notifier if that happens.
user_idle = UserIdleProcessor(callback=user_idle_notifier, timeout=5.0)
bot_output_gate = OutputGate(notifier=notifier)
# We start with the gate open because we send an initial context frame
# to start the conversation.
bot_output_gate = OutputGate(notifier=notifier, start_open=True)
async def block_user_stopped_speaking(frame):
return not isinstance(frame, UserStoppedSpeakingFrame)
@@ -472,6 +521,8 @@ async def main():
or isinstance(frame, LLMMessagesFrame)
or isinstance(frame, StartInterruptionFrame)
or isinstance(frame, StopInterruptionFrame)
or isinstance(frame, FunctionCallInProgressFrame)
or isinstance(frame, FunctionCallResultFrame)
)
pipeline = Pipeline(
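
The examples register the weather handler with a function name of `None`, which the comment describes as sending all functions to the same callback along with the function name. A minimal sketch of that catch-all lookup follows. `FunctionRegistry` is a hypothetical class, not the `register_function` implementation, and the handler signature is simplified.

```python
from typing import Callable, Dict, Optional

Handler = Callable[[str, dict], dict]


class FunctionRegistry:
    """Hypothetical registry where a None key acts as a catch-all handler."""

    def __init__(self) -> None:
        self._handlers: Dict[Optional[str], Handler] = {}

    def register(self, name: Optional[str], handler: Handler) -> None:
        self._handlers[name] = handler

    def dispatch(self, name: str, args: dict) -> dict:
        # Prefer a handler registered for this exact name, else the catch-all.
        handler = self._handlers.get(name, self._handlers.get(None))
        if handler is None:
            raise KeyError(f"no handler registered for {name!r}")
        return handler(name, args)


def fetch_weather(function_name: str, args: dict) -> dict:
    # Canned result, mirroring the fixed values used in the example.
    return {"function": function_name, "conditions": "nice", "temperature": "75"}


registry = FunctionRegistry()
registry.register(None, fetch_weather)
result = registry.dispatch(
    "get_current_weather", {"location": "San Francisco, CA", "format": "fahrenheit"}
)
```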

View File

@@ -20,6 +20,8 @@ from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
FunctionCallInProgressFrame,
FunctionCallResultFrame,
InputAudioRawFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
@@ -55,13 +57,9 @@ load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
# TRANSCRIBER_MODEL = "gemini-1.5-flash-latest"
# CLASSIFIER_MODEL = "gemini-1.5-flash-latest"
# CONVERSATION_MODEL = "gemini-1.5-flash-latest"
TRANSCRIBER_MODEL = "gemini-2.0-flash-exp"
CLASSIFIER_MODEL = "gemini-2.0-flash-exp"
CONVERSATION_MODEL = "gemini-2.0-flash-exp"
TRANSCRIBER_MODEL = "gemini-2.0-flash-001"
CLASSIFIER_MODEL = "gemini-2.0-flash-001"
CONVERSATION_MODEL = "gemini-2.0-flash-001"
transcriber_system_instruction = """You are an audio transcriber. You are receiving audio from a user. Your job is to
transcribe the input audio to text exactly as it was said by the user.
@@ -579,6 +577,11 @@ class OutputGate(FrameProcessor):
await self.push_frame(frame, direction)
return
# Don't block function call frames
if isinstance(frame, (FunctionCallInProgressFrame, FunctionCallResultFrame)):
await self.push_frame(frame, direction)
return
# Ignore frames that are not following the direction of this gate.
if direction != FrameDirection.DOWNSTREAM:
await self.push_frame(frame, direction)
@@ -639,7 +642,6 @@ async def main():
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
audio_in_sample_rate=16000,
),
)
@@ -677,12 +679,6 @@ async def main():
context = OpenAILLMContext()
context_aggregator = conversation_llm.create_context_aggregator(context)
# We have instructed the LLM to return 'True' if it thinks the user
# completed a sentence. So, if it's 'True' we will return true in this
# predicate which will wake up the notifier.
async def wake_check_filter(frame):
return frame.text == "True"
# This is a notifier that we use to synchronize the two LLMs.
notifier = EventNotifier()
@@ -699,14 +695,6 @@ async def main():
async def block_user_stopped_speaking(frame):
return not isinstance(frame, UserStoppedSpeakingFrame)
async def pass_only_llm_trigger_frames(frame):
return (
isinstance(frame, OpenAILLMContextFrame)
or isinstance(frame, LLMMessagesFrame)
or isinstance(frame, StartInterruptionFrame)
or isinstance(frame, StopInterruptionFrame)
)
conversation_audio_context_assembler = ConversationAudioContextAssembler(context=context)
user_aggregator_buffer = UserAggregatorBuffer()
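
Note on the `OutputGate` change in this file: function-call frames now bypass the gate. The sketch below shows the general pattern, assuming the gate holds ordinary frames while closed and releases them when opened. `Gate`, `Frame`, and `FunctionCallFrame` are simplified stand-ins invented for this example, not the Pipecat classes.

```python
from typing import List


class Frame:
    def __init__(self, name: str) -> None:
        self.name = name


class FunctionCallFrame(Frame):
    """Stand-in for frames that must never be held back by the gate."""


class Gate:
    def __init__(self, start_open: bool = False) -> None:
        self._open = start_open
        self._held: List[Frame] = []
        self.passed: List[Frame] = []

    def push(self, frame: Frame) -> None:
        # Function-call frames pass regardless of the gate state.
        if self._open or isinstance(frame, FunctionCallFrame):
            self.passed.append(frame)
        else:
            self._held.append(frame)

    def open(self) -> None:
        # Release everything that was held, in arrival order.
        self._open = True
        self.passed.extend(self._held)
        self._held.clear()


gate = Gate(start_open=False)
gate.push(Frame("text-1"))
gate.push(FunctionCallFrame("call-in-progress"))
gate.push(Frame("text-2"))
before_open = [f.name for f in gate.passed]
gate.open()
after_open = [f.name for f in gate.passed]
```

With `start_open=True`, as used in the other example in this diff, the first frames would pass immediately instead of waiting for the notifier.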

View File

@@ -292,7 +292,7 @@ async def main():
conversation_llm = GoogleLLMService(
name="Conversation",
model="gemini-1.5-flash-latest",
model="gemini-2.0-flash-001",
# model="gemini-exp-1121",
api_key=os.getenv("GOOGLE_API_KEY"),
# we can give the GoogleLLMService a system instruction to use directly
@@ -303,7 +303,7 @@ async def main():
input_transcription_llm = GoogleLLMService(
name="Transcription",
model="gemini-1.5-flash-latest",
model="gemini-2.0-flash-001",
# model="gemini-exp-1121",
api_key=os.getenv("GOOGLE_API_KEY"),
system_instruction=transcriber_system_message,

View File

@@ -37,8 +37,6 @@ async def main():
token,
"Respond bot",
DailyParams(
audio_in_sample_rate=16000,
audio_out_sample_rate=24000,
audio_out_enabled=True,
vad_enabled=True,
vad_audio_passthrough=True,

View File

@@ -37,8 +37,6 @@ async def main():
token,
"Respond bot",
DailyParams(
audio_in_sample_rate=16000,
audio_out_sample_rate=24000,
audio_out_enabled=True,
vad_enabled=True,
vad_audio_passthrough=True,

View File

@@ -84,8 +84,6 @@ async def main():
token,
"Respond bot",
DailyParams(
audio_in_sample_rate=16000,
audio_out_sample_rate=24000,
audio_out_enabled=True,
vad_enabled=True,
vad_audio_passthrough=True,

View File

@@ -37,8 +37,6 @@ async def main():
token,
"Respond bot",
DailyParams(
audio_in_sample_rate=16000,
audio_out_sample_rate=24000,
audio_out_enabled=True,
vad_enabled=True,
vad_audio_passthrough=True,
@@ -47,8 +45,6 @@ async def main():
# matter because we can only use the Multimodal Live API's phrase
# endpointing, for now.
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)),
start_audio_paused=True,
start_video_paused=True,
),
)

View File

@@ -52,8 +52,6 @@ async def main():
token,
"Respond bot",
DailyParams(
audio_in_sample_rate=16000,
audio_out_sample_rate=24000,
audio_out_enabled=True,
vad_enabled=True,
vad_audio_passthrough=True,

View File

@@ -38,8 +38,6 @@ load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
DESIRED_SAMPLE_RATE = 16000
def generate_token(room_name: str, participant_name: str, api_key: str, api_secret: str) -> str:
token = api.AccessToken(api_key, api_secret)
@@ -114,11 +112,8 @@ async def main():
token=token,
room_name=room_name,
params=LiveKitParams(
audio_in_channels=1,
audio_in_enabled=True,
audio_out_enabled=True,
audio_in_sample_rate=DESIRED_SAMPLE_RATE,
audio_out_sample_rate=DESIRED_SAMPLE_RATE,
vad_analyzer=SileroVADAnalyzer(),
vad_enabled=True,
vad_audio_passthrough=True,
@@ -128,7 +123,6 @@ async def main():
stt = DeepgramSTTService(
api_key=os.getenv("DEEPGRAM_API_KEY"),
live_options=LiveOptions(
sample_rate=DESIRED_SAMPLE_RATE,
vad_events=True,
),
)
@@ -138,7 +132,6 @@ async def main():
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
sample_rate=DESIRED_SAMPLE_RATE,
)
messages = [

View File

@@ -6,6 +6,7 @@
import argparse
import os
from typing import Optional
import aiohttp
@@ -18,7 +19,7 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
async def configure_with_args(
aiohttp_session: aiohttp.ClientSession, parser: argparse.ArgumentParser | None = None
aiohttp_session: aiohttp.ClientSession, parser: Optional[argparse.ArgumentParser] = None
):
if not parser:
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")

View File

@@ -106,8 +106,8 @@ class UserImageRequester(FrameProcessor):
UserImageRequestFrame(self.participant_id), FrameDirection.UPSTREAM
)
await self.push_frame(TextFrame("Describe the image in a short sentence."))
elif isinstance(frame, UserImageRawFrame):
await self.push_frame(frame)
else:
await self.push_frame(frame, direction)
class TextFilterProcessor(FrameProcessor):

View File

@@ -6,6 +6,7 @@
import argparse
import os
from typing import Optional
import aiohttp
@@ -18,7 +19,7 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
async def configure_with_args(
aiohttp_session: aiohttp.ClientSession, parser: argparse.ArgumentParser | None = None
aiohttp_session: aiohttp.ClientSession, parser: Optional[argparse.ArgumentParser] = None
):
if not parser:
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")

View File

@@ -2,13 +2,14 @@ import argparse
import asyncio
import os
import sys
from typing import Optional
from dotenv import load_dotenv
from loguru import logger
from openai.types.chat import ChatCompletionToolParam
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame, EndTaskFrame
from pipecat.frames.frames import EndTaskFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -42,7 +43,7 @@ async def main(
callId: str,
callDomain: str,
detect_voicemail: bool,
dialout_number: str | None,
dialout_number: Optional[str],
):
# dialin_settings are only needed if Daily's SIP URI is used
# If you are handling this via Twilio, Telnyx, set this to None
@@ -99,14 +100,14 @@ async def main(
- **ASSUME IT IS A VOICEMAIL. DO NOT WAIT FOR MORE CONFIRMATION.**
#### **Step 2: Leave a Voicemail Message**
- Immediately say:
- Immediately say:
*"Hello, this is a message for Pipecat example user. This is Chatbot. Please call back on 123-456-7891. Thank you."*
- **IMMEDIATELY AFTER LEAVING THE MESSAGE, CALL `terminate_call`.**
- **DO NOT SPEAK AFTER CALLING `terminate_call`.**
- **FAILURE TO CALL `terminate_call` IMMEDIATELY IS A MISTAKE.**
#### **Step 3: If Speaking to a Human**
- If the call is answered by a human, say:
- If the call is answered by a human, say:
*"Oh, hello! I'm a friendly chatbot. Is there anything I can help you with?"*
- Keep responses **brief and helpful**.
- If the user no longer needs assistance, **call `terminate_call` immediately.**

161
examples/sentry-metrics/.gitignore vendored Normal file
View File

@@ -0,0 +1,161 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
runpod.toml

View File

@@ -0,0 +1,15 @@
FROM python:3.10-bullseye
RUN mkdir /app
RUN mkdir /app/assets
RUN mkdir /app/utils
COPY *.py /app/
COPY requirements.txt /app/
WORKDIR /app
RUN pip3 install -r requirements.txt
EXPOSE 7860
CMD ["python3", "server.py"]

View File

@@ -0,0 +1,29 @@
# Sentry Metrics
This app connects you to a chatbot powered by GPT-4. It provides TTFB (Time-To-First-Byte) and processing metrics to Sentry.
## Get started
```bash
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
cp env.example .env # and add your credentials
```
## Run the server
```bash
python server.py
```
Then, visit `http://localhost:7860/` in your browser to start a chatbot session.
## Build and test the Docker image
```
docker build -t chatbot .
docker run --env-file .env -p 7860:7860 chatbot
```
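
For readers new to the TTFB metric mentioned above, here is a minimal sketch of how time-to-first-byte and total processing time can be measured around a streamed response. It uses only the standard library; `fake_stream` and `measure` are hypothetical helpers, and this is not how `SentryMetrics` is implemented.

```python
import asyncio
import time
from typing import AsyncIterator, Optional, Tuple


async def fake_stream(delay_secs: float, chunks: int) -> AsyncIterator[bytes]:
    # Hypothetical stand-in for a streaming LLM or TTS response.
    await asyncio.sleep(delay_secs)
    for i in range(chunks):
        yield f"chunk-{i}".encode()


async def measure(stream: AsyncIterator[bytes]) -> Tuple[Optional[float], float, int]:
    """Return (ttfb_secs, total_secs, chunk_count) for one streamed response."""
    start = time.perf_counter()
    ttfb = None
    count = 0
    async for _ in stream:
        if ttfb is None:
            # First chunk arrived: this is the time-to-first-byte.
            ttfb = time.perf_counter() - start
        count += 1
    return ttfb, time.perf_counter() - start, count


ttfb, total, count = asyncio.run(measure(fake_stream(0.05, 3)))
```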

View File

@@ -0,0 +1,112 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
import sentry_sdk
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.metrics.sentry import SentryMetrics
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Chatbot",
DailyParams(
audio_out_enabled=True,
audio_in_enabled=True,
camera_out_enabled=False,
vad_enabled=True,
vad_audio_passthrough=True,
vad_analyzer=SileroVADAnalyzer(),
transcription_enabled=True,
),
)
# Initialize Sentry
sentry_sdk.init(
dsn="your-project-dsn",
traces_sample_rate=1.0,
)
tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id="cgSgspJ2msm6clMCkdW9",
metrics=SentryMetrics(),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o",
metrics=SentryMetrics(),
)
messages = [
{
"role": "system",
"content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by introducing yourself.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # microphone
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
PipelineParams(allow_interruptions=True, enable_metrics=True),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
print(f"Participant left: {participant}")
await task.cancel()
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,4 @@
DAILY_SAMPLE_ROOM_URL=https://yourdomain.daily.co/yourroom # (for joining the bot to the same room repeatedly for local dev)
DAILY_API_KEY=7df...
OPENAI_API_KEY=sk-PL...
ELEVENLABS_API_KEY=aeb...

View File

@@ -0,0 +1,4 @@
python-dotenv
fastapi[all]
uvicorn
pipecat-ai[daily,openai,sentry,silero,elevenlabs]

View File

@@ -0,0 +1,56 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import os
import aiohttp
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper
async def configure(aiohttp_session: aiohttp.ClientSession):
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument(
"-u", "--url", type=str, required=False, help="URL of the Daily room to join"
)
parser.add_argument(
"-k",
"--apikey",
type=str,
required=False,
help="Daily API Key (needed to create an owner token for the room)",
)
args, unknown = parser.parse_known_args()
url = args.url or os.getenv("DAILY_SAMPLE_ROOM_URL")
key = args.apikey or os.getenv("DAILY_API_KEY")
if not url:
raise Exception(
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL."
)
if not key:
raise Exception(
"No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
)
daily_rest_helper = DailyRESTHelper(
daily_api_key=key,
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
# Create a meeting token for the given room with an expiration 1 hour in
# the future.
expiry_time: float = 60 * 60
token = await daily_rest_helper.get_token(url, expiry_time)
return (url, token)

View File

@@ -0,0 +1,139 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import os
import subprocess
from contextlib import asynccontextmanager
import aiohttp
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, RedirectResponse
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomParams
MAX_BOTS_PER_ROOM = 1
# Bot sub-process dict for status reporting and concurrency control
bot_procs = {}
daily_helpers = {}
load_dotenv(override=True)
def cleanup():
# Clean up function, just to be extra safe
for entry in bot_procs.values():
proc = entry[0]
proc.terminate()
proc.wait()
@asynccontextmanager
async def lifespan(app: FastAPI):
aiohttp_session = aiohttp.ClientSession()
daily_helpers["rest"] = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
yield
await aiohttp_session.close()
cleanup()
app = FastAPI(lifespan=lifespan)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/")
async def start_agent(request: Request):
print("!!! Creating room")
room = await daily_helpers["rest"].create_room(DailyRoomParams())
print(f"!!! Room URL: {room.url}")
# Ensure the room property is present
if not room.url:
raise HTTPException(
status_code=500,
detail="Missing 'room' property in request data. Cannot start agent without a target room!",
)
# Check if there is already an existing process running in this room
num_bots_in_room = sum(
1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None
)
if num_bots_in_room >= MAX_BOTS_PER_ROOM:
raise HTTPException(status_code=500, detail=f"Max bot limit reached for room: {room.url}")
# Get the token for the room
token = await daily_helpers["rest"].get_token(room.url)
if not token:
raise HTTPException(status_code=500, detail=f"Failed to get token for room: {room.url}")
# Spawn a new agent, and join the user session
# Note: this is mostly for demonstration purposes (refer to 'deployment' in README)
try:
proc = subprocess.Popen(
[f"python3 -m bot -u {room.url} -t {token}"],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__)),
)
bot_procs[proc.pid] = (proc, room.url)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}")
return RedirectResponse(room.url)
@app.get("/status/{pid}")
def get_status(pid: int):
# Look up the subprocess
proc = bot_procs.get(pid)
# If the subprocess doesn't exist, return an error
if not proc:
raise HTTPException(status_code=404, detail=f"Bot with process id: {pid} not found")
# Check the status of the subprocess
if proc[0].poll() is None:
status = "running"
else:
status = "finished"
return JSONResponse({"bot_id": pid, "status": status})
if __name__ == "__main__":
import uvicorn
default_host = os.getenv("HOST", "0.0.0.0")
default_port = int(os.getenv("FAST_API_PORT", "7860"))
parser = argparse.ArgumentParser(description="Daily Storyteller FastAPI server")
parser.add_argument("--host", type=str, default=default_host, help="Host address")
parser.add_argument("--port", type=int, default=default_port, help="Port number")
parser.add_argument("--reload", action="store_true", help="Reload code on change")
config = parser.parse_args()
uvicorn.run(
"server:app",
host=config.host,
port=config.port,
reload=config.reload,
)
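
Note on the `subprocess.Popen` call in this file: it interpolates the room URL and token into a single shell string. One possible alternative, sketched here under the assumption that the bot is started as `python -m bot`, is to build an argument list so that no shell parsing takes place. `build_bot_command` is a hypothetical helper, not part of the example.

```python
import shlex
import sys
from typing import List


def build_bot_command(room_url: str, token: str) -> List[str]:
    # Each value is its own argv entry, so no shell parsing is involved.
    return [sys.executable, "-m", "bot", "-u", room_url, "-t", token]


# Deliberately awkward values to show that they stay intact as single arguments.
cmd = build_bot_command("https://example.daily.co/room; echo oops", "tok en")

# shlex.join (Python 3.8+) is used here only to render the command for logging.
printable = shlex.join(cmd)
```

The resulting list could then be passed to `subprocess.Popen(cmd, cwd=...)` without `shell=True`.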

View File

@@ -121,8 +121,6 @@ async def main():
token,
"Chatbot",
DailyParams(
audio_in_sample_rate=16000,
audio_out_sample_rate=24000,
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_width=1024,

View File

@@ -1,5 +1,5 @@
beautifulsoup4==4.12.3
pypdf==4.3.1
tiktoken==0.7.0
pipecat-ai[daily,cartesia,openai,silero]==0.0.40
pipecat-ai[daily,cartesia,openai,silero]
python-dotenv==1.0.1

View File

@@ -6,6 +6,7 @@
import argparse
import os
from typing import Optional
import aiohttp
@@ -18,7 +19,7 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
async def configure_with_args(
aiohttp_session: aiohttp.ClientSession, parser: argparse.ArgumentParser | None = None
aiohttp_session: aiohttp.ClientSession, parser: Optional[argparse.ArgumentParser] = None
):
if not parser:
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")

View File

@@ -112,7 +112,6 @@ async def main():
token,
"studypal",
DailyParams(
audio_out_sample_rate=44100,
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
@@ -124,7 +123,6 @@ async def main():
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id=os.getenv("CARTESIA_VOICE_ID", "4d2fd738-3b3d-4368-957a-bb4805275bd9"),
# British Narration Lady: 4d2fd738-3b3d-4368-957a-bb4805275bd9
sample_rate=44100,
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o-mini")
@@ -155,7 +153,12 @@ Your task is to help the user understand and learn from this article in 2 senten
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
task = PipelineTask(
pipeline,
PipelineParams(
audio_out_sample_rate=44100, allow_interruptions=True, enable_metrics=True
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -11,14 +11,13 @@ from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.serializers.telnyx import TelnyxFrameSerializer
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.elevenlabs import ElevenLabsTTSService, Language
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.network.fastapi_websocket import (
FastAPIWebsocketParams,
@@ -31,7 +30,12 @@ logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def run_bot(websocket_client, stream_id, outbound_encoding, inbound_encoding):
async def run_bot(
websocket_client,
stream_id: str,
outbound_encoding: str,
inbound_encoding: str,
):
transport = FastAPIWebsocketTransport(
websocket=websocket_client,
params=FastAPIWebsocketParams(
@@ -48,11 +52,9 @@ async def run_bot(websocket_client, stream_id, outbound_encoding, inbound_encodi
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id="CwhRBWXzGAHq8TQ4Fs17",
output_format="pcm_24000",
params=ElevenLabsTTSService.InputParams(language=Language.EN),
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
messages = [
@@ -77,7 +79,14 @@ async def run_bot(websocket_client, stream_id, outbound_encoding, inbound_encodi
]
)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
task = PipelineTask(
pipeline,
params=PipelineParams(
audio_in_sample_rate=8000,
audio_out_sample_rate=8000,
allow_interruptions=True,
),
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
@@ -87,7 +96,7 @@ async def run_bot(websocket_client, stream_id, outbound_encoding, inbound_encodi
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
await task.queue_frames([EndFrame()])
await task.cancel()
runner = PipelineRunner(handle_sigint=False)

View File

@@ -107,3 +107,34 @@ The server will start on port 8765. Keep this running while you test with Twilio
## Usage
To start a call, simply make a call to your configured Twilio phone number. The webhook URL will direct the call to your FastAPI application, which will handle it accordingly.
## Testing
It is also possible to automatically test the server without making phone calls by using a software client.
First, update `templates/streams.xml` to point to your server's websocket endpoint. For example:
```
<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Connect>
<Stream url="ws://localhost:8765/ws"></Stream>
</Connect>
<Pause length="40"/>
</Response>
```
Then, start the server with `-t` to indicate we are testing:
```sh
# Make sure you're in the project directory and your virtual environment is activated
python server.py -t
```
Finally, just point the client to the server's URL:
```sh
python client.py -u http://localhost:8765 -c 2
```
where `-c` allows you to create multiple concurrent clients.
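
As an illustration of what running several concurrent clients can look like, here is a minimal sketch using `asyncio.gather`. The `run_fake_client` coroutine is a hypothetical placeholder that only sleeps; it makes no claim about how `client.py` is structured.

```python
import asyncio
from typing import List


async def run_fake_client(client_name: str, duration_secs: float) -> str:
    # Placeholder for a real client session; it only sleeps.
    await asyncio.sleep(duration_secs)
    return client_name


async def run_clients(count: int, duration_secs: float) -> List[str]:
    clients = [run_fake_client(f"client_{i}", duration_secs) for i in range(count)]
    # gather runs the coroutines concurrently and returns results in input order.
    return await asyncio.gather(*clients)


finished = asyncio.run(run_clients(2, 0.01))
```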

View File

@@ -4,10 +4,15 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import datetime
import io
import os
import sys
import wave
import aiofiles
from dotenv import load_dotenv
from fastapi import WebSocket
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
@@ -15,6 +20,7 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
from pipecat.serializers.twilio import TwilioFrameSerializer
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.deepgram import DeepgramSTTService
@@ -30,10 +36,29 @@ logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def run_bot(websocket_client, stream_sid):
async def save_audio(server_name: str, audio: bytes, sample_rate: int, num_channels: int):
if len(audio) > 0:
filename = (
f"{server_name}_recording_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.wav"
)
with io.BytesIO() as buffer:
with wave.open(buffer, "wb") as wf:
wf.setsampwidth(2)
wf.setnchannels(num_channels)
wf.setframerate(sample_rate)
wf.writeframes(audio)
async with aiofiles.open(filename, "wb") as file:
await file.write(buffer.getvalue())
logger.info(f"Merged audio saved to {filename}")
else:
logger.info("No audio data to save")
async def run_bot(websocket_client: WebSocket, stream_sid: str, testing: bool):
transport = FastAPIWebsocketTransport(
websocket=websocket_client,
params=FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
add_wav_header=False,
vad_enabled=True,
@@ -45,23 +70,28 @@ async def run_bot(websocket_client, stream_sid):
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"), audio_passthrough=True)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
push_silence_after_stop=testing,
)
messages = [
{
"role": "system",
"content": "You are a helpful LLM in an audio call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
"content": "You are an elementary teacher in an audio call. Your output will be converted to audio so don't include special characters in your answers. Respond to what the student said in a short sentence.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
# NOTE: Watch out! This will save all the conversation in memory. You can
# pass `buffer_size` to get periodic callbacks.
audiobuffer = AudioBufferProcessor()
pipeline = Pipeline(
[
transport.input(), # Websocket input from client
@@ -70,14 +100,22 @@ async def run_bot(websocket_client, stream_sid):
llm, # LLM
tts, # Text-To-Speech
transport.output(), # Websocket output to client
audiobuffer, # Used to buffer the audio in the pipeline
context_aggregator.assistant(),
]
)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
task = PipelineTask(
pipeline,
params=PipelineParams(
audio_in_sample_rate=8000, audio_out_sample_rate=8000, allow_interruptions=True
),
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
# Start recording.
await audiobuffer.start_recording()
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
@@ -86,6 +124,15 @@ async def run_bot(websocket_client, stream_sid):
async def on_client_disconnected(transport, client):
await task.cancel()
runner = PipelineRunner(handle_sigint=False)
@audiobuffer.event_handler("on_audio_data")
async def on_audio_data(buffer, audio, sample_rate, num_channels):
server_name = f"server_{websocket_client.client.port}"
await save_audio(server_name, audio, sample_rate, num_channels)
# We use `handle_sigint=False` because `uvicorn` is controlling keyboard
# interruptions. We use `force_gc=True` to force garbage collection after
# the runner finishes running a task which could be useful for long running
# applications with multiple clients connecting.
runner = PipelineRunner(handle_sigint=False, force_gc=True)
await runner.run(task)
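
Note on `save_audio` in this file: it wraps raw 16-bit PCM in a WAV container in memory before writing it to disk. The sketch below isolates that step and reads the result back to check the header fields. `pcm_to_wav_bytes` is a hypothetical helper name, and the 8 kHz mono input is an assumption chosen to match the sample rates set in `PipelineParams`.

```python
import io
import wave


def pcm_to_wav_bytes(audio: bytes, sample_rate: int, num_channels: int) -> bytes:
    with io.BytesIO() as buffer:
        with wave.open(buffer, "wb") as wf:
            wf.setsampwidth(2)  # 16-bit samples
            wf.setnchannels(num_channels)
            wf.setframerate(sample_rate)
            wf.writeframes(audio)
        # The wave module does not close a file object it did not open itself,
        # so the buffer is still readable here.
        return buffer.getvalue()


pcm = b"\x00\x00" * 8000  # one second of 16-bit mono silence at 8 kHz
wav_bytes = pcm_to_wav_bytes(pcm, sample_rate=8000, num_channels=1)

# Read the container back to confirm the parameters survived the round trip.
with wave.open(io.BytesIO(wav_bytes), "rb") as wf:
    info = (wf.getnchannels(), wf.getsampwidth(), wf.getframerate(), wf.getnframes())
```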

View File

@@ -0,0 +1,199 @@
#
# Copyright (c) 2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import asyncio
import datetime
import io
import os
import sys
import wave
import xml.etree.ElementTree as ET
from uuid import uuid4
import aiofiles
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import EndFrame, TransportMessageUrgentFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
from pipecat.serializers.twilio import TwilioFrameSerializer
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.network.websocket_client import (
WebsocketClientParams,
WebsocketClientTransport,
)
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
DEFAULT_CLIENT_DURATION = 30
async def download_twiml(server_url: str) -> str:
# TODO(aleix): add error checking.
async with aiohttp.ClientSession() as session:
async with session.post(server_url) as response:
return await response.text()
def get_stream_url_from_twiml(twiml: str) -> str:
root = ET.fromstring(twiml)
# TODO(aleix): add error checking.
stream_element = root.find(".//Stream") # Finds the first <Stream> element
url = stream_element.get("url")
return url
async def save_audio(client_name: str, audio: bytes, sample_rate: int, num_channels: int):
if len(audio) > 0:
filename = (
f"{client_name}_recording_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.wav"
)
with io.BytesIO() as buffer:
with wave.open(buffer, "wb") as wf:
wf.setsampwidth(2)
wf.setnchannels(num_channels)
wf.setframerate(sample_rate)
wf.writeframes(audio)
async with aiofiles.open(filename, "wb") as file:
await file.write(buffer.getvalue())
logger.info(f"Merged audio saved to {filename}")
else:
logger.info("No audio data to save")
async def run_client(client_name: str, server_url: str, duration_secs: int):
twiml = await download_twiml(server_url)
stream_url = get_stream_url_from_twiml(twiml)
stream_sid = str(uuid4())
transport = WebsocketClientTransport(
uri=stream_url,
params=WebsocketClientParams(
audio_in_enabled=True,
audio_out_enabled=True,
add_wav_header=False,
serializer=TwilioFrameSerializer(stream_sid),
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=1.5)),
vad_audio_passthrough=True,
),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
# We let the audio passthrough so we can record the conversation.
stt = DeepgramSTTService(
api_key=os.getenv("DEEPGRAM_API_KEY"),
audio_passthrough=True,
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="e13cae5c-ec59-4f71-b0a6-266df3c9bb8e", # Madame Mischief
push_silence_after_stop=True,
)
messages = [
{
"role": "system",
"content": "You are an 8 year old child. A teacher will explain new concepts that you want to know about. Feel free to change topics whenever you want. Once you are taught something, keep asking for clarification if you think someone your age would not understand what you are being taught.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
# NOTE: Watch out! This will keep the whole conversation in memory. You can
# pass `buffer_size` to get periodic callbacks instead.
audiobuffer = AudioBufferProcessor()
pipeline = Pipeline(
[
transport.input(), # Websocket input from server
stt, # Speech-To-Text
context_aggregator.user(),
llm, # LLM
tts, # Text-To-Speech
transport.output(), # Websocket output to server
audiobuffer, # Used to buffer the audio in the pipeline
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
audio_in_sample_rate=8000, audio_out_sample_rate=8000, allow_interruptions=True
),
)
@transport.event_handler("on_connected")
async def on_connected(transport: WebsocketClientTransport, client):
# Start recording.
await audiobuffer.start_recording()
message = TransportMessageUrgentFrame(
message={"event": "connected", "protocol": "Call", "version": "1.0.0"}
)
await transport.output().send_message(message)
message = TransportMessageUrgentFrame(
message={"event": "start", "streamSid": stream_sid, "start": {"streamSid": stream_sid}}
)
await transport.output().send_message(message)
@audiobuffer.event_handler("on_audio_data")
async def on_audio_data(buffer, audio, sample_rate, num_channels):
await save_audio(client_name, audio, sample_rate, num_channels)
async def end_call():
await asyncio.sleep(duration_secs)
await task.queue_frame(EndFrame())
runner = PipelineRunner()
await asyncio.gather(runner.run(task), end_call())
async def main():
parser = argparse.ArgumentParser(description="Pipecat Twilio Chatbot Client")
parser.add_argument("-u", "--url", type=str, required=True, help="specify the server URL")
parser.add_argument(
"-c", "--clients", type=int, required=True, help="number of concurrent clients"
)
parser.add_argument(
"-d",
"--duration",
type=int,
default=DEFAULT_CLIENT_DURATION,
help=f"duration of each client in seconds (default: {DEFAULT_CLIENT_DURATION})",
)
args, _ = parser.parse_known_args()
clients = []
for i in range(args.clients):
clients.append(asyncio.create_task(run_client(f"client_{i}", args.url, args.duration)))
await asyncio.gather(*clients)
if __name__ == "__main__":
asyncio.run(main())
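
The `get_stream_url_from_twiml()` helper in this client leaves error checking as a TODO. A minimal sketch of what that check could look like, using only the standard library. The function name `extract_stream_url`, the sample TwiML, the made-up host name, and the choice of `ValueError` are assumptions for illustration and are not part of the example file.

```python
import xml.etree.ElementTree as ET


def extract_stream_url(twiml: str) -> str:
    """Return the `url` attribute of the first <Stream> element.

    Hypothetical helper: raises ValueError with a readable message instead
    of failing with an AttributeError when something is missing.
    """
    try:
        root = ET.fromstring(twiml)
    except ET.ParseError as e:
        raise ValueError(f"invalid TwiML: {e}") from e

    # Finds the first <Stream> element below the root.
    stream_element = root.find(".//Stream")
    if stream_element is None:
        raise ValueError("TwiML has no <Stream> element")

    url = stream_element.get("url")
    if not url:
        raise ValueError("<Stream> element has no 'url' attribute")
    return url


# Illustrative TwiML document; the host name is made up.
SAMPLE_TWIML = """<Response>
  <Connect>
    <Stream url="wss://example.invalid/ws"/>
  </Connect>
</Response>"""

if __name__ == "__main__":
    print(extract_stream_url(SAMPLE_TWIML))
```

Raising a single exception type keeps the caller simple: `run_client()` could catch `ValueError` once and log which client failed to connect.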

@@ -4,6 +4,7 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import json
import uvicorn
@@ -38,8 +39,16 @@ async def websocket_endpoint(websocket: WebSocket):
print(call_data, flush=True)
stream_sid = call_data["start"]["streamSid"]
print("WebSocket connection accepted")
await run_bot(websocket, stream_sid)
await run_bot(websocket, stream_sid, app.state.testing)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Pipecat Twilio Chatbot Server")
parser.add_argument(
"-t", "--test", action="store_true", default=False, help="set the server in testing mode"
)
args, _ = parser.parse_known_args()
app.state.testing = args.test
uvicorn.run(app, host="0.0.0.0", port=8765)

@@ -17,6 +17,7 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.serializers.protobuf import ProtobufFrameSerializer
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.openai import OpenAILLMService
@@ -80,7 +81,7 @@ class SessionTimeoutHandler:
async def main():
transport = WebsocketServerTransport(
params=WebsocketServerParams(
audio_out_sample_rate=16000,
serializer=ProtobufFrameSerializer(),
audio_out_enabled=True,
add_wav_header=True,
vad_enabled=True,
@@ -97,7 +98,6 @@ async def main():
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
sample_rate=16000,
)
messages = [
@@ -122,7 +122,12 @@ async def main():
]
)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
task = PipelineTask(
pipeline,
params=PipelineParams(
audio_in_sample_rate=16000, audio_out_sample_rate=16000, allow_interruptions=True
),
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):

@@ -32,6 +32,7 @@ dependencies = [
"protobuf~=5.29.3",
"pydantic~=2.10.5",
"pyloudnorm~=0.1.1",
"resampy~=0.4.3",
"soxr~=0.5.0"
]
@@ -40,7 +41,7 @@ Source = "https://github.com/pipecat-ai/pipecat"
Website = "https://pipecat.ai"
[project.optional-dependencies]
anthropic = [ "anthropic~=0.39.0" ]
anthropic = [ "anthropic~=0.45.2" ]
assemblyai = [ "assemblyai~=0.36.0" ]
aws = [ "boto3~=1.35.99" ]
azure = [ "azure-cognitiveservices-speech~=1.42.0", "openai~=1.59.6" ]
@@ -69,9 +70,10 @@ moondream = [ "einops~=0.8.0", "timm~=1.0.13", "transformers~=4.48.0" ]
nim = [ "openai~=1.59.6" ]
noisereduce = [ "noisereduce~=3.0.3" ]
openai = [ "openai~=1.59.6", "websockets~=13.1", "python-deepcompare~=2.1.0" ]
openpipe = [ "openpipe~=4.43.0" ]
openpipe = [ "openpipe~=4.45.0" ]
playht = [ "pyht~=0.1.6", "websockets~=13.1" ]
riva = [ "nvidia-riva-client~=2.18.0" ]
sentry = [ "sentry-sdk~=2.20.0" ]
silero = [ "onnxruntime~=1.20.1" ]
simli = [ "simli-ai~=0.1.10"]
soundfile = [ "soundfile~=0.13.0" ]

@@ -20,6 +20,22 @@ except ModuleNotFoundError as e:
raise Exception(f"Missing module: {e}")
class KrispProcessorManager:
"""
Ensures that only one KrispAudioProcessor instance exists for the entire program.
"""
_krisp_instance = None
@classmethod
def get_processor(cls, sample_rate: int, sample_type: str, channels: int, model_path: str):
if cls._krisp_instance is None:
cls._krisp_instance = KrispAudioProcessor(
sample_rate, sample_type, channels, model_path
)
return cls._krisp_instance
class KrispFilter(BaseAudioFilter):
def __init__(
self, sample_type: str = "PCM_16", channels: int = 1, model_path: str = None
@@ -48,7 +64,7 @@ class KrispFilter(BaseAudioFilter):
async def start(self, sample_rate: int):
self._sample_rate = sample_rate
self._krisp_processor = KrispAudioProcessor(
self._krisp_processor = KrispProcessorManager.get_processor(
self._sample_rate, self._sample_type, self._channels, self._model_path
)
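
One consequence of the class-level cache in `KrispProcessorManager` is that the arguments of the first call decide how the shared instance is built; later calls get the same object back even if they pass different arguments. A minimal sketch of that behavior, assuming a stand-in `FakeProcessor` class in place of `KrispAudioProcessor` (the stand-in, the `ProcessorManager` name, and the model path are hypothetical):

```python
class FakeProcessor:
    """Stand-in for KrispAudioProcessor; it only records its arguments."""

    def __init__(self, sample_rate: int, sample_type: str, channels: int, model_path: str):
        self.sample_rate = sample_rate
        self.sample_type = sample_type
        self.channels = channels
        self.model_path = model_path


class ProcessorManager:
    """Same caching logic as the manager in the diff, using the stand-in."""

    _instance = None

    @classmethod
    def get_processor(cls, sample_rate: int, sample_type: str, channels: int, model_path: str):
        # Only the first call constructs the processor.
        if cls._instance is None:
            cls._instance = FakeProcessor(sample_rate, sample_type, channels, model_path)
        return cls._instance


first = ProcessorManager.get_processor(16000, "PCM_16", 1, "model.kw")
second = ProcessorManager.get_processor(8000, "PCM_16", 1, "model.kw")

# Both names refer to the object created by the first call.
print(first is second, second.sample_rate)
```

If filters with different sample rates must coexist, a cache keyed by the argument tuple would be one alternative; whether the underlying SDK allows that is not something this diff shows.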

@@ -0,0 +1 @@

@@ -0,0 +1,30 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from abc import ABC, abstractmethod
class BaseAudioResampler(ABC):
"""Abstract base class for audio resampling. This class defines an
interface for audio resampling implementations.
"""
@abstractmethod
async def resample(self, audio: bytes, in_rate: int, out_rate: int) -> bytes:
"""
Resamples the given audio data to a different sample rate.
This is an abstract method that must be implemented in subclasses.
Parameters:
audio (bytes): The audio data to be resampled, represented as a byte string.
in_rate (int): The original sample rate of the audio data (in Hz).
out_rate (int): The desired sample rate for the resampled audio data (in Hz).
Returns:
bytes: The resampled audio data as a byte string.
"""
pass
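
To illustrate how the `BaseAudioResampler` interface can be implemented, here is a minimal sketch of a custom resampler that uses only the standard library. The interface is copied locally so the snippet runs without pipecat installed; `NearestSampleResampler` is a hypothetical class that does no filtering (so it aliases when downsampling) and is meant for illustration, not for production audio.

```python
import asyncio
from abc import ABC, abstractmethod
from array import array


class BaseAudioResampler(ABC):
    """Local copy of the interface so the sketch is self-contained."""

    @abstractmethod
    async def resample(self, audio: bytes, in_rate: int, out_rate: int) -> bytes:
        pass


class NearestSampleResampler(BaseAudioResampler):
    """Hypothetical resampler: picks the nearest earlier input sample for
    each output sample. Expects 16-bit mono PCM in native byte order."""

    async def resample(self, audio: bytes, in_rate: int, out_rate: int) -> bytes:
        if in_rate == out_rate:
            return audio
        samples = array("h")
        samples.frombytes(audio)
        out_len = len(samples) * out_rate // in_rate
        last = len(samples) - 1
        out = array("h", (samples[min(i * in_rate // out_rate, last)] for i in range(out_len)))
        return out.tobytes()


async def demo() -> bytes:
    pcm = array("h", range(8)).tobytes()  # 8 samples, 16 bytes
    return await NearestSampleResampler().resample(pcm, 16000, 8000)


if __name__ == "__main__":
    print(len(asyncio.run(demo())))
```

Because `resample()` is a coroutine, an implementation is free to offload heavy work to a thread or process pool without changing its callers.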

@@ -0,0 +1,25 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import numpy as np
import resampy
from pipecat.audio.resamplers.base_audio_resampler import BaseAudioResampler
class ResampyResampler(BaseAudioResampler):
"""Audio resampler implementation using the resampy library."""
def __init__(self, **kwargs):
pass
async def resample(self, audio: bytes, in_rate: int, out_rate: int) -> bytes:
if in_rate == out_rate:
return audio
audio_data = np.frombuffer(audio, dtype=np.int16)
resampled_audio = resampy.resample(audio_data, in_rate, out_rate, filter="kaiser_fast")
result = resampled_audio.astype(np.int16).tobytes()
return result

@@ -0,0 +1,25 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import numpy as np
import soxr
from pipecat.audio.resamplers.base_audio_resampler import BaseAudioResampler
class SOXRAudioResampler(BaseAudioResampler):
"""Audio resampler implementation using the SoX resampler library."""
def __init__(self, **kwargs):
pass
async def resample(self, audio: bytes, in_rate: int, out_rate: int) -> bytes:
if in_rate == out_rate:
return audio
audio_data = np.frombuffer(audio, dtype=np.int16)
resampled_audio = soxr.resample(audio_data, in_rate, out_rate, quality="VHQ")
result = resampled_audio.astype(np.int16).tobytes()
return result

@@ -10,8 +10,24 @@ import numpy as np
import pyloudnorm as pyln
import soxr
from pipecat.audio.resamplers.base_audio_resampler import BaseAudioResampler
from pipecat.audio.resamplers.soxr_resampler import SOXRAudioResampler
def create_default_resampler(**kwargs) -> BaseAudioResampler:
return SOXRAudioResampler(**kwargs)
def resample_audio(audio: bytes, original_rate: int, target_rate: int) -> bytes:
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"'resample_audio()' is deprecated, use 'create_default_resampler()' instead.",
DeprecationWarning,
)
if original_rate == target_rate:
return audio
audio_data = np.frombuffer(audio, dtype=np.int16)
@@ -75,41 +91,45 @@ def exp_smoothing(value: float, prev_value: float, factor: float) -> float:
return prev_value + factor * (value - prev_value)
def ulaw_to_pcm(ulaw_bytes: bytes, in_sample_rate: int, out_sample_rate: int):
async def ulaw_to_pcm(
ulaw_bytes: bytes, in_rate: int, out_rate: int, resampler: BaseAudioResampler
):
# Convert μ-law to PCM
in_pcm_bytes = audioop.ulaw2lin(ulaw_bytes, 2)
# Resample
out_pcm_bytes = resample_audio(in_pcm_bytes, in_sample_rate, out_sample_rate)
out_pcm_bytes = await resampler.resample(in_pcm_bytes, in_rate, out_rate)
return out_pcm_bytes
def pcm_to_ulaw(pcm_bytes: bytes, in_sample_rate: int, out_sample_rate: int):
async def pcm_to_ulaw(pcm_bytes: bytes, in_rate: int, out_rate: int, resampler: BaseAudioResampler):
# Resample
in_pcm_bytes = resample_audio(pcm_bytes, in_sample_rate, out_sample_rate)
in_pcm_bytes = await resampler.resample(pcm_bytes, in_rate, out_rate)
# Convert PCM to μ-law
ulaw_bytes = audioop.lin2ulaw(in_pcm_bytes, 2)
out_ulaw_bytes = audioop.lin2ulaw(in_pcm_bytes, 2)
return ulaw_bytes
return out_ulaw_bytes
def alaw_to_pcm(alaw_bytes: bytes, in_sample_rate: int, out_sample_rate: int) -> bytes:
async def alaw_to_pcm(
alaw_bytes: bytes, in_rate: int, out_rate: int, resampler: BaseAudioResampler
) -> bytes:
# Convert a-law to PCM
in_pcm_bytes = audioop.alaw2lin(alaw_bytes, 2)
# Resample
out_pcm_bytes = resample_audio(in_pcm_bytes, in_sample_rate, out_sample_rate)
out_pcm_bytes = await resampler.resample(in_pcm_bytes, in_rate, out_rate)
return out_pcm_bytes
def pcm_to_alaw(pcm_bytes: bytes, in_sample_rate: int, out_sample_rate: int):
async def pcm_to_alaw(pcm_bytes: bytes, in_rate: int, out_rate: int, resampler: BaseAudioResampler):
# Resample
in_pcm_bytes = resample_audio(pcm_bytes, in_sample_rate, out_sample_rate)
in_pcm_bytes = await resampler.resample(pcm_bytes, in_rate, out_rate)
# Convert PCM to a-law
alaw_bytes = audioop.lin2alaw(in_pcm_bytes, 2)
out_alaw_bytes = audioop.lin2alaw(in_pcm_bytes, 2)
return alaw_bytes
return out_alaw_bytes

@@ -5,6 +5,7 @@
#
import time
from typing import Optional
import numpy as np
from loguru import logger
@@ -104,11 +105,8 @@ class SileroOnnxModel:
class SileroVADAnalyzer(VADAnalyzer):
def __init__(self, *, sample_rate: int = 16000, params: VADParams = VADParams()):
super().__init__(sample_rate=sample_rate, num_channels=1, params=params)
if sample_rate != 16000 and sample_rate != 8000:
raise ValueError("Silero VAD sample rate needs to be 16000 or 8000")
def __init__(self, *, sample_rate: Optional[int] = None, params: VADParams = VADParams()):
super().__init__(sample_rate=sample_rate, params=params)
logger.debug("Loading Silero VAD model...")
@@ -138,6 +136,12 @@ class SileroVADAnalyzer(VADAnalyzer):
# VADAnalyzer
#
def set_sample_rate(self, sample_rate: int):
if sample_rate != 16000 and sample_rate != 8000:
raise ValueError("Silero VAD sample rate needs to be 16000 or 8000")
super().set_sample_rate(sample_rate)
def num_frames_required(self) -> int:
return 512 if self.sample_rate == 16000 else 256
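
The two window sizes in `num_frames_required()` cover the same amount of time at both supported sample rates. A small sketch of the arithmetic, written as free functions for illustration (the function names are assumptions and do not exist in the analyzer):

```python
def num_frames_required(sample_rate: int) -> int:
    """Samples per analysis window, mirroring the values in the diff."""
    if sample_rate != 16000 and sample_rate != 8000:
        raise ValueError("Silero VAD sample rate needs to be 16000 or 8000")
    return 512 if sample_rate == 16000 else 256


def window_ms(sample_rate: int) -> float:
    """Duration of one analysis window in milliseconds."""
    return 1000 * num_frames_required(sample_rate) / sample_rate


if __name__ == "__main__":
    for rate in (16000, 8000):
        print(rate, window_ms(rate))
```

Both rates give a 32 ms window (512 / 16000 = 256 / 8000 = 0.032 s).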

@@ -6,6 +6,7 @@
from abc import abstractmethod
from enum import Enum
from typing import Optional
from loguru import logger
from pydantic import BaseModel
@@ -33,11 +34,11 @@ class VADParams(BaseModel):
class VADAnalyzer:
def __init__(self, *, sample_rate: int, num_channels: int, params: VADParams):
self._sample_rate = sample_rate
self._num_channels = num_channels
self.set_params(params)
def __init__(self, *, sample_rate: Optional[int] = None, params: VADParams):
self._init_sample_rate = sample_rate
self._sample_rate = 0
self._params = params
self._num_channels = 1
self._vad_buffer = b""
@@ -65,13 +66,17 @@ class VADAnalyzer:
def voice_confidence(self, buffer) -> float:
pass
def set_sample_rate(self, sample_rate: int):
self._sample_rate = self._init_sample_rate or sample_rate
self.set_params(self._params)
def set_params(self, params: VADParams):
logger.info(f"Setting VAD params to: {params}")
self._params = params
self._vad_frames = self.num_frames_required()
self._vad_frames_num_bytes = self._vad_frames * self._num_channels * 2
vad_frames_per_sec = self._vad_frames / self._sample_rate
vad_frames_per_sec = self._vad_frames / self.sample_rate
self._vad_start_frames = round(self._params.start_secs / vad_frames_per_sec)
self._vad_stop_frames = round(self._params.stop_secs / vad_frames_per_sec)
@@ -80,7 +85,7 @@ class VADAnalyzer:
self._vad_state: VADState = VADState.QUIET
def _get_smoothed_volume(self, audio: bytes) -> float:
volume = calculate_audio_volume(audio, self._sample_rate)
volume = calculate_audio_volume(audio, self.sample_rate)
return exp_smoothing(volume, self._prev_volume, self._smoothing_factor)
def analyze_audio(self, buffer) -> VADState:
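
The `set_sample_rate()` change defers the choice of sample rate until the pipeline starts: a rate passed to the constructor wins, otherwise the rate supplied later is used. A minimal sketch of that pattern, assuming a stand-in class (`DeferredRateAnalyzer` is hypothetical, and the `start_secs`/`stop_secs` values used below are illustrative rather than the library defaults):

```python
from typing import Optional


class DeferredRateAnalyzer:
    """Stand-in for a VAD analyzer; not the pipecat class."""

    def __init__(self, *, sample_rate: Optional[int] = None, start_secs: float, stop_secs: float):
        self._init_sample_rate = sample_rate  # explicit override, if any
        self._sample_rate = 0  # unknown until set_sample_rate() is called
        self._start_secs = start_secs
        self._stop_secs = stop_secs
        self.start_windows = 0
        self.stop_windows = 0

    @property
    def sample_rate(self) -> int:
        return self._sample_rate

    def set_sample_rate(self, sample_rate: int):
        # An explicit constructor value wins over the pipeline-provided rate.
        self._sample_rate = self._init_sample_rate or sample_rate
        # Derived values can only be computed once the rate is known.
        window_samples = 512 if self._sample_rate == 16000 else 256
        window_secs = window_samples / self._sample_rate
        self.start_windows = round(self._start_secs / window_secs)
        self.stop_windows = round(self._stop_secs / window_secs)


if __name__ == "__main__":
    analyzer = DeferredRateAnalyzer(start_secs=0.2, stop_secs=0.8)
    analyzer.set_sample_rate(16000)
    print(analyzer.sample_rate, analyzer.start_windows, analyzer.stop_windows)
```

This is also why the constructor in the diff stores `params` without calling `set_params()`: the window counts depend on a sample rate that is still 0 at construction time.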

@@ -6,7 +6,18 @@
from dataclasses import dataclass, field
from enum import Enum
from typing import TYPE_CHECKING, Any, Awaitable, Callable, List, Literal, Mapping, Optional, Tuple
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
Dict,
List,
Literal,
Mapping,
Optional,
Tuple,
)
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.clocks.base_clock import BaseClock
@@ -37,7 +48,7 @@ class KeypadEntry(str, Enum):
STAR = "*"
def format_pts(pts: int | None):
def format_pts(pts: Optional[int]):
return nanoseconds_to_str(pts) if pts else None
@@ -48,13 +59,13 @@ class Frame:
id: int = field(init=False)
name: str = field(init=False)
pts: Optional[int] = field(init=False)
metadata: dict = field(init=False)
metadata: Dict[str, Any] = field(init=False)
def __post_init__(self):
self.id: int = obj_id()
self.name: str = f"{self.__class__.__name__}#{obj_count(self)}"
self.pts: Optional[int] = None
self.metadata: dict = {}
self.metadata: Dict[str, Any] = {}
def __str__(self):
return self.name
@@ -115,7 +126,7 @@ class ImageRawFrame:
image: bytes
size: Tuple[int, int]
format: str | None
format: Optional[str]
#
@@ -165,7 +176,7 @@ class URLImageRawFrame(OutputImageRawFrame):
"""
url: str | None
url: Optional[str]
def __str__(self):
pts = format_pts(self.pts)
@@ -224,7 +235,7 @@ class TranscriptionFrame(TextFrame):
user_id: str
timestamp: str
language: Language | None = None
language: Optional[Language] = None
def __str__(self):
return f"{self.name}(user: {self.user_id}, text: [{self.text}], language: {self.language}, timestamp: {self.timestamp})"
@@ -239,7 +250,7 @@ class InterimTranscriptionFrame(TextFrame):
text: str
user_id: str
timestamp: str
language: Language | None = None
language: Optional[Language] = None
def __str__(self):
return f"{self.name}(user: {self.user_id}, text: [{self.text}], language: {self.language}, timestamp: {self.timestamp})"
@@ -261,7 +272,7 @@ class TranscriptionMessage:
role: Literal["user", "assistant"]
content: str
timestamp: str | None = None
timestamp: Optional[str] = None
@dataclass
@@ -428,11 +439,13 @@ class StartFrame(SystemFrame):
clock: BaseClock
task_manager: TaskManager
audio_in_sample_rate: int = 16000
audio_out_sample_rate: int = 24000
allow_interruptions: bool = False
enable_metrics: bool = False
enable_usage_metrics: bool = False
report_only_initial_ttfb: bool = False
observer: Optional["BaseObserver"] = None
report_only_initial_ttfb: bool = False
@dataclass
@@ -661,7 +674,7 @@ class UserImageRawFrame(InputImageRawFrame):
class VisionImageRawFrame(InputImageRawFrame):
"""An image with an associated text to ask for a description of it."""
text: str | None
text: Optional[str]
def __str__(self):
pts = format_pts(self.pts)

@@ -19,7 +19,7 @@ class PipelineRunner:
def __init__(
self,
*,
name: str | None = None,
name: Optional[str] = None,
handle_sigint: bool = True,
force_gc: bool = False,
loop: Optional[asyncio.AbstractEventLoop] = None,

@@ -5,7 +5,7 @@
#
import asyncio
from typing import AsyncIterable, Iterable, List
from typing import Any, AsyncIterable, Dict, Iterable, List
from loguru import logger
from pydantic import BaseModel, ConfigDict
@@ -38,24 +38,48 @@ HEARTBEAT_MONITOR_SECONDS = HEARTBEAT_SECONDS * 5
class PipelineParams(BaseModel):
"""Configuration parameters for pipeline execution.
Attributes:
allow_interruptions: Whether to allow pipeline interruptions.
audio_in_sample_rate: Input audio sample rate in Hz.
audio_out_sample_rate: Output audio sample rate in Hz.
enable_heartbeats: Whether to enable heartbeat monitoring.
enable_metrics: Whether to enable metrics collection.
enable_usage_metrics: Whether to enable usage metrics.
heartbeats_period_secs: Period between heartbeats in seconds.
observers: List of observers for monitoring pipeline execution.
report_only_initial_ttfb: Whether to report only initial time to first byte.
send_initial_empty_metrics: Whether to send initial empty metrics.
start_metadata: Additional metadata for pipeline start.
"""
model_config = ConfigDict(arbitrary_types_allowed=True)
allow_interruptions: bool = False
audio_in_sample_rate: int = 16000
audio_out_sample_rate: int = 24000
enable_heartbeats: bool = False
enable_metrics: bool = False
enable_usage_metrics: bool = False
send_initial_empty_metrics: bool = True
report_only_initial_ttfb: bool = False
observers: List[BaseObserver] = []
heartbeats_period_secs: float = HEARTBEAT_SECONDS
observers: List[BaseObserver] = []
report_only_initial_ttfb: bool = False
send_initial_empty_metrics: bool = True
start_metadata: Dict[str, Any] = {}
class PipelineTaskSource(FrameProcessor):
"""This is the source processor that is linked at the beginning of the
"""Source processor for pipeline tasks that handles frame routing.
This is the source processor that is linked at the beginning of the
pipeline given to the pipeline task. It allows us to easily push frames
downstream to the pipeline and also receive upstream frames coming from the
pipeline.
Args:
up_queue: Queue for upstream frame processing.
"""
def __init__(self, up_queue: asyncio.Queue, **kwargs):
@@ -73,10 +97,14 @@ class PipelineTaskSource(FrameProcessor):
class PipelineTaskSink(FrameProcessor):
"""This is the sink processor that is linked at the end of the pipeline
"""Sink processor for pipeline tasks that handles final frame processing.
This is the sink processor that is linked at the end of the pipeline
given to the pipeline task. It allows us to receive downstream frames and
act on them, for example, waiting to receive an EndFrame.
Args:
down_queue: Queue for downstream frame processing.
"""
def __init__(self, down_queue: asyncio.Queue, **kwargs):
@@ -89,6 +117,14 @@ class PipelineTaskSink(FrameProcessor):
class PipelineTask(BaseTask):
"""Manages the execution of a pipeline, handling frame processing and task lifecycle.
Args:
pipeline: The pipeline to execute.
params: Configuration parameters for the pipeline.
clock: Clock implementation for timing operations.
"""
def __init__(
self,
pipeline: BasePipeline,
@@ -136,6 +172,11 @@ class PipelineTask(BaseTask):
"""Returns the name of this task."""
return self._name
@property
def params(self) -> PipelineParams:
"""Returns the pipeline parameters of this task."""
return self._params
def set_event_loop(self, loop: asyncio.AbstractEventLoop):
self._task_manager.set_event_loop(loop)
@@ -155,9 +196,7 @@ class PipelineTask(BaseTask):
await self.queue_frame(EndFrame())
async def cancel(self):
"""
Stops the running pipeline immediately.
"""
"""Stops the running pipeline immediately."""
logger.debug(f"Canceling pipeline task {self}")
# Make sure everything is cleaned up downstream. This is sent
# out-of-band from the main streaming task which is what we want since
@@ -167,9 +206,7 @@ class PipelineTask(BaseTask):
await self._task_manager.cancel_task(self._process_push_task)
async def run(self):
"""
Starts running the given pipeline.
"""
"""Starts and manages the pipeline execution until completion or cancellation."""
if self.has_finished():
return
try:
@@ -187,14 +224,18 @@ class PipelineTask(BaseTask):
self._finished = True
async def queue_frame(self, frame: Frame):
"""
Queue a frame to be pushed down the pipeline.
"""Queue a single frame to be pushed down the pipeline.
Args:
frame: The frame to be processed.
"""
await self._push_queue.put(frame)
async def queue_frames(self, frames: Iterable[Frame] | AsyncIterable[Frame]):
"""
Queues multiple frames to be pushed down the pipeline.
"""Queues multiple frames to be pushed down the pipeline.
Args:
frames: An iterable or async iterable of frames to be processed.
"""
if isinstance(frames, AsyncIterable):
async for frame in frames:
@@ -271,11 +312,14 @@ class PipelineTask(BaseTask):
clock=self._clock,
task_manager=self._task_manager,
allow_interruptions=self._params.allow_interruptions,
audio_in_sample_rate=self._params.audio_in_sample_rate,
audio_out_sample_rate=self._params.audio_out_sample_rate,
enable_metrics=self._params.enable_metrics,
enable_usage_metrics=self._params.enable_usage_metrics,
report_only_initial_ttfb=self._params.report_only_initial_ttfb,
observer=self._observer,
report_only_initial_ttfb=self._params.report_only_initial_ttfb,
)
start_frame.metadata = self._params.start_metadata
await self._source.queue_frame(start_frame, FrameDirection.DOWNSTREAM)
if self._params.enable_metrics and self._params.send_initial_empty_metrics:
@@ -337,9 +381,7 @@ class PipelineTask(BaseTask):
self._down_queue.task_done()
async def _heartbeat_push_handler(self):
"""
This tasks pushes a heartbeat frame every heartbeat period.
"""
"""This task pushes a heartbeat frame every heartbeat period."""
while True:
# Don't use `queue_frame()` because if an EndFrame is queued the
# task will just stop waiting for the pipeline to finish not

@@ -16,9 +16,10 @@ class GatedOpenAILLMContextAggregator(FrameProcessor):
"""
def __init__(self, notifier: BaseNotifier, **kwargs):
def __init__(self, *, notifier: BaseNotifier, start_open: bool = False, **kwargs):
super().__init__(**kwargs)
self._notifier = notifier
self._start_open = start_open
self._last_context_frame = None
async def process_frame(self, frame: Frame, direction: FrameDirection):
@@ -31,7 +32,11 @@ class GatedOpenAILLMContextAggregator(FrameProcessor):
await self._stop()
await self.push_frame(frame)
elif isinstance(frame, OpenAILLMContextFrame):
self._last_context_frame = frame
if self._start_open:
self._start_open = False
await self.push_frame(frame, direction)
else:
self._last_context_frame = frame
else:
await self.push_frame(frame, direction)

@@ -4,7 +4,7 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
from typing import List, Type
from typing import List, Optional, Type
from pipecat.frames.frames import (
Frame,
@@ -37,7 +37,7 @@ class LLMResponseAggregator(FrameProcessor):
start_frame,
end_frame,
accumulator_frame: Type[TextFrame],
interim_accumulator_frame: Type[TextFrame] | None = None,
interim_accumulator_frame: Optional[Type[TextFrame]] = None,
handle_interruptions: bool = False,
expect_stripped_words: bool = True, # if True, need to add spaces between words
):

@@ -51,7 +51,7 @@ class CustomEncoder(json.JSONEncoder):
class OpenAILLMContext:
def __init__(
self,
messages: List[ChatCompletionMessageParam] | None = None,
messages: Optional[List[ChatCompletionMessageParam]] = None,
tools: List[ChatCompletionToolParam] | NotGiven = NOT_GIVEN,
tool_choice: ChatCompletionToolChoiceOptionParam | NotGiven = NOT_GIVEN,
):

@@ -4,6 +4,8 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
from typing import Optional
from pipecat.frames.frames import (
Frame,
InterimTranscriptionFrame,
@@ -50,7 +52,7 @@ class ResponseAggregator(FrameProcessor):
start_frame,
end_frame,
accumulator_frame: TextFrame,
interim_accumulator_frame: TextFrame | None = None,
interim_accumulator_frame: Optional[TextFrame] = None,
):
super().__init__()

@@ -30,7 +30,7 @@ class AsyncGeneratorProcessor(FrameProcessor):
if isinstance(frame, (CancelFrame, EndFrame)):
await self._data_queue.put(None)
else:
data = self._serializer.serialize(frame)
data = await self._serializer.serialize(frame)
if data:
await self._data_queue.put(data)

@@ -4,12 +4,18 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
from pipecat.audio.utils import interleave_stereo_audio, mix_audio, resample_audio
import time
from typing import Optional
from pipecat.audio.utils import create_default_resampler, interleave_stereo_audio, mix_audio
from pipecat.frames.frames import (
AudioRawFrame,
CancelFrame,
EndFrame,
Frame,
InputAudioRawFrame,
OutputAudioRawFrame,
StartFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
@@ -29,16 +35,29 @@ class AudioBufferProcessor(FrameProcessor):
"""
def __init__(
self, *, sample_rate: int = 24000, num_channels: int = 1, buffer_size: int = 0, **kwargs
self,
*,
sample_rate: Optional[int] = None,
num_channels: int = 1,
buffer_size: int = 0,
**kwargs,
):
super().__init__(**kwargs)
self._sample_rate = sample_rate
self._init_sample_rate = sample_rate
self._sample_rate = 0
self._num_channels = num_channels
self._buffer_size = buffer_size
self._user_audio_buffer = bytearray()
self._bot_audio_buffer = bytearray()
self._last_user_frame_at = 0
self._last_bot_frame_at = 0
self._recording = False
self._resampler = create_default_resampler()
self._register_event_handler("on_audio_data")
@property
@@ -64,43 +83,83 @@ class AudioBufferProcessor(FrameProcessor):
else:
return b""
def reset_audio_buffers(self):
self._user_audio_buffer = bytearray()
self._bot_audio_buffer = bytearray()
async def start_recording(self):
self._recording = True
self._reset_recording()
async def stop_recording(self):
await self._call_on_audio_data_handler()
self._recording = False
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# Include all audio from the user.
if isinstance(frame, InputAudioRawFrame):
resampled = resample_audio(frame.audio, frame.sample_rate, self._sample_rate)
# Update output sample rate if necessary.
if isinstance(frame, StartFrame):
self._update_sample_rate(frame)
if self._recording and isinstance(frame, InputAudioRawFrame):
# Add silence if we need to.
silence = self._compute_silence(self._last_user_frame_at)
self._user_audio_buffer.extend(silence)
# Add user audio.
resampled = await self._resample_audio(frame)
self._user_audio_buffer.extend(resampled)
# Sync the bot's buffer to the user's buffer by adding silence if needed
if len(self._user_audio_buffer) > len(self._bot_audio_buffer):
silence = b"\x00" * len(resampled)
self._bot_audio_buffer.extend(silence)
# If the bot is speaking, include all audio from the bot.
elif isinstance(frame, OutputAudioRawFrame):
resampled = resample_audio(frame.audio, frame.sample_rate, self._sample_rate)
# Save time of frame so we can compute silence.
self._last_user_frame_at = time.time()
elif self._recording and isinstance(frame, OutputAudioRawFrame):
# Add silence if we need to.
silence = self._compute_silence(self._last_bot_frame_at)
self._bot_audio_buffer.extend(silence)
# Add bot audio.
resampled = await self._resample_audio(frame)
self._bot_audio_buffer.extend(resampled)
# Save time of frame so we can compute silence.
self._last_bot_frame_at = time.time()
if self._buffer_size > 0 and len(self._user_audio_buffer) > self._buffer_size:
await self._call_on_audio_data_handler()
if isinstance(frame, EndFrame):
await self._call_on_audio_data_handler()
if isinstance(frame, (CancelFrame, EndFrame)):
await self.stop_recording()
await self.push_frame(frame, direction)
def _update_sample_rate(self, frame: StartFrame):
self._sample_rate = self._init_sample_rate or frame.audio_out_sample_rate
async def _call_on_audio_data_handler(self):
if not self.has_audio():
if not self.has_audio() or not self._recording:
return
merged_audio = self.merge_audio_buffers()
await self._call_event_handler(
"on_audio_data", merged_audio, self._sample_rate, self._num_channels
)
self.reset_audio_buffers()
self._reset_audio_buffers()
def _buffer_has_audio(self, buffer: bytearray) -> bool:
return buffer is not None and len(buffer) > 0
def _reset_recording(self):
self._reset_audio_buffers()
self._last_user_frame_at = time.time()
self._last_bot_frame_at = time.time()
def _reset_audio_buffers(self):
self._user_audio_buffer = bytearray()
self._bot_audio_buffer = bytearray()
async def _resample_audio(self, frame: AudioRawFrame) -> bytes:
return await self._resampler.resample(frame.audio, frame.sample_rate, self._sample_rate)
def _compute_silence(self, from_time: float) -> bytes:
quiet_time = time.time() - from_time
# We should get audio frames very frequently. We pick 100ms because
# that's big enough, but it could be even a bit slower since we usually
# do 20ms audio frames.
if from_time == 0 or quiet_time < 0.1:
return b""
num_bytes = int(quiet_time * self._sample_rate) * 2
silence = b"\x00" * num_bytes
return silence
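
The silence padding in `_compute_silence()` comes down to converting a gap in seconds into a number of zero bytes. A minimal sketch of the same arithmetic as a free function, assuming 16-bit mono audio; the name `compute_silence` and the explicit `now` parameter (used instead of `time.time()` so the result is deterministic) are assumptions for illustration.

```python
def compute_silence(from_time: float, now: float, sample_rate: int) -> bytes:
    """Return zeroed 16-bit samples covering the gap between two timestamps."""
    quiet_time = now - from_time
    # Gaps under 100 ms are treated as normal jitter between audio frames,
    # and a zero timestamp means no frame has been seen yet.
    if from_time == 0 or quiet_time < 0.1:
        return b""
    num_bytes = int(quiet_time * sample_rate) * 2  # 2 bytes per sample
    return b"\x00" * num_bytes


if __name__ == "__main__":
    # Half a second of silence at 16 kHz.
    print(len(compute_silence(10.0, 10.5, 16000)))
```

Multiplying by 2 after truncating to an integer sample count keeps the padding aligned to whole 16-bit samples.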

@@ -4,6 +4,8 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
from typing import Optional
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
@@ -11,6 +13,7 @@ from pipecat.audio.vad.vad_analyzer import VADParams, VADState
from pipecat.frames.frames import (
AudioRawFrame,
Frame,
StartFrame,
StartInterruptionFrame,
StopInterruptionFrame,
UserStartedSpeakingFrame,
@@ -23,7 +26,7 @@ class SileroVAD(FrameProcessor):
def __init__(
self,
*,
sample_rate: int = 16000,
sample_rate: Optional[int] = None,
vad_params: VADParams = VADParams(),
audio_passthrough: bool = False,
):
@@ -41,6 +44,9 @@ class SileroVAD(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, StartFrame):
self._vad_analyzer.set_sample_rate(frame.audio_in_sample_rate)
if isinstance(frame, AudioRawFrame):
await self._analyze_audio(frame)
if self._audio_passthrough:


@@ -245,6 +245,9 @@ class FrameProcessor:
await self.push_frame(error, FrameDirection.UPSTREAM)
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
if not self._check_ready(frame):
return
if isinstance(frame, SystemFrame):
await self.__internal_push_frame(frame, direction)
else:
@@ -319,6 +322,16 @@ class FrameProcessor:
await self.push_error(ErrorFrame(str(e)))
raise
def _check_ready(self, frame: Frame):
# If we are trying to push a frame but we still have no clock, it means
# we didn't process a StartFrame.
if not self._clock:
logger.error(
f"{self} not properly initialized, missing super().process_frame(frame, direction)?"
)
return False
return True
def __create_input_task(self):
if not self.__input_frame_task:
self.__should_block_frames = False


@@ -4,7 +4,7 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
from typing import Union
from typing import Optional, Union
from loguru import logger
@@ -30,7 +30,7 @@ class LangchainProcessor(FrameProcessor):
super().__init__()
self._chain = chain
self._transcript_key = transcript_key
self._participant_id: str | None = None
self._participant_id: Optional[str] = None
def set_participant_id(self, participant_id: str):
self._participant_id = participant_id


@@ -753,7 +753,7 @@ class RTVIProcessor(FrameProcessor):
super().__init__(**kwargs)
self._config = config
self._pipeline: FrameProcessor | None = None
self._pipeline: Optional[FrameProcessor] = None
self._bot_ready = False
self._client_ready = False
@@ -999,7 +999,7 @@ class RTVIProcessor(FrameProcessor):
)
await self.push_frame(frame)
async def _handle_action(self, request_id: str | None, data: RTVIActionRun):
async def _handle_action(self, request_id: Optional[str], data: RTVIActionRun):
action_id = self._action_id(data.service, data.action)
if action_id not in self._registered_actions:
await self._send_error_response(request_id, f"Action {action_id} not registered")


@@ -5,6 +5,7 @@
#
import asyncio
from typing import Optional
from loguru import logger
from pydantic import BaseModel
@@ -38,7 +39,7 @@ class GStreamerPipelineSource(FrameProcessor):
class OutputParams(BaseModel):
video_width: int = 1280
video_height: int = 720
audio_sample_rate: int = 24000
audio_sample_rate: Optional[int] = None
audio_channels: int = 1
clock_sync: bool = True
@@ -46,6 +47,7 @@ class GStreamerPipelineSource(FrameProcessor):
super().__init__(**kwargs)
self._out_params = out_params
self._sample_rate = 0
Gst.init()
@@ -90,6 +92,7 @@ class GStreamerPipelineSource(FrameProcessor):
await self.push_frame(frame, direction)
async def _start(self, frame: StartFrame):
self._sample_rate = self._out_params.audio_sample_rate or frame.audio_out_sample_rate
self._player.set_state(Gst.State.PLAYING)
async def _stop(self, frame: EndFrame):
@@ -122,7 +125,7 @@ class GStreamerPipelineSource(FrameProcessor):
audioresample = Gst.ElementFactory.make("audioresample", None)
audiocapsfilter = Gst.ElementFactory.make("capsfilter", None)
audiocaps = Gst.Caps.from_string(
f"audio/x-raw,format=S16LE,rate={self._out_params.audio_sample_rate},channels={self._out_params.audio_channels},layout=interleaved"
f"audio/x-raw,format=S16LE,rate={self._sample_rate},channels={self._out_params.audio_channels},layout=interleaved"
)
audiocapsfilter.set_property("caps", audiocaps)
appsink_audio = Gst.ElementFactory.make("appsink", None)
@@ -188,7 +191,7 @@ class GStreamerPipelineSource(FrameProcessor):
(_, info) = buffer.map(Gst.MapFlags.READ)
frame = OutputAudioRawFrame(
audio=info.data,
sample_rate=self._out_params.audio_sample_rate,
sample_rate=self._sample_rate,
num_channels=self._out_params.audio_channels,
)
asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop())


@@ -4,19 +4,14 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import time
from loguru import logger
try:
import sentry_sdk
sentry_available = sentry_sdk.is_initialized()
if not sentry_available:
logger.warning("Sentry SDK not initialized. Sentry features will be disabled.")
except ImportError:
sentry_available = False
logger.warning("Sentry SDK not installed. Sentry features will be disabled.")
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Sentry, you need to `pip install pipecat-ai[sentry]`.")
raise Exception(f"Missing module: {e}")
from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics
@@ -24,41 +19,44 @@ from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMet
class SentryMetrics(FrameProcessorMetrics):
def __init__(self):
super().__init__()
self._ttfb_metrics_span = None
self._processing_metrics_span = None
self._ttfb_metrics_tx = None
self._processing_metrics_tx = None
self._sentry_available = sentry_sdk.is_initialized()
if not self._sentry_available:
logger.warning("Sentry SDK not initialized. Sentry features will be disabled.")
async def start_ttfb_metrics(self, report_only_initial_ttfb):
if self._should_report_ttfb:
self._start_ttfb_time = time.time()
if sentry_available:
self._ttfb_metrics_span = sentry_sdk.start_span(
op="ttfb",
description=f"TTFB for {self._processor_name()}",
start_timestamp=self._start_ttfb_time,
)
logger.debug(
f"Sentry Span ID: {self._ttfb_metrics_span.span_id} Description: {self._ttfb_metrics_span.description} started."
)
self._should_report_ttfb = not report_only_initial_ttfb
await super().start_ttfb_metrics(report_only_initial_ttfb)
async def stop_ttfb_metrics(self):
stop_time = time.time()
if sentry_available:
self._ttfb_metrics_span.finish(end_timestamp=stop_time)
async def start_processing_metrics(self):
self._start_processing_time = time.time()
if sentry_available:
self._processing_metrics_span = sentry_sdk.start_span(
op="processing",
description=f"Processing for {self._processor_name()}",
start_timestamp=self._start_processing_time,
if self._should_report_ttfb and self._sentry_available:
self._ttfb_metrics_tx = sentry_sdk.start_transaction(
op="ttfb",
name=f"TTFB for {self._processor_name()}",
)
logger.debug(
f"Sentry Span ID: {self._processing_metrics_span.span_id} Description: {self._processing_metrics_span.description} started."
f"Sentry transaction started (ID: {self._ttfb_metrics_tx.span_id} Name: {self._ttfb_metrics_tx.name})"
)
async def stop_ttfb_metrics(self):
await super().stop_ttfb_metrics()
if self._sentry_available and self._ttfb_metrics_tx:
self._ttfb_metrics_tx.finish()
async def start_processing_metrics(self):
await super().start_processing_metrics()
if self._sentry_available:
self._processing_metrics_tx = sentry_sdk.start_transaction(
op="processing",
name=f"Processing for {self._processor_name()}",
)
logger.debug(
f"Sentry transaction started (ID: {self._processing_metrics_tx.span_id} Name: {self._processing_metrics_tx.name})"
)
async def stop_processing_metrics(self):
stop_time = time.time()
if sentry_available:
self._processing_metrics_span.finish(end_timestamp=stop_time)
await super().stop_processing_metrics()
if self._sentry_available and self._processing_metrics_tx:
self._processing_metrics_tx.finish()


@@ -87,7 +87,7 @@ class AssistantTranscriptProcessor(BaseTranscriptProcessor):
"""Initialize processor with aggregation state."""
super().__init__(**kwargs)
self._current_text_parts: List[str] = []
self._aggregation_start_time: Optional[str] | None = None
self._aggregation_start_time: Optional[str] = None
async def _emit_aggregated_text(self):
"""Emit aggregated text as a transcript message."""


@@ -7,7 +7,7 @@
from abc import ABC, abstractmethod
from enum import Enum
from pipecat.frames.frames import Frame
from pipecat.frames.frames import Frame, StartFrame
class FrameSerializerType(Enum):
@@ -21,10 +21,13 @@ class FrameSerializer(ABC):
def type(self) -> FrameSerializerType:
pass
@abstractmethod
def serialize(self, frame: Frame) -> str | bytes | None:
async def setup(self, frame: StartFrame):
pass
@abstractmethod
def deserialize(self, data: str | bytes) -> Frame | None:
async def serialize(self, frame: Frame) -> str | bytes | None:
pass
@abstractmethod
async def deserialize(self, data: str | bytes) -> Frame | None:
pass


@@ -25,7 +25,7 @@ class LivekitFrameSerializer(FrameSerializer):
def type(self) -> FrameSerializerType:
return FrameSerializerType.BINARY
def serialize(self, frame: Frame) -> str | bytes | None:
async def serialize(self, frame: Frame) -> str | bytes | None:
if not isinstance(frame, OutputAudioRawFrame):
return None
audio_frame = AudioFrame(
@@ -36,7 +36,7 @@ class LivekitFrameSerializer(FrameSerializer):
)
return pickle.dumps(audio_frame)
def deserialize(self, data: str | bytes) -> Frame | None:
async def deserialize(self, data: str | bytes) -> Frame | None:
audio_frame: AudioFrame = pickle.loads(data)["frame"]
return InputAudioRawFrame(
audio=bytes(audio_frame.data),


@@ -41,7 +41,7 @@ class ProtobufFrameSerializer(FrameSerializer):
def type(self) -> FrameSerializerType:
return FrameSerializerType.BINARY
def serialize(self, frame: Frame) -> str | bytes | None:
async def serialize(self, frame: Frame) -> str | bytes | None:
proto_frame = frame_protos.Frame()
if type(frame) not in self.SERIALIZABLE_TYPES:
logger.warning(f"Frame type {type(frame)} is not serializable")
@@ -57,26 +57,7 @@ class ProtobufFrameSerializer(FrameSerializer):
return proto_frame.SerializeToString()
def deserialize(self, data: str | bytes) -> Frame | None:
"""Returns a Frame object from a Frame protobuf.
Used to convert frames
passed over the wire as protobufs to Frame objects used in pipelines
and frame processors.
>>> serializer = ProtobufFrameSerializer()
>>> serializer.deserialize(
... serializer.serialize(OutputAudioFrame(data=b'1234567890')))
InputAudioFrame(data=b'1234567890')
>>> serializer.deserialize(
... serializer.serialize(TextFrame(text='hello world')))
TextFrame(text='hello world')
>>> serializer.deserialize(serializer.serialize(TranscriptionFrame(
... text="Hello there!", participantId="123", timestamp="2021-01-01")))
TranscriptionFrame(text='Hello there!', participantId='123', timestamp='2021-01-01')
"""
async def deserialize(self, data: str | bytes) -> Frame | None:
proto = frame_protos.Frame.FromString(data)
which = proto.WhichOneof("frame")
if which not in self.DESERIALIZABLE_FIELDS:


@@ -6,16 +6,24 @@
import base64
import json
from typing import Optional
from pydantic import BaseModel
from pipecat.audio.utils import alaw_to_pcm, pcm_to_alaw, pcm_to_ulaw, ulaw_to_pcm
from pipecat.audio.utils import (
alaw_to_pcm,
create_default_resampler,
pcm_to_alaw,
pcm_to_ulaw,
ulaw_to_pcm,
)
from pipecat.frames.frames import (
AudioRawFrame,
Frame,
InputAudioRawFrame,
InputDTMFFrame,
KeypadEntry,
StartFrame,
StartInterruptionFrame,
)
from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializerType
@@ -23,8 +31,8 @@ from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializer
class TelnyxFrameSerializer(FrameSerializer):
class InputParams(BaseModel):
telnyx_sample_rate: int = 8000
sample_rate: int = 16000
telnyx_sample_rate: int = 8000 # Default Telnyx rate (8kHz)
sample_rate: Optional[int] = None # Pipeline input rate
inbound_encoding: str = "PCMU"
outbound_encoding: str = "PCMU"
@@ -40,21 +48,30 @@ class TelnyxFrameSerializer(FrameSerializer):
params.inbound_encoding = inbound_encoding
self._params = params
self._telnyx_sample_rate = self._params.telnyx_sample_rate
self._sample_rate = 0 # Pipeline input rate
self._resampler = create_default_resampler()
@property
def type(self) -> FrameSerializerType:
return FrameSerializerType.TEXT
def serialize(self, frame: Frame) -> str | bytes | None:
async def setup(self, frame: StartFrame):
self._sample_rate = self._params.sample_rate or frame.audio_in_sample_rate
async def serialize(self, frame: Frame) -> str | bytes | None:
if isinstance(frame, AudioRawFrame):
data = frame.audio
# Output: Convert PCM at frame's rate to 8kHz encoded for Telnyx
if self._params.inbound_encoding == "PCMU":
serialized_data = pcm_to_ulaw(
data, frame.sample_rate, self._params.telnyx_sample_rate
serialized_data = await pcm_to_ulaw(
data, frame.sample_rate, self._telnyx_sample_rate, self._resampler
)
elif self._params.inbound_encoding == "PCMA":
serialized_data = pcm_to_alaw(
data, frame.sample_rate, self._params.telnyx_sample_rate
serialized_data = await pcm_to_alaw(
data, frame.sample_rate, self._telnyx_sample_rate, self._resampler
)
else:
raise ValueError(f"Unsupported encoding: {self._params.inbound_encoding}")
@@ -71,26 +88,33 @@ class TelnyxFrameSerializer(FrameSerializer):
answer = {"event": "clear"}
return json.dumps(answer)
def deserialize(self, data: str | bytes) -> Frame | None:
async def deserialize(self, data: str | bytes) -> Frame | None:
message = json.loads(data)
if message["event"] == "media":
payload_base64 = message["media"]["payload"]
payload = base64.b64decode(payload_base64)
# Input: Convert Telnyx's 8kHz encoded audio to PCM at pipeline input rate
if self._params.outbound_encoding == "PCMU":
deserialized_data = ulaw_to_pcm(
payload, self._params.telnyx_sample_rate, self._params.sample_rate
deserialized_data = await ulaw_to_pcm(
payload,
self._telnyx_sample_rate,
self._sample_rate,
self._resampler,
)
elif self._params.outbound_encoding == "PCMA":
deserialized_data = alaw_to_pcm(
payload, self._params.telnyx_sample_rate, self._params.sample_rate
deserialized_data = await alaw_to_pcm(
payload,
self._telnyx_sample_rate,
self._sample_rate,
self._resampler,
)
else:
raise ValueError(f"Unsupported encoding: {self._params.outbound_encoding}")
audio_frame = InputAudioRawFrame(
audio=deserialized_data, num_channels=1, sample_rate=self._params.sample_rate
audio=deserialized_data, num_channels=1, sample_rate=self._sample_rate
)
return audio_frame
elif message["event"] == "dtmf":


@@ -6,16 +6,18 @@
import base64
import json
from typing import Optional
from pydantic import BaseModel
from pipecat.audio.utils import pcm_to_ulaw, ulaw_to_pcm
from pipecat.audio.utils import create_default_resampler, pcm_to_ulaw, ulaw_to_pcm
from pipecat.frames.frames import (
AudioRawFrame,
Frame,
InputAudioRawFrame,
InputDTMFFrame,
KeypadEntry,
StartFrame,
StartInterruptionFrame,
TransportMessageFrame,
TransportMessageUrgentFrame,
@@ -25,25 +27,36 @@ from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializer
class TwilioFrameSerializer(FrameSerializer):
class InputParams(BaseModel):
twilio_sample_rate: int = 8000
sample_rate: int = 16000
twilio_sample_rate: int = 8000 # Default Twilio rate (8kHz)
sample_rate: Optional[int] = None # Pipeline input rate
def __init__(self, stream_sid: str, params: InputParams = InputParams()):
self._stream_sid = stream_sid
self._params = params
self._twilio_sample_rate = self._params.twilio_sample_rate
self._sample_rate = 0 # Pipeline input rate
self._resampler = create_default_resampler()
@property
def type(self) -> FrameSerializerType:
return FrameSerializerType.TEXT
def serialize(self, frame: Frame) -> str | bytes | None:
async def setup(self, frame: StartFrame):
self._sample_rate = self._params.sample_rate or frame.audio_in_sample_rate
async def serialize(self, frame: Frame) -> str | bytes | None:
if isinstance(frame, StartInterruptionFrame):
answer = {"event": "clear", "streamSid": self._stream_sid}
return json.dumps(answer)
elif isinstance(frame, AudioRawFrame):
data = frame.audio
serialized_data = pcm_to_ulaw(data, frame.sample_rate, self._params.twilio_sample_rate)
# Output: Convert PCM at frame's rate to 8kHz μ-law for Twilio
serialized_data = await pcm_to_ulaw(
data, frame.sample_rate, self._twilio_sample_rate, self._resampler
)
payload = base64.b64encode(serialized_data).decode("utf-8")
answer = {
"event": "media",
@@ -55,18 +68,19 @@ class TwilioFrameSerializer(FrameSerializer):
elif isinstance(frame, (TransportMessageFrame, TransportMessageUrgentFrame)):
return json.dumps(frame.message)
def deserialize(self, data: str | bytes) -> Frame | None:
async def deserialize(self, data: str | bytes) -> Frame | None:
message = json.loads(data)
if message["event"] == "media":
payload_base64 = message["media"]["payload"]
payload = base64.b64decode(payload_base64)
deserialized_data = ulaw_to_pcm(
payload, self._params.twilio_sample_rate, self._params.sample_rate
# Input: Convert Twilio's 8kHz μ-law to PCM at pipeline input rate
deserialized_data = await ulaw_to_pcm(
payload, self._twilio_sample_rate, self._sample_rate, self._resampler
)
audio_frame = InputAudioRawFrame(
audio=deserialized_data, num_channels=1, sample_rate=self._params.sample_rate
audio=deserialized_data, num_channels=1, sample_rate=self._sample_rate
)
return audio_frame
elif message["event"] == "dtmf":
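
The serializer changes above share one shape: `setup()` receives the `StartFrame` and resolves the pipeline sample rate, and `serialize()`/`deserialize()` are now coroutines. The following is a rough, self-contained sketch of that shape. `StartFrame` and `AudioFrame` here are stand-in dataclasses, not pipecat's classes, `JsonAudioSerializer` and `round_trip` are hypothetical names, and the codec and resampling steps are left out, so the audio is only base64-wrapped in a `media` message.

```python
import asyncio
import base64
import json
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional, Union


@dataclass
class StartFrame:
    """Stand-in for the pipeline start frame (assumption, not pipecat's class)."""
    audio_in_sample_rate: int = 16000


@dataclass
class AudioFrame:
    """Hypothetical minimal audio frame used only in this sketch."""
    audio: bytes
    sample_rate: int


class FrameSerializer(ABC):
    @abstractmethod
    async def setup(self, frame: StartFrame):
        pass

    @abstractmethod
    async def serialize(self, frame: AudioFrame) -> Union[str, bytes, None]:
        pass

    @abstractmethod
    async def deserialize(self, data: Union[str, bytes]) -> Optional[AudioFrame]:
        pass


class JsonAudioSerializer(FrameSerializer):
    def __init__(self, sample_rate: Optional[int] = None):
        self._init_sample_rate = sample_rate
        self._sample_rate = 0  # unknown until setup() runs

    async def setup(self, frame: StartFrame):
        # An explicit rate wins; otherwise fall back to the pipeline's rate.
        self._sample_rate = self._init_sample_rate or frame.audio_in_sample_rate

    async def serialize(self, frame: AudioFrame) -> str:
        payload = base64.b64encode(frame.audio).decode("utf-8")
        return json.dumps({"event": "media", "media": {"payload": payload}})

    async def deserialize(self, data: Union[str, bytes]) -> Optional[AudioFrame]:
        message = json.loads(data)
        if message["event"] != "media":
            return None
        audio = base64.b64decode(message["media"]["payload"])
        return AudioFrame(audio=audio, sample_rate=self._sample_rate)


async def round_trip(audio: bytes) -> Optional[AudioFrame]:
    serializer = JsonAudioSerializer()
    await serializer.setup(StartFrame(audio_in_sample_rate=8000))
    data = await serializer.serialize(AudioFrame(audio=audio, sample_rate=8000))
    return await serializer.deserialize(data)


frame = asyncio.run(round_trip(b"\x01\x02"))
```

Because the rate is only known after `setup()`, a serializer used before the start frame arrives would tag frames with a sample rate of 0. This is one reason the diff threads `StartFrame` through to the serializers.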


@@ -140,7 +140,7 @@ class LLMService(AIService):
self._start_callbacks = {}
# TODO-CB: callback function type
def register_function(self, function_name: str | None, callback, start_callback=None):
def register_function(self, function_name: Optional[str], callback, start_callback=None):
# Registering a function with the function_name set to None will run that callback
# for all functions
self._callbacks[function_name] = callback
@@ -148,7 +148,7 @@ class LLMService(AIService):
if start_callback:
self._start_callbacks[function_name] = start_callback
def unregister_function(self, function_name: str | None):
def unregister_function(self, function_name: Optional[str]):
del self._callbacks[function_name]
if self._start_callbacks[function_name]:
del self._start_callbacks[function_name]
@@ -190,7 +190,7 @@ class LLMService(AIService):
elif None in self._start_callbacks.keys():
return await self._start_callbacks[None](function_name, self, context)
async def request_image_frame(self, user_id: str, *, text_content: str | None = None):
async def request_image_frame(self, user_id: str, *, text_content: Optional[str] = None):
await self.push_frame(
UserImageRequestFrame(user_id=user_id, context=text_content), FrameDirection.UPSTREAM
)
@@ -213,7 +213,7 @@ class TTSService(AIService):
# if push_silence_after_stop is True, send this amount of audio silence
silence_time_s: float = 2.0,
# TTS output sample rate
sample_rate: int = 24000,
sample_rate: Optional[int] = None,
text_filter: Optional[BaseTextFilter] = None,
**kwargs,
):
@@ -224,7 +224,8 @@ class TTSService(AIService):
self._stop_frame_timeout_s: float = stop_frame_timeout_s
self._push_silence_after_stop: bool = push_silence_after_stop
self._silence_time_s: float = silence_time_s
self._sample_rate: int = sample_rate
self._init_sample_rate = sample_rate
self._sample_rate = 0
self._voice_id: str = ""
self._settings: Dict[str, Any] = {}
self._text_filter: Optional[BaseTextFilter] = text_filter
@@ -248,16 +249,20 @@ class TTSService(AIService):
async def flush_audio(self):
pass
def language_to_service_language(self, language: Language) -> str | None:
return Language(language)
# Converts the text to audio.
@abstractmethod
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
pass
def language_to_service_language(self, language: Language) -> Optional[str]:
return Language(language)
async def update_setting(self, key: str, value: Any):
pass
async def start(self, frame: StartFrame):
await super().start(frame)
self._sample_rate = self._init_sample_rate or frame.audio_out_sample_rate
if self._push_stop_frames:
self._stop_frame_task = self.create_task(self._stop_frame_handler())
@@ -347,7 +352,7 @@ class TTSService(AIService):
await self.push_frame(frame, direction)
async def _process_text_frame(self, frame: TextFrame):
text: str | None = None
text: Optional[str] = None
if not self._aggregate_sentences:
text = frame.text
else:
@@ -467,9 +472,17 @@ class WordTTSService(TTSService):
class STTService(AIService):
"""STTService is a base class for speech-to-text services."""
def __init__(self, audio_passthrough=False, **kwargs):
def __init__(
self,
audio_passthrough=False,
# STT input sample rate
sample_rate: Optional[int] = None,
**kwargs,
):
super().__init__(**kwargs)
self._audio_passthrough = audio_passthrough
self._init_sample_rate = sample_rate
self._sample_rate = 0
self._settings: Dict[str, Any] = {}
self._muted: bool = False
@@ -478,6 +491,10 @@ class STTService(AIService):
"""Returns whether the STT service is currently muted."""
return self._muted
@property
def sample_rate(self) -> int:
return self._sample_rate
@abstractmethod
async def set_model(self, model: str):
self.set_model_name(model)
@@ -491,6 +508,10 @@ class STTService(AIService):
"""Returns transcript as a string"""
pass
async def start(self, frame: StartFrame):
await super().start(frame)
self._sample_rate = self._init_sample_rate or frame.audio_in_sample_rate
async def _update_settings(self, settings: Mapping[str, Any]):
logger.info(f"Updating STT settings: {self._settings}")
for key, value in settings.items():
@@ -540,17 +561,15 @@ class SegmentedSTTService(STTService):
min_volume: float = 0.6,
max_silence_secs: float = 0.3,
max_buffer_secs: float = 1.5,
sample_rate: int = 24000,
num_channels: int = 1,
sample_rate: Optional[int] = None,
**kwargs,
):
super().__init__(**kwargs)
super().__init__(sample_rate=sample_rate, **kwargs)
self._min_volume = min_volume
self._max_silence_secs = max_silence_secs
self._max_buffer_secs = max_buffer_secs
self._sample_rate = sample_rate
self._num_channels = num_channels
(self._content, self._wave) = self._new_wave()
self._content = None
self._wave = None
self._silence_num_frames = 0
# Volume exponential smoothing
self._smoothing_factor = 0.2
@@ -569,8 +588,8 @@ class SegmentedSTTService(STTService):
# If buffer is not empty and we have enough data or there's been a long
# silence, transcribe the audio gathered so far.
silence_secs = self._silence_num_frames / self._sample_rate
buffer_secs = self._wave.getnframes() / self._sample_rate
silence_secs = self._silence_num_frames / self.sample_rate
buffer_secs = self._wave.getnframes() / self.sample_rate
if self._content.tell() > 0 and (
buffer_secs > self._max_buffer_secs or silence_secs > self._max_silence_secs
):
@@ -580,18 +599,24 @@ class SegmentedSTTService(STTService):
await self.process_generator(self.run_stt(self._content.read()))
(self._content, self._wave) = self._new_wave()
async def start(self, frame: StartFrame):
await super().start(frame)
(self._content, self._wave) = self._new_wave()
async def stop(self, frame: EndFrame):
await super().stop(frame)
self._wave.close()
async def cancel(self, frame: CancelFrame):
await super().cancel(frame)
self._wave.close()
def _new_wave(self):
content = io.BytesIO()
ww = wave.open(content, "wb")
ww.setsampwidth(2)
ww.setnchannels(self._num_channels)
ww.setframerate(self._sample_rate)
ww.setnchannels(1)
ww.setframerate(self.sample_rate)
return (content, ww)
def _get_smoothed_volume(self, frame: AudioRawFrame) -> float:
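
The recurring pattern in this file is that a service no longer fixes its sample rate in `__init__`. It stores the optional constructor value and resolves the real rate in `start()`, which is also where `SegmentedSTTService` now creates its wave buffer. Here is a small sketch of those two steps using only the standard library. `resolve_sample_rate` and `new_wave` are hypothetical free functions standing in for the methods in the diff.

```python
import io
import wave
from typing import Optional, Tuple


def resolve_sample_rate(init_sample_rate: Optional[int], pipeline_sample_rate: int) -> int:
    # `or` treats both None and 0 as "not set" and falls back to the pipeline rate.
    return init_sample_rate or pipeline_sample_rate


def new_wave(sample_rate: int) -> Tuple[io.BytesIO, wave.Wave_write]:
    # In-memory WAV writer: 16-bit samples, one channel, resolved frame rate.
    content = io.BytesIO()
    ww = wave.open(content, "wb")
    ww.setsampwidth(2)
    ww.setnchannels(1)
    ww.setframerate(sample_rate)
    return content, ww


# Nothing was passed to the constructor, so the pipeline's 16 kHz is used.
rate = resolve_sample_rate(None, 16000)
content, ww = new_wave(rate)
ww.writeframes(b"\x00\x00" * 160)  # 160 samples = 10 ms at 16 kHz
buffer_secs = ww.getnframes() / rate
ww.close()
```

Creating the writer only after the rate is resolved avoids the earlier situation where the WAV header could carry a constructor default that did not match the audio actually flowing through the pipeline.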


@@ -326,9 +326,9 @@ class AnthropicLLMService(LLMService):
class AnthropicLLMContext(OpenAILLMContext):
def __init__(
self,
messages: list[dict] | None = None,
tools: list[dict] | None = None,
tool_choice: dict | None = None,
messages: Optional[List[dict]] = None,
tools: Optional[List[dict]] = None,
tool_choice: Optional[dict] = None,
*,
system: Union[str, NotGiven] = NOT_GIVEN,
):


@@ -5,7 +5,7 @@
#
import asyncio
from typing import AsyncGenerator
from typing import AsyncGenerator, Optional
from loguru import logger
@@ -38,20 +38,17 @@ class AssemblyAISTTService(STTService):
self,
*,
api_key: str,
sample_rate: int = 16000,
sample_rate: Optional[int] = None,
encoding: AudioEncoding = AudioEncoding("pcm_s16le"),
language=Language.EN, # Only English is supported for Realtime
**kwargs,
):
super().__init__(**kwargs)
super().__init__(sample_rate=sample_rate, **kwargs)
aai.settings.api_key = api_key
self._transcriber: aai.RealtimeTranscriber | None = None
# Store reference to the main event loop for use in callback functions
self._loop = asyncio.get_event_loop()
self._transcriber: Optional[aai.RealtimeTranscriber] = None
self._settings = {
"sample_rate": sample_rate,
"encoding": encoding,
"language": language,
}
@@ -121,7 +118,7 @@ class AssemblyAISTTService(STTService):
# Schedule the coroutine to run in the main event loop
# This is necessary because this callback runs in a different thread
asyncio.run_coroutine_threadsafe(self.push_frame(frame), self._loop)
asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop())
def on_error(error: aai.RealtimeError):
"""Callback for handling errors from AssemblyAI.
@@ -131,14 +128,16 @@ class AssemblyAISTTService(STTService):
"""
logger.error(f"{self}: An error occurred: {error}")
# Schedule the coroutine to run in the main event loop
asyncio.run_coroutine_threadsafe(self.push_frame(ErrorFrame(str(error))), self._loop)
asyncio.run_coroutine_threadsafe(
self.push_frame(ErrorFrame(str(error))), self.get_event_loop()
)
def on_close():
"""Callback for when the connection to AssemblyAI is closed."""
logger.info(f"{self}: Disconnected from AssemblyAI")
self._transcriber = aai.RealtimeTranscriber(
sample_rate=self._settings["sample_rate"],
sample_rate=self.sample_rate,
encoding=self._settings["encoding"],
on_data=on_data,
on_error=on_error,


@@ -10,7 +10,7 @@ from typing import AsyncGenerator, Optional
from loguru import logger
from pydantic import BaseModel
from pipecat.audio.utils import resample_audio
from pipecat.audio.utils import create_default_resampler
from pipecat.frames.frames import (
ErrorFrame,
Frame,
@@ -32,7 +32,7 @@ except ModuleNotFoundError as e:
raise Exception(f"Missing module: {e}")
def language_to_aws_language(language: Language) -> str | None:
def language_to_aws_language(language: Language) -> Optional[str]:
language_map = {
# Arabic
Language.AR: "arb",
@@ -124,7 +124,7 @@ class PollyTTSService(TTSService):
aws_session_token: Optional[str] = None,
region: Optional[str] = None,
voice_id: str = "Joanna",
sample_rate: int = 24000,
sample_rate: Optional[int] = None,
params: InputParams = InputParams(),
**kwargs,
):
@@ -138,7 +138,6 @@ class PollyTTSService(TTSService):
region_name=region,
)
self._settings = {
"sample_rate": sample_rate,
"engine": params.engine,
"language": self.language_to_service_language(params.language)
if params.language
@@ -148,12 +147,14 @@ class PollyTTSService(TTSService):
"volume": params.volume,
}
self._resampler = create_default_resampler()
self.set_voice(voice_id)
def can_generate_metrics(self) -> bool:
return True
def language_to_service_language(self, language: Language) -> str | None:
def language_to_service_language(self, language: Language) -> Optional[str]:
return language_to_aws_language(language)
def _construct_ssml(self, text: str) -> str:
@@ -193,8 +194,7 @@ class PollyTTSService(TTSService):
response = self._polly_client.synthesize_speech(**args)
if "AudioStream" in response:
audio_data = response["AudioStream"].read()
resampled = resample_audio(audio_data, 16000, self._settings["sample_rate"])
return resampled
return audio_data
return None
logger.debug(f"Generating TTS: [{text}]")
@@ -225,6 +225,8 @@ class PollyTTSService(TTSService):
yield None
return
audio_data = await self._resampler.resample(audio_data, 16000, self.sample_rate)
await self.start_tts_usage_metrics(text)
yield TTSStartedFrame()
@@ -234,7 +236,7 @@ class PollyTTSService(TTSService):
chunk = audio_data[i : i + chunk_size]
if len(chunk) > 0:
await self.stop_ttfb_metrics()
frame = TTSAudioRawFrame(chunk, self._settings["sample_rate"], 1)
frame = TTSAudioRawFrame(chunk, self.sample_rate, 1)
yield frame
yield TTSStoppedFrame()


@@ -57,7 +57,7 @@ except ModuleNotFoundError as e:
raise Exception(f"Missing module: {e}")
def language_to_azure_language(language: Language) -> str | None:
def language_to_azure_language(language: Language) -> Optional[str]:
language_map = {
# Afrikaans
Language.AF: "af-ZA",
@@ -450,14 +450,13 @@ class AzureBaseTTSService(TTSService):
api_key: str,
region: str,
voice="en-US-SaraNeural",
sample_rate: int = 24000,
sample_rate: Optional[int] = None,
params: InputParams = InputParams(),
**kwargs,
):
super().__init__(sample_rate=sample_rate, **kwargs)
self._settings = {
"sample_rate": sample_rate,
"emphasis": params.emphasis,
"language": self.language_to_service_language(params.language)
if params.language
@@ -478,7 +477,7 @@ class AzureBaseTTSService(TTSService):
def can_generate_metrics(self) -> bool:
return True
def language_to_service_language(self, language: Language) -> str | None:
def language_to_service_language(self, language: Language) -> Optional[str]:
return language_to_azure_language(language)
def _construct_ssml(self, text: str) -> str:
@@ -530,25 +529,32 @@ class AzureBaseTTSService(TTSService):
class AzureTTSService(AzureBaseTTSService):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._speech_config = None
self._speech_synthesizer = None
self._audio_queue = asyncio.Queue()
speech_config = SpeechConfig(
async def start(self, frame: StartFrame):
await super().start(frame)
# Now self.sample_rate is properly initialized
self._speech_config = SpeechConfig(
subscription=self._api_key,
region=self._region,
speech_recognition_language=self._settings["language"],
)
speech_config.set_speech_synthesis_output_format(
sample_rate_to_output_format(self._settings["sample_rate"])
self._speech_config.set_speech_synthesis_output_format(
sample_rate_to_output_format(self.sample_rate)
)
speech_config.set_service_property(
self._speech_config.set_service_property(
"synthesizer.synthesis.connection.synthesisConnectionImpl",
"websocket",
ServicePropertyChannel.UriQueryParameter,
)
self._speech_synthesizer = SpeechSynthesizer(speech_config=speech_config, audio_config=None)
self._speech_synthesizer = SpeechSynthesizer(
speech_config=self._speech_config, audio_config=None
)
# Set up event handlers
self._audio_queue = asyncio.Queue()
self._speech_synthesizer.synthesizing.connect(self._handle_synthesizing)
self._speech_synthesizer.synthesis_completed.connect(self._handle_completed)
self._speech_synthesizer.synthesis_canceled.connect(self._handle_canceled)
@@ -591,7 +597,7 @@ class AzureTTSService(AzureBaseTTSService):
yield TTSAudioRawFrame(
audio=chunk,
sample_rate=self._settings["sample_rate"],
sample_rate=self.sample_rate,
num_channels=1,
)
@@ -605,17 +611,22 @@ class AzureTTSService(AzureBaseTTSService):
class AzureHttpTTSService(AzureBaseTTSService):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._speech_config = None
self._speech_synthesizer = None
speech_config = SpeechConfig(
async def start(self, frame: StartFrame):
await super().start(frame)
self._speech_config = SpeechConfig(
subscription=self._api_key,
region=self._region,
speech_recognition_language=self._settings["language"],
)
speech_config.set_speech_synthesis_output_format(
sample_rate_to_output_format(self._settings["sample_rate"])
self._speech_config.set_speech_synthesis_output_format(
sample_rate_to_output_format(self.sample_rate)
)
self._speech_synthesizer = SpeechSynthesizer(
speech_config=self._speech_config, audio_config=None
)
self._speech_synthesizer = SpeechSynthesizer(speech_config=speech_config, audio_config=None)
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: [{text}]")
@@ -633,7 +644,7 @@ class AzureHttpTTSService(AzureBaseTTSService):
# Azure always sends a 44-byte header. Strip it off.
yield TTSAudioRawFrame(
audio=result.audio_data[44:],
sample_rate=self._settings["sample_rate"],
sample_rate=self.sample_rate,
num_channels=1,
)
yield TTSStoppedFrame()
@@ -650,24 +661,14 @@ class AzureSTTService(STTService):
*,
api_key: str,
region: str,
language=Language.EN_US,
sample_rate=24000,
channels=1,
language: Language = Language.EN_US,
sample_rate: Optional[int] = None,
**kwargs,
):
super().__init__(**kwargs)
super().__init__(sample_rate=sample_rate, **kwargs)
speech_config = SpeechConfig(subscription=api_key, region=region)
speech_config.speech_recognition_language = language
stream_format = AudioStreamFormat(samples_per_second=sample_rate, channels=channels)
self._audio_stream = PushAudioInputStream(stream_format)
audio_config = AudioConfig(stream=self._audio_stream)
self._speech_recognizer = SpeechRecognizer(
speech_config=speech_config, audio_config=audio_config
)
self._speech_recognizer.recognized.connect(self._on_handle_recognized)
self._speech_config = SpeechConfig(subscription=api_key, region=region)
self._speech_config.speech_recognition_language = language
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
await self.start_processing_metrics()
@@ -677,6 +678,16 @@ class AzureSTTService(STTService):
async def start(self, frame: StartFrame):
await super().start(frame)
stream_format = AudioStreamFormat(samples_per_second=self.sample_rate, channels=1)
self._audio_stream = PushAudioInputStream(stream_format)
audio_config = AudioConfig(stream=self._audio_stream)
self._speech_recognizer = SpeechRecognizer(
speech_config=self._speech_config, audio_config=audio_config
)
self._speech_recognizer.recognized.connect(self._on_handle_recognized)
self._speech_recognizer.start_continuous_recognition_async()
async def stop(self, frame: EndFrame):
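
Note on the `result.audio_data[44:]` slice in `AzureHttpTTSService` above: the source comment says Azure always sends a 44-byte header. The sketch below is not part of this change and does not call the Azure SDK. It only shows, with Python's standard `wave` module, that a canonical PCM WAV header is 44 bytes, so slicing at that offset leaves the raw samples. The helper names `make_wav` and `strip_wav_header` are hypothetical.

```python
import io
import wave

WAV_HEADER_SIZE = 44  # canonical RIFF/WAVE header size for plain PCM


def make_wav(pcm: bytes, sample_rate: int, channels: int = 1) -> bytes:
    """Wrap raw 16-bit PCM in a WAV container using the standard library."""
    buf = io.BytesIO()
    with wave.open(buf, "wb") as wav:
        wav.setnchannels(channels)
        wav.setsampwidth(2)  # 2 bytes per sample = 16-bit audio
        wav.setframerate(sample_rate)
        wav.writeframes(pcm)
    return buf.getvalue()


def strip_wav_header(data: bytes) -> bytes:
    """Drop the header; only valid if the header is the canonical 44 bytes."""
    return data[WAV_HEADER_SIZE:]


pcm = b"\x00\x01" * 480
wav_bytes = make_wav(pcm, sample_rate=24000)
stripped = strip_wav_header(wav_bytes)
```

A fixed offset is only safe when the producer never adds extra chunks (for example `LIST`) before the `data` chunk. That is what the source comment asserts for Azure.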

@@ -9,7 +9,7 @@ import os
import uuid
import wave
from datetime import datetime
from typing import Dict, List, Tuple
from typing import Dict, List, Optional, Tuple
import aiohttp
from loguru import logger
@@ -69,7 +69,7 @@ class CanonicalMetricsService(AIService):
api_url: str = "https://voiceapp.canonical.chat/api/v1",
assistant_speaks_first: bool = True,
output_dir: str = "recordings",
context: OpenAILLMContext | None = None,
context: Optional[OpenAILLMContext] = None,
**kwargs,
):
super().__init__(**kwargs)
@@ -117,7 +117,6 @@ class CanonicalMetricsService(AIService):
try:
await self._multipart_upload(filename)
await aiofiles.os.remove(filename)
audio_buffer_processor.reset_audio_buffers()
except FileNotFoundError:
pass
except Exception as e:

@@ -43,7 +43,7 @@ except ModuleNotFoundError as e:
raise Exception(f"Missing module: {e}")
def language_to_cartesia_language(language: Language) -> str | None:
def language_to_cartesia_language(language: Language) -> Optional[str]:
BASE_LANGUAGES = {
Language.DE: "de",
Language.EN: "en",
@@ -89,7 +89,7 @@ class CartesiaTTSService(WordTTSService, WebsocketService):
cartesia_version: str = "2024-06-10",
url: str = "wss://api.cartesia.ai/tts/websocket",
model: str = "sonic",
sample_rate: int = 24000,
sample_rate: Optional[int] = None,
encoding: str = "pcm_s16le",
container: str = "raw",
params: InputParams = InputParams(),
@@ -121,7 +121,7 @@ class CartesiaTTSService(WordTTSService, WebsocketService):
"output_format": {
"container": container,
"encoding": encoding,
"sample_rate": sample_rate,
"sample_rate": 0,
},
"language": self.language_to_service_language(params.language)
if params.language
@@ -143,7 +143,7 @@ class CartesiaTTSService(WordTTSService, WebsocketService):
await super().set_model(model)
logger.info(f"Switching TTS model to: [{model}]")
def language_to_service_language(self, language: Language) -> str | None:
def language_to_service_language(self, language: Language) -> Optional[str]:
return language_to_cartesia_language(language)
def _build_msg(
@@ -174,6 +174,7 @@ class CartesiaTTSService(WordTTSService, WebsocketService):
async def start(self, frame: StartFrame):
await super().start(frame)
self._settings["output_format"]["sample_rate"] = self.sample_rate
await self._connect()
async def stop(self, frame: EndFrame):
@@ -262,7 +263,7 @@ class CartesiaTTSService(WordTTSService, WebsocketService):
self.start_word_timestamps()
frame = TTSAudioRawFrame(
audio=base64.b64decode(msg["data"]),
sample_rate=self._settings["output_format"]["sample_rate"],
sample_rate=self.sample_rate,
num_channels=1,
)
await self.push_frame(frame)
@@ -328,7 +329,7 @@ class CartesiaHttpTTSService(TTSService):
voice_id: str,
model: str = "sonic",
base_url: str = "https://api.cartesia.ai",
sample_rate: int = 24000,
sample_rate: Optional[int] = None,
encoding: str = "pcm_s16le",
container: str = "raw",
params: InputParams = InputParams(),
@@ -341,7 +342,7 @@ class CartesiaHttpTTSService(TTSService):
"output_format": {
"container": container,
"encoding": encoding,
"sample_rate": sample_rate,
"sample_rate": 0,
},
"language": self.language_to_service_language(params.language)
if params.language
@@ -357,9 +358,13 @@ class CartesiaHttpTTSService(TTSService):
def can_generate_metrics(self) -> bool:
return True
def language_to_service_language(self, language: Language) -> str | None:
def language_to_service_language(self, language: Language) -> Optional[str]:
return language_to_cartesia_language(language)
async def start(self, frame: StartFrame):
await super().start(frame)
self._settings["output_format"]["sample_rate"] = self.sample_rate
async def stop(self, frame: EndFrame):
await super().stop(frame)
await self._client.close()
@@ -394,9 +399,7 @@ class CartesiaHttpTTSService(TTSService):
)
frame = TTSAudioRawFrame(
audio=output["audio"],
sample_rate=self._settings["output_format"]["sample_rate"],
num_channels=1,
audio=output["audio"], sample_rate=self.sample_rate, num_channels=1
)
yield frame
except Exception as e:
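
The Cartesia hunks above replace the constructor default of 24000 with `Optional[int] = None`, store a placeholder `0` in `_settings`, and fill in the real value in `start()`. A minimal sketch of that pattern follows. It assumes the base class prefers an explicit constructor value and otherwise takes the rate from the start frame; the base class is not shown in this diff, so `SketchStartFrame`, `SketchTTSBase`, `SketchCartesiaLikeService`, and the field `audio_out_sample_rate` are all hypothetical stand-ins.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchStartFrame:
    """Hypothetical stand-in for StartFrame carrying the pipeline's output rate."""

    audio_out_sample_rate: int = 24000


class SketchTTSBase:
    """Hypothetical base: an explicit constructor value wins, else the frame's."""

    def __init__(self, sample_rate: Optional[int] = None):
        self._init_sample_rate = sample_rate
        self._sample_rate = 0  # unknown until start() runs

    @property
    def sample_rate(self) -> int:
        return self._sample_rate

    async def start(self, frame: SketchStartFrame):
        self._sample_rate = self._init_sample_rate or frame.audio_out_sample_rate


class SketchCartesiaLikeService(SketchTTSBase):
    def __init__(self, sample_rate: Optional[int] = None):
        super().__init__(sample_rate=sample_rate)
        # Placeholder, as in the diff: the real rate is not known yet.
        self._settings = {"output_format": {"sample_rate": 0}}

    async def start(self, frame: SketchStartFrame):
        await super().start(frame)
        # Resolve the placeholder once the pipeline rate is known.
        self._settings["output_format"]["sample_rate"] = self.sample_rate


async def resolve(service: SketchCartesiaLikeService, frame: SketchStartFrame) -> int:
    await service.start(frame)
    return service._settings["output_format"]["sample_rate"]


default_rate = asyncio.run(resolve(SketchCartesiaLikeService(), SketchStartFrame(16000)))
explicit_rate = asyncio.run(
    resolve(SketchCartesiaLikeService(sample_rate=44100), SketchStartFrame(16000))
)
```

Under these assumptions, a service built without `sample_rate` follows the pipeline, while one built with an explicit value keeps it. Anything that reads the setting before `start()` would see the placeholder `0`.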

@@ -5,7 +5,7 @@
#
import asyncio
from typing import AsyncGenerator
from typing import AsyncGenerator, Optional
from loguru import logger
@@ -53,14 +53,13 @@ class DeepgramTTSService(TTSService):
*,
api_key: str,
voice: str = "aura-helios-en",
sample_rate: int = 24000,
sample_rate: Optional[int] = None,
encoding: str = "linear16",
**kwargs,
):
super().__init__(sample_rate=sample_rate, **kwargs)
self._settings = {
"sample_rate": sample_rate,
"encoding": encoding,
}
self.set_voice(voice)
@@ -75,7 +74,7 @@ class DeepgramTTSService(TTSService):
options = SpeakOptions(
model=self._voice_id,
encoding=self._settings["encoding"],
sample_rate=self._settings["sample_rate"],
sample_rate=self.sample_rate,
container="none",
)
@@ -103,9 +102,7 @@ class DeepgramTTSService(TTSService):
chunk = audio_buffer.read(chunk_size)
if not chunk:
break
frame = TTSAudioRawFrame(
audio=chunk, sample_rate=self._settings["sample_rate"], num_channels=1
)
frame = TTSAudioRawFrame(audio=chunk, sample_rate=self.sample_rate, num_channels=1)
yield frame
yield TTSStoppedFrame()
@@ -121,15 +118,16 @@ class DeepgramSTTService(STTService):
*,
api_key: str,
url: str = "",
live_options: LiveOptions = None,
sample_rate: Optional[int] = None,
live_options: Optional[LiveOptions] = None,
**kwargs,
):
super().__init__(**kwargs)
super().__init__(sample_rate=sample_rate, **kwargs)
default_options = LiveOptions(
encoding="linear16",
language=Language.EN,
model="nova-2-general",
sample_rate=16000,
channels=1,
interim_results=True,
smart_format=True,
@@ -187,6 +185,7 @@ class DeepgramSTTService(STTService):
async def start(self, frame: StartFrame):
await super().start(frame)
self._settings["sample_rate"] = self.sample_rate
await self._connect()
async def stop(self, frame: EndFrame):

@@ -55,7 +55,7 @@ ELEVENLABS_MULTILINGUAL_MODELS = {
}
def language_to_elevenlabs_language(language: Language) -> str | None:
def language_to_elevenlabs_language(language: Language) -> Optional[str]:
BASE_LANGUAGES = {
Language.AR: "ar",
Language.BG: "bg",
@@ -104,17 +104,20 @@ def language_to_elevenlabs_language(language: Language) -> str | None:
return result
def sample_rate_from_output_format(output_format: str) -> int:
match output_format:
case "pcm_16000":
return 16000
case "pcm_22050":
return 22050
case "pcm_24000":
return 24000
case "pcm_44100":
return 44100
return 16000
def output_format_from_sample_rate(sample_rate: int) -> str:
match sample_rate:
case 16000:
return "pcm_16000"
case 22050:
return "pcm_22050"
case 24000:
return "pcm_24000"
case 44100:
return "pcm_44100"
logger.warning(
f"ElevenLabsTTSService: No output format available for {sample_rate} sample rate"
)
return "pcm_16000"
def calculate_word_times(
@@ -165,7 +168,7 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
voice_id: str,
model: str = "eleven_flash_v2_5",
url: str = "wss://api.elevenlabs.io",
output_format: ElevenLabsOutputFormat = "pcm_24000",
sample_rate: Optional[int] = None,
params: InputParams = InputParams(),
**kwargs,
):
@@ -189,7 +192,7 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
push_text_frames=False,
push_stop_frames=True,
stop_frame_timeout_s=2.0,
sample_rate=sample_rate_from_output_format(output_format),
sample_rate=sample_rate,
**kwargs,
)
WebsocketService.__init__(self)
@@ -197,11 +200,9 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
self._api_key = api_key
self._url = url
self._settings = {
"sample_rate": sample_rate_from_output_format(output_format),
"language": self.language_to_service_language(params.language)
if params.language
else None,
"output_format": output_format,
"optimize_streaming_latency": params.optimize_streaming_latency,
"stability": params.stability,
"similarity_boost": params.similarity_boost,
@@ -211,6 +212,7 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
}
self.set_model_name(model)
self.set_voice(voice_id)
self._output_format = "" # initialized in start()
self._voice_settings = self._set_voice_settings()
# Indicates if we have sent TTSStartedFrame. It will reset to False when
@@ -221,7 +223,7 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
def can_generate_metrics(self) -> bool:
return True
def language_to_service_language(self, language: Language) -> str | None:
def language_to_service_language(self, language: Language) -> Optional[str]:
return language_to_elevenlabs_language(language)
def _set_voice_settings(self):
@@ -254,7 +256,7 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
await self._disconnect()
await self._connect()
async def _update_settings(self, settings: Dict[str, Any]):
async def _update_settings(self, settings: Mapping[str, Any]):
prev_voice = self._voice_id
await super()._update_settings(settings)
if not prev_voice == self._voice_id:
@@ -264,6 +266,7 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
async def start(self, frame: StartFrame):
await super().start(frame)
self._output_format = output_format_from_sample_rate(self.sample_rate)
await self._connect()
async def stop(self, frame: EndFrame):
@@ -322,7 +325,7 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
voice_id = self._voice_id
model = self.model_name
output_format = self._settings["output_format"]
output_format = self._output_format
url = f"{self._url}/v1/text-to-speech/{voice_id}/stream-input?model_id={model}&output_format={output_format}&auto_mode={self._settings['auto_mode']}"
if self._settings["optimize_streaming_latency"]:
@@ -375,7 +378,7 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
self.start_word_timestamps()
audio = base64.b64decode(msg["audio"])
frame = TTSAudioRawFrame(audio, self._settings["sample_rate"], 1)
frame = TTSAudioRawFrame(audio, self.sample_rate, 1)
await self.push_frame(frame)
if msg.get("alignment"):
word_times = calculate_word_times(msg["alignment"], self._cumulative_time)
@@ -428,7 +431,7 @@ class ElevenLabsHttpTTSService(TTSService):
aiohttp_session: aiohttp ClientSession
model: Model ID (default: "eleven_flash_v2_5" for low latency)
base_url: API base URL
output_format: Audio output format (PCM)
sample_rate: Output sample rate
params: Additional parameters for voice configuration
"""
@@ -448,24 +451,21 @@ class ElevenLabsHttpTTSService(TTSService):
aiohttp_session: aiohttp.ClientSession,
model: str = "eleven_flash_v2_5",
base_url: str = "https://api.elevenlabs.io",
output_format: ElevenLabsOutputFormat = "pcm_24000",
sample_rate: Optional[int] = None,
params: InputParams = InputParams(),
**kwargs,
):
super().__init__(sample_rate=sample_rate_from_output_format(output_format), **kwargs)
super().__init__(sample_rate=sample_rate, **kwargs)
self._api_key = api_key
self._base_url = base_url
self._output_format = output_format
self._params = params
self._session = aiohttp_session
self._settings = {
"sample_rate": sample_rate_from_output_format(output_format),
"language": self.language_to_service_language(params.language)
if params.language
else None,
"output_format": output_format,
"optimize_streaming_latency": params.optimize_streaming_latency,
"stability": params.stability,
"similarity_boost": params.similarity_boost,
@@ -474,6 +474,7 @@ class ElevenLabsHttpTTSService(TTSService):
}
self.set_model_name(model)
self.set_voice(voice_id)
self._output_format = "" # initialized in start()
self._voice_settings = self._set_voice_settings()
def can_generate_metrics(self) -> bool:
@@ -508,6 +509,10 @@ class ElevenLabsHttpTTSService(TTSService):
return voice_settings or None
async def start(self, frame: StartFrame):
await super().start(frame)
self._output_format = output_format_from_sample_rate(self.sample_rate)
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
"""Generate speech from text using ElevenLabs streaming API.
@@ -570,7 +575,7 @@ class ElevenLabsHttpTTSService(TTSService):
async for chunk in response.content:
if chunk:
await self.stop_ttfb_metrics()
yield TTSAudioRawFrame(chunk, self._settings["sample_rate"], 1)
yield TTSAudioRawFrame(chunk, self.sample_rate, 1)
yield TTSStoppedFrame()
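
The ElevenLabs hunks above invert the old helper: `output_format_from_sample_rate` now derives the `pcm_*` format string from the resolved sample rate, warning and falling back to `pcm_16000` for unsupported rates. A table-driven sketch of the same mapping is shown below as an illustration, not as the code in this change. It uses the standard `logging` module in place of `loguru`, and the constant names are hypothetical.

```python
import logging

logger = logging.getLogger("elevenlabs_sketch")

# The same four PCM rates handled by the match statement in the diff.
PCM_OUTPUT_FORMATS = {
    16000: "pcm_16000",
    22050: "pcm_22050",
    24000: "pcm_24000",
    44100: "pcm_44100",
}
DEFAULT_OUTPUT_FORMAT = "pcm_16000"


def output_format_from_sample_rate(sample_rate: int) -> str:
    """Return the PCM output format for a sample rate, with a logged fallback."""
    fmt = PCM_OUTPUT_FORMATS.get(sample_rate)
    if fmt is None:
        logger.warning("No output format available for %s sample rate", sample_rate)
        return DEFAULT_OUTPUT_FORMAT
    return fmt


supported = output_format_from_sample_rate(24000)
fallback = output_format_from_sample_rate(8000)
```

On the fallback path the requested format (`pcm_16000`) and `self.sample_rate` can disagree, since the diff still tags outgoing frames with `self.sample_rate`. The warning is the only signal of that mismatch.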

@@ -42,7 +42,7 @@ class FalImageGenService(ImageGenService):
params: InputParams,
aiohttp_session: aiohttp.ClientSession,
model: str = "fal-ai/fast-sdxl",
key: str | None = None,
key: Optional[str] = None,
**kwargs,
):
super().__init__(**kwargs)

@@ -56,7 +56,7 @@ class FishAudioTTSService(TTSService, WebsocketService):
api_key: str,
model: str, # This is the reference_id
output_format: FishAudioOutputFormat = "pcm",
sample_rate: int = 24000,
sample_rate: Optional[int] = None,
params: InputParams = InputParams(),
**kwargs,
):
@@ -70,7 +70,7 @@ class FishAudioTTSService(TTSService, WebsocketService):
self._started = False
self._settings = {
"sample_rate": sample_rate,
"sample_rate": 0,
"latency": params.latency,
"format": output_format,
"prosody": {
@@ -92,6 +92,7 @@ class FishAudioTTSService(TTSService, WebsocketService):
async def start(self, frame: StartFrame):
await super().start(frame)
self._settings["sample_rate"] = self.sample_rate
await self._connect()
async def stop(self, frame: EndFrame):
@@ -157,9 +158,7 @@ class FishAudioTTSService(TTSService, WebsocketService):
audio_data = msg.get("audio")
# Only process larger chunks to remove msgpack overhead
if audio_data and len(audio_data) > 1024:
frame = TTSAudioRawFrame(
audio_data, self._settings["sample_rate"], 1
)
frame = TTSAudioRawFrame(audio_data, self.sample_rate, 1)
await self.push_frame(frame)
await self.stop_ttfb_metrics()
continue

@@ -48,7 +48,7 @@ class AudioInputMessage(BaseModel):
realtimeInput: RealtimeInput
@classmethod
def from_raw_audio(cls, raw_audio: bytes, sample_rate=16000) -> "AudioInputMessage":
def from_raw_audio(cls, raw_audio: bytes, sample_rate: int) -> "AudioInputMessage":
data = base64.b64encode(raw_audio).decode("utf-8")
return cls(
realtimeInput=RealtimeInput(

@@ -203,6 +203,8 @@ class GeminiMultimodalLiveLLMService(LLMService):
self._bot_audio_buffer = bytearray()
self._bot_text_buffer = ""
self._sample_rate = 24000
self._settings = {
"frequency_penalty": params.frequency_penalty,
"max_tokens": params.max_tokens,
@@ -521,7 +523,7 @@ class GeminiMultimodalLiveLLMService(LLMService):
if self._audio_input_paused:
return
# Send all audio to Gemini
evt = events.AudioInputMessage.from_raw_audio(frame.audio)
evt = events.AudioInputMessage.from_raw_audio(frame.audio, frame.sample_rate)
await self.send_client_event(evt)
# Manage a buffer of audio to use for transcription
audio = frame.audio
@@ -650,7 +652,7 @@ class GeminiMultimodalLiveLLMService(LLMService):
inline_data = part.inlineData
if not inline_data:
return
if inline_data.mimeType != "audio/pcm;rate=24000":
if inline_data.mimeType != f"audio/pcm;rate={self._sample_rate}":
logger.warning(f"Unrecognized server_content format {inline_data.mimeType}")
return
@@ -665,7 +667,7 @@ class GeminiMultimodalLiveLLMService(LLMService):
self._bot_audio_buffer.extend(audio)
frame = TTSAudioRawFrame(
audio=audio,
sample_rate=24000,
sample_rate=self._sample_rate,
num_channels=1,
)
await self.push_frame(frame)

Some files were not shown because too many files have changed in this diff.