Compare commits

...

76 Commits

Author SHA1 Message Date
Kwindla Hultman Kramer
81c2c5adfa working on a fastapi http transport 2024-09-17 09:52:19 -07:00
Aleix Conchillo Flaqué
d9d6571c73 Merge pull request #465 from kunal-cai/ks--fix-ws
[Cartesia] Fix streaming truncation bug with Twilio Fast API WS
2024-09-16 17:17:13 -07:00
Kunal Shah
540cad4844 Undo sorting 2024-09-16 16:07:19 -07:00
Kunal Shah
0a26b650c0 Undo sorting 2024-09-16 16:06:25 -07:00
Kunal Shah
adaac003e5 [Cartesia] Fix streaming truncation bug with Twilio Fast API WS 2024-09-16 15:59:06 -07:00
Aleix Conchillo Flaqué
3d4f125071 Merge pull request #454 from pipecat-ai/aleix/initial-pipeline-clock-support
initial pipeline clock support
2024-09-13 13:51:04 -07:00
Aleix Conchillo Flaqué
bce87f8717 update CHANGELOG.md 2024-09-13 13:50:03 -07:00
Aleix Conchillo Flaqué
1fe940bd6b servceis(cartesia,elevenlabs): use word start times instead 2024-09-13 13:10:44 -07:00
Aleix Conchillo Flaqué
cb36a71381 fix some linting 2024-09-13 09:56:12 -07:00
Aleix Conchillo Flaqué
5acc4928fe examples: add 07d-interruptible-elevenlabs.py 2024-09-13 09:43:18 -07:00
Aleix Conchillo Flaqué
434493b8aa services(elevenlabs): implement word-by-word support through websockets 2024-09-13 09:31:35 -07:00
Aleix Conchillo Flaqué
f08b25dbb2 examples: assistant aggregator should always goes after transport 2024-09-12 00:37:34 -07:00
Aleix Conchillo Flaqué
3665734972 transports(output): initial sink clock synchronization 2024-09-12 00:37:34 -07:00
Aleix Conchillo Flaqué
a98d78cdea services(lmnt): change to subclass of AsyncTTSService 2024-09-12 00:37:34 -07:00
Aleix Conchillo Flaqué
80f6d74e80 services(cartesia): change to subclass of AsyncWordTTSService 2024-09-12 00:37:34 -07:00
Aleix Conchillo Flaqué
02d926e9bd services: create AsyncTTSService and AsyncWordTTSService 2024-09-12 00:31:48 -07:00
Aleix Conchillo Flaqué
7749692f72 processors: get pipeline clock from StartFrame 2024-09-12 00:31:48 -07:00
Aleix Conchillo Flaqué
7807cbeb39 pipeline(task): add a clock to the pipeline task 2024-09-12 00:31:48 -07:00
Aleix Conchillo Flaqué
72f231b327 frames: add a presentation timestamp (pts) to each frame 2024-09-12 00:31:48 -07:00
Aleix Conchillo Flaqué
3cbe97d346 clocks: added new BaseClock and SystemClock 2024-09-12 00:31:48 -07:00
Kwindla Hultman Kramer
b880e1a60e Merge pull request #448 from pipecat-ai/khk/aggregation-leading-space
fix for leading space in context aggregator strings
2024-09-10 09:57:35 -07:00
Aleix Conchillo Flaqué
886046e696 Merge pull request #445 from dleybz/patch-1
Update requirements.txt
2024-09-09 17:54:33 -07:00
Aleix Conchillo Flaqué
9106a5f8ae Merge pull request #449 from pipecat-ai/aleix/audio-out-bitrate
transports(daily): allow setting audio output bitrate (default 96kpbs)
2024-09-09 08:39:06 -07:00
Aleix Conchillo Flaqué
98286336bf transports(daily): allow setting audio output bitrate (default 96kpbs)
Fixes #388
2024-09-08 19:39:17 -07:00
Kwindla Hultman Kramer
081b001c8b fix for leading space in context aggregator strings 2024-09-07 16:42:52 -07:00
Danny D. Leybzon
c92531a02f Update requirements.txt
request.form() throws an error if you don't have python-multipart installed
2024-09-06 20:22:18 +02:00
Aleix Conchillo Flaqué
748a7af602 update CHANGELOG.md 2024-09-05 19:05:29 -07:00
Aleix Conchillo Flaqué
f4a0de6327 Merge pull request #444 from pipecat-ai/aleix/elevenlabs-streaming
services(elevenlabs): add elevenlabs package and use streaming
2024-09-05 11:24:12 -07:00
Aleix Conchillo Flaqué
e405d7af9f services(elevenlabs): add elevenlabs package and use streaming 2024-09-05 11:20:01 -07:00
Aashraya
51cd7fd285 twiliohandle interruption (#422)
* add interuption handler in twilio serializer

* fix autopep8

* revert ruff autoformatting

* address pr comments

* change interruption frame to user started frame in serializer

* remove overrrident handle interrupt

* remove unused import

* change userstarted to interuption frame
2024-09-02 11:06:38 -07:00
Aleix Conchillo Flaqué
aba5f89174 Merge pull request #437 from soof-golan/soof-obj-id-generation
Generate ids with itertools.count
2024-09-02 10:53:48 -07:00
Soof Golan
5c0f5a1613 Generate ids with itertools.count
Avoids the critical section with threading.Lock in favor of itertools.count.

`count` objects are threadsafe, and their critical section is implemented in C and provide better performance that Python level locking.
2024-09-02 15:39:58 +02:00
Aleix Conchillo Flaqué
7c342f7ba2 Merge pull request #433 from pipecat-ai/aleix/process-all-startframes
StartFrame should be the first frame every processor receives
2024-08-30 14:17:38 -07:00
Aleix Conchillo Flaqué
37e2388758 StartFrame should be the first frame every processor receives
Fixes #427
2024-08-29 22:43:44 -07:00
Aleix Conchillo Flaqué
05f0492a8d Merge pull request #421 from pipecat-ai/aleix/improve-multi-lingual-support
improve multi lingual support
2024-08-29 13:19:40 -07:00
Aleix Conchillo Flaqué
c0ac5c6ae8 services(lmnt): fix example and update README and CHANGELOG 2024-08-29 11:11:24 -07:00
Aleix Conchillo Flaqué
be923687fb processors(rtvi): user decices if bot interrupts on update config 2024-08-29 11:00:03 -07:00
Aleix Conchillo Flaqué
5f32fb125d updated CHANGELOG.md 2024-08-29 11:00:03 -07:00
Aleix Conchillo Flaqué
ae6fbb3146 services: just set model, voice, language independently 2024-08-29 11:00:03 -07:00
Aleix Conchillo Flaqué
864768635a services: add voice and language to set_model() 2024-08-29 11:00:03 -07:00
Aleix Conchillo Flaqué
d7c9679977 services: allow TTSModelUpdateFrame to also update language and voice 2024-08-29 11:00:03 -07:00
Aleix Conchillo Flaqué
fedfc366f6 services(deepgram): fix strenum values 2024-08-29 11:00:03 -07:00
Aleix Conchillo Flaqué
b3b39626e1 services: allow switching STT language and mdoel at the same time 2024-08-29 11:00:03 -07:00
Aleix Conchillo Flaqué
4e0ece17b6 services: added support for setting STT model and language 2024-08-29 11:00:03 -07:00
Aleix Conchillo Flaqué
fd3fdacdee transcriptions: added more languages 2024-08-29 11:00:03 -07:00
Aleix Conchillo Flaqué
a253606d50 services(daily): on_joined now returns all data not only participant 2024-08-29 11:00:03 -07:00
Aleix Conchillo Flaqué
568d9dc0a3 services(whisper): inherit from SegmentedSTTService 2024-08-29 11:00:03 -07:00
Aleix Conchillo Flaqué
6629b853c5 services(deepgram): inherit from STTService instead of AsyncAIService 2024-08-29 11:00:03 -07:00
Aleix Conchillo Flaqué
3931cb3235 services(cartesia): allow setting language and language voices 2024-08-29 11:00:01 -07:00
Aleix Conchillo Flaqué
38cd86ad52 services: added language to transcription frames 2024-08-29 10:59:02 -07:00
Aleix Conchillo Flaqué
c0cdabf61d frames: adde TTSLanguageUpdateFrame and TTSLanguageVoicesUpdateFrame 2024-08-29 10:59:02 -07:00
Aleix Conchillo Flaqué
51270a96c5 frames: add language to transcription frames 2024-08-29 10:59:02 -07:00
Kwindla Hultman Kramer
84d72c0d5c Merge pull request #425 from pipecat-ai/khk/rtvi-together-function-calling
fixup type mismatches between rtvi data structures and together.py
2024-08-28 13:11:52 -07:00
Aleix Conchillo Flaqué
79aca8169a Merge pull request #391 from sharvil/pr/add-lmnt
LMNT TTS
2024-08-27 21:40:46 -07:00
Kwindla Hultman Kramer
b9d362bd62 fixup type mismatches between rtvi data structures and together.py 2024-08-27 17:39:21 -07:00
Sharvil Nanavati
87c4a1bee1 Move stop frame task creation into TTSService.start 2024-08-27 04:45:21 +00:00
Sharvil Nanavati
c979762b70 Handle cancellation, stopping, and restarting 2024-08-27 01:24:00 +00:00
Sharvil Nanavati
1d92fc3199 Merge branch 'main' into pr/add-lmnt 2024-08-24 10:07:52 -07:00
Sharvil Nanavati
8ac7fb1a67 Use a single long-lived Task to push TTSStoppedFrame 2024-08-24 16:18:07 +00:00
Sharvil Nanavati
60c3d33def Default LMNT to 24kHz, add example 2024-08-24 15:40:29 +00:00
Sharvil Nanavati
8a39d3f4eb services: add a generic mechanism to produce TTSStoppedFrames 2024-08-24 15:40:12 +00:00
Aleix Conchillo Flaqué
e038767b6f Merge pull request #413 from pipecat-ai/aleix/pipecat-0.0.41
prepare pipecat 0.0.41
2024-08-22 17:01:43 -07:00
Aleix Conchillo Flaqué
0c46b3e481 prepare pipecat 0.0.41 2024-08-22 11:50:20 -07:00
Aleix Conchillo Flaqué
d42f072ff5 examples: fix studypal errors and update requirements 2024-08-22 11:50:05 -07:00
Aleix Conchillo Flaqué
9b6f29c24a Merge pull request #414 from pipecat-ai/aleix/add-livekit-dependency
added livekit dependency
2024-08-22 10:55:43 -07:00
Aleix Conchillo Flaqué
873d5dc23f added livekit dependency 2024-08-22 10:54:18 -07:00
Aleix Conchillo Flaqué
6d141fd47f Merge pull request #396 from nulyang/feat/livekit-serializers
Add livekit audio serializers
2024-08-22 10:44:24 -07:00
Aleix Conchillo Flaqué
c6f6cb2947 Merge pull request #412 from pipecat-ai/aleix/fastapi-variable-clash
transports(fastapi): fix variable name clash
2024-08-22 09:50:23 -07:00
Aleix Conchillo Flaqué
0eb189ce7f transports(fastapi): fix variable name clash 2024-08-22 08:50:03 -07:00
Sharvil Nanavati
f4fd7b7028 LMNT TTS 2024-08-22 00:47:41 +00:00
Aleix Conchillo Flaqué
21de8e0a35 transport(out): log bot started/stopped speaking 2024-08-21 17:23:44 -07:00
Aleix Conchillo Flaqué
6f55d494bd frames: use VADParams type in VADParamsUpdateFrame 2024-08-21 17:23:12 -07:00
Aleix Conchillo Flaqué
d216edc567 Merge pull request #409 from aashsach/anthropic-empty-tool-argument
handle empty parameters for anthropic function calling
2024-08-21 16:14:51 -07:00
Aashraya
ec6063ecc4 system is not a list, it is handled and assisgned as string 2024-08-21 16:31:50 +05:30
Aashraya
40fe4ce6fb handle empty parameters for anthropic function calling 2024-08-21 15:49:36 +05:30
nulyang
9bda09b1a8 serializers(livekit): Add audio serializers 2024-08-18 23:40:32 +08:00
49 changed files with 1655 additions and 476 deletions

View File

@@ -5,6 +5,86 @@ All notable changes to **pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
### Added
- A clock can now be specified to `PipelineTask` (defaults to
`SystemClock`). This clock will be passed to each frame processor via the
`StartFrame`.
- Added pipeline clocks. A pipeline clock is used by the output transport to
know when a frame needs to be presented. For that, all frames now have an
optional `pts` field (prensentation timestamp). There's currently just one
clock implementation `SystemClock` and the `pts` field is currently only used
for `TextFrame`s (audio and image frames will be next).
- `DailyTransport` now supports setting the audio bitrate to improve audio
quality through the `DailyParams.audio_out_bitrate` parameter. The new
default is 96kbps.
- `DailyTransport` now uses the number of audio output channels (1 or 2) to set
mono or stereo audio when needed.
- Interruptions support has been added to `TwilioFrameSerializer` when using
`FastAPIWebsocketTransport`.
- Added new `LmntTTSService` text-to-speech service.
(see https://www.lmnt.com/)
- Added `TTSModelUpdateFrame`, `TTSLanguageUpdateFrame`, `STTModelUpdateFrame`,
and `STTLanguageUpdateFrame` frames to allow you to switch models, language
and voices in TTS and STT services.
- Added new `transcriptions.Language` enum.
### Changed
- `CartesiaTTSService` and `ElevenLabsTTSService` now add presentation
timestamps to their text output. This allows the output transport to push the
text frames downstream at almost the same time the words are spoken. We say
"almost" because currently the audio frames don't have presentation timestamp
but they should be played at roughly the same time.
- `DailyTransport.on_joined` event now returns the full session data instead of
just the participant.
- `CartesiaTTSService` is now a subclass of `TTSService`.
- `DeepgramSTTService` is now a subclass of `STTService`.
- `WhisperSTTService` is now a subclass of `SegmentedSTTService`. A
`SegmentedSTTService` is a `STTService` where the provided audio is given in a
big chunk (i.e. from when the user starts speaking until the user stops
speaking) instead of a continous stream.
### Fixed
- `StartFrame` should be the first frame every processor receives to avoid
situations where things are not initialized (because initialization happens on
`StartFrame`) and other frames come in resulting in undesired behavior.
### Performance
- `obj_id()` and `obj_count()` now use `itertools.count` avoiding the need of
`threading.Lock`.
## [0.0.41] - 2024-08-22
### Added
- Added `LivekitFrameSerializer` audio frame serializer.
### Fixed
- Fix `FastAPIWebsocketOutputTransport` variable name clash with subclass.
- Fix an `AnthropicLLMService` issue with empty arguments in function calling.
### Other
- Fixed `studypal` example errors.
## [0.0.40] - 2024-08-20
### Added

View File

@@ -4,8 +4,7 @@
# Pipecat
[![PyPI](https://img.shields.io/pypi/v/pipecat-ai)](https://pypi.org/project/pipecat-ai) [![Discord](https://img.shields.io/discord/1239284677165056021
)](https://discord.gg/pipecat)
[![PyPI](https://img.shields.io/pypi/v/pipecat-ai)](https://pypi.org/project/pipecat-ai) [![Discord](https://img.shields.io/discord/1239284677165056021)](https://discord.gg/pipecat) <a href="https://app.commanddash.io/agent/github_pipecat-ai_pipecat"><img src="https://img.shields.io/badge/AI-Code%20Agent-EB9FDA"></a>
`pipecat` is a framework for building voice (and multimodal) conversational agents. Things like personal coaches, meeting assistants, [story-telling toys for kids](https://storytelling-chatbot.fly.dev/), customer support bots, [intake flows](https://www.youtube.com/watch?v=lDevgsp9vn0), and snarky social companions.
@@ -39,7 +38,7 @@ pip install "pipecat-ai[option,...]"
Your project may or may not need these, so they're made available as optional requirements. Here is a list:
- **AI services**: `anthropic`, `azure`, `deepgram`, `gladia`, `google`, `fal`, `moondream`, `openai`, `openpipe`, `playht`, `silero`, `whisper`, `xtts`
- **AI services**: `anthropic`, `azure`, `deepgram`, `gladia`, `google`, `fal`, `lmnt`, `moondream`, `openai`, `openpipe`, `playht`, `silero`, `whisper`, `xtts`
- **Transports**: `local`, `websocket`, `daily`
## Code examples

View File

@@ -30,6 +30,10 @@ FIREWORKS_API_KEY=...
# Gladia
GLADIA_API_KEY=...
# LMNT
LMNT_API_KEY=...
LMNT_VOICE_ID=...
# PlayHT
PLAY_HT_USER_ID=...
PLAY_HT_API_KEY=...

View File

@@ -1,5 +1,4 @@
import asyncio
import aiohttp
import os
import sys
import argparse
@@ -27,71 +26,69 @@ daily_api_url = os.getenv("DAILY_API_URL", "https://api.daily.co/v1")
async def main(room_url: str, token: str):
async with aiohttp.ClientSession() as session:
transport = DailyTransport(
room_url,
token,
"Chatbot",
DailyParams(
api_url=daily_api_url,
api_key=daily_api_key,
audio_in_enabled=True,
audio_out_enabled=True,
camera_out_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
transcription_enabled=True,
)
transport = DailyTransport(
room_url,
token,
"Chatbot",
DailyParams(
api_url=daily_api_url,
api_key=daily_api_key,
audio_in_enabled=True,
audio_out_enabled=True,
camera_out_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
transcription_enabled=True,
)
)
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are Chatbot, a friendly, helpful robot. Your output will be converted to audio so don't include special characters other than '!' or '?' in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying hello.",
},
]
messages = [
{
"role": "system",
"content": "You are Chatbot, a friendly, helpful robot. Your output will be converted to audio so don't include special characters other than '!' or '?' in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying hello.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(),
tma_in,
llm,
tts,
transport.output(),
tma_out,
])
pipeline = Pipeline([
transport.input(),
tma_in,
llm,
tts,
transport.output(),
tma_out,
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
@transport.event_handler("on_call_state_updated")
async def on_call_state_updated(transport, state):
if state == "left":
await task.queue_frame(EndFrame())
@transport.event_handler("on_call_state_updated")
async def on_call_state_updated(transport, state):
if state == "left":
await task.queue_frame(EndFrame())
runner = PipelineRunner()
runner = PipelineRunner()
await runner.run(task)
await runner.run(task)
if __name__ == "__main__":

View File

@@ -1,5 +1,4 @@
import asyncio
import aiohttp
import os
import sys
import argparse
@@ -29,75 +28,74 @@ daily_api_url = os.getenv("DAILY_API_URL", "https://api.daily.co/v1")
async def main(room_url: str, token: str, callId: str, callDomain: str):
async with aiohttp.ClientSession() as session:
# diallin_settings are only needed if Daily's SIP URI is used
# If you are handling this via Twilio, Telnyx, set this to None
# and handle call-forwarding when on_dialin_ready fires.
diallin_settings = DailyDialinSettings(
call_id=callId,
call_domain=callDomain
# diallin_settings are only needed if Daily's SIP URI is used
# If you are handling this via Twilio, Telnyx, set this to None
# and handle call-forwarding when on_dialin_ready fires.
diallin_settings = DailyDialinSettings(
call_id=callId,
call_domain=callDomain
)
transport = DailyTransport(
room_url,
token,
"Chatbot",
DailyParams(
api_url=daily_api_url,
api_key=daily_api_key,
dialin_settings=diallin_settings,
audio_in_enabled=True,
audio_out_enabled=True,
camera_out_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
transcription_enabled=True,
)
)
transport = DailyTransport(
room_url,
token,
"Chatbot",
DailyParams(
api_url=daily_api_url,
api_key=daily_api_key,
dialin_settings=diallin_settings,
audio_in_enabled=True,
audio_out_enabled=True,
camera_out_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
transcription_enabled=True,
)
)
tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o"
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying 'Oh, hello! Who dares dial me at this hour?!'.",
},
]
messages = [
{
"role": "system",
"content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying 'Oh, hello! Who dares dial me at this hour?!'.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(),
tma_in,
llm,
tts,
transport.output(),
tma_out,
])
pipeline = Pipeline([
transport.input(),
tma_in,
llm,
tts,
transport.output(),
tma_out,
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
runner = PipelineRunner()
runner = PipelineRunner()
await runner.run(task)
await runner.run(task)
if __name__ == "__main__":

View File

@@ -1,5 +1,4 @@
import asyncio
import aiohttp
import os
import sys
import argparse
@@ -36,82 +35,81 @@ daily_api_key = os.getenv("DAILY_API_KEY", "")
async def main(room_url: str, token: str, callId: str, sipUri: str):
async with aiohttp.ClientSession() as session:
# diallin_settings are only needed if Daily's SIP URI is used
# If you are handling this via Twilio, Telnyx, set this to None
# and handle call-forwarding when on_dialin_ready fires.
transport = DailyTransport(
room_url,
token,
"Chatbot",
DailyParams(
api_key=daily_api_key,
dialin_settings=None, # Not required for Twilio
audio_in_enabled=True,
audio_out_enabled=True,
camera_out_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
transcription_enabled=True,
# dialin_settings are only needed if Daily's SIP URI is used
# If you are handling this via Twilio, Telnyx, set this to None
# and handle call-forwarding when on_dialin_ready fires.
transport = DailyTransport(
room_url,
token,
"Chatbot",
DailyParams(
api_key=daily_api_key,
dialin_settings=None, # Not required for Twilio
audio_in_enabled=True,
audio_out_enabled=True,
camera_out_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
transcription_enabled=True,
)
)
tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o"
)
messages = [
{
"role": "system",
"content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying 'Hello! Who dares dial me at this hour?!'.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(),
tma_in,
llm,
tts,
transport.output(),
tma_out,
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
@transport.event_handler("on_dialin_ready")
async def on_dialin_ready(transport, cdata):
# For Twilio, Telnyx, etc. You need to update the state of the call
# and forward it to the sip_uri..
print(f"Forwarding call: {callId} {sipUri}")
try:
# The TwiML is updated using Twilio's client library
call = twilioclient.calls(callId).update(
twiml=f'<Response><Dial><Sip>{sipUri}</Sip></Dial></Response>'
)
)
except Exception as e:
raise Exception(f"Failed to forward call: {str(e)}")
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying 'Hello! Who dares dial me at this hour?!'.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(),
tma_in,
llm,
tts,
transport.output(),
tma_out,
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
@transport.event_handler("on_dialin_ready")
async def on_dialin_ready(transport, cdata):
# For Twilio, Telnyx, etc. You need to update the state of the call
# and forward it to the sip_uri..
print(f"Forwarding call: {callId} {sipUri}")
try:
# The TwiML is updated using Twilio's client library
call = twilioclient.calls(callId).update(
twiml=f'<Response><Dial><Sip>{sipUri}</Sip></Dial></Response>'
)
except Exception as e:
raise Exception(f"Failed to forward call: {str(e)}")
runner = PipelineRunner()
await runner.run(task)
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":

View File

@@ -3,3 +3,4 @@ fastapi
uvicorn
python-dotenv
twilio
python-multipart

View File

@@ -89,7 +89,6 @@ async def main():
)
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)

View File

@@ -85,7 +85,6 @@ async def main():
model="gpt-4o")
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"))

View File

@@ -79,7 +79,6 @@ async def main():
)
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)

View File

@@ -18,7 +18,6 @@ from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.processors.frameworks.langchain import LangchainProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer

View File

@@ -0,0 +1,99 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import os
import sys
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
)
tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -15,12 +15,11 @@ from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.lmnt import LmntTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
@@ -41,18 +40,17 @@ async def main():
token,
"Respond bot",
DailyParams(
audio_out_sample_rate=44100,
audio_out_enabled=True,
audio_out_sample_rate=24000,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="a0e99841-438c-4a64-b679-ae501e7d6091", # Barbershop Man
sample_rate=44100,
tts = LmntTTSService(
api_key=os.getenv("LMNT_API_KEY"),
voice_id="morgan"
)
llm = OpenAILLMService(
@@ -74,11 +72,11 @@ async def main():
tma_in, # User responses
llm, # LLM
tts, # TTS
tma_out, # Goes before the transport because cartesia has word-level timestamps!
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -104,7 +104,6 @@ async def main():
model="gpt-4o")
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id="ErXwobaYiN019PkySvjV",
)

View File

@@ -111,7 +111,6 @@ async def main():
)
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
#
# English

View File

@@ -60,7 +60,6 @@ async def main(room_url, token=None):
)
tts_service = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)

View File

@@ -1,5 +1,5 @@
beautifulsoup4==4.12.2
PyPDF2==3.0.1
beautifulsoup4==4.12.3
pypdf==4.3.1
tiktoken==0.7.0
pipecat-ai[daily,cartesia,openai,silero]==0.0.39
pipecat-ai[daily,cartesia,openai,silero]==0.0.40
python-dotenv==1.0.1

View File

@@ -50,12 +50,12 @@ async def configure_with_args(
daily_rest_helper = DailyRESTHelper(
daily_api_key=key,
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
)
aiohttp_session=aiohttp_session)
# Create a meeting token for the given room with an expiration 1 hour in
# the future.
expiry_time: float = 60 * 60
token = daily_rest_helper.get_token(url, expiry_time)
token = await daily_rest_helper.get_token(url, expiry_time)
return (url, token, args)
return (url, token, args)

View File

@@ -5,7 +5,7 @@ import sys
import io
from bs4 import BeautifulSoup
from PyPDF2 import PdfReader
from pypdf import PdfReader
import tiktoken
from pipecat.frames.frames import LLMMessagesFrame
@@ -147,8 +147,8 @@ Your task is to help the user understand and learn from this article in 2 senten
tma_in,
llm,
tts,
tma_out,
transport.output(),
tma_out,
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))

View File

@@ -39,6 +39,7 @@ azure = [ "azure-cognitiveservices-speech~=1.40.0" ]
cartesia = [ "websockets~=12.0" ]
daily = [ "daily-python~=0.10.1" ]
deepgram = [ "deepgram-sdk~=3.5.0" ]
elevenlabs = [ "websockets~=12.0" ]
examples = [ "python-dotenv~=1.0.1", "flask~=3.0.3", "flask_cors~=4.0.1" ]
fal = [ "fal-client~=0.4.1" ]
gladia = [ "websockets~=12.0" ]
@@ -46,6 +47,8 @@ google = [ "google-generativeai~=0.7.2" ]
gstreamer = [ "pygobject~=3.48.2" ]
fireworks = [ "openai~=1.37.2" ]
langchain = [ "langchain~=0.2.14", "langchain-community~=0.2.12", "langchain-openai~=0.1.20" ]
livekit = [ "livekit~=0.13.1" ]
lmnt = [ "lmnt~=1.1.4" ]
local = [ "pyaudio~=0.2.14" ]
moondream = [ "einops~=0.8.0", "timm~=1.0.8", "transformers~=4.44.0" ]
openai = [ "openai~=1.37.2" ]

View File

View File

@@ -0,0 +1,18 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from abc import ABC, abstractmethod
class BaseClock(ABC):
@abstractmethod
def get_time(self) -> int:
pass
@abstractmethod
def start(self):
pass

View File

@@ -0,0 +1,21 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import time
from pipecat.clocks.base_clock import BaseClock
class SystemClock(BaseClock):
def __init__(self):
self._time = 0
def get_time(self) -> int:
return time.monotonic_ns() - self._time if self._time > 0 else 0
def start(self):
self._time = time.monotonic_ns()

View File

@@ -8,17 +8,27 @@ from typing import Any, List, Mapping, Optional, Tuple
from dataclasses import dataclass, field
from pipecat.clocks.base_clock import BaseClock
from pipecat.transcriptions.language import Language
from pipecat.utils.time import nanoseconds_to_str
from pipecat.utils.utils import obj_count, obj_id
from pipecat.vad.vad_analyzer import VADParams
def format_pts(pts: int | None):
return nanoseconds_to_str(pts) if pts else None
@dataclass
class Frame:
id: int = field(init=False)
name: str = field(init=False)
pts: Optional[int] = field(init=False)
def __post_init__(self):
self.id: int = obj_id()
self.name: str = f"{self.__class__.__name__}#{obj_count(self)}"
self.pts: Optional[int] = None
def __str__(self):
return self.name
@@ -44,7 +54,8 @@ class AudioRawFrame(DataFrame):
self.num_frames = int(len(self.audio) / (self.num_channels * 2))
def __str__(self):
return f"{self.name}(size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
@dataclass
@@ -58,7 +69,8 @@ class ImageRawFrame(DataFrame):
format: str | None
def __str__(self):
return f"{self.name}(size: {self.size}, format: {self.format})"
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, size: {self.size}, format: {self.format})"
@dataclass
@@ -70,7 +82,8 @@ class URLImageRawFrame(ImageRawFrame):
url: str | None
def __str__(self):
return f"{self.name}(url: {self.url}, size: {self.size}, format: {self.format})"
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, url: {self.url}, size: {self.size}, format: {self.format})"
@dataclass
@@ -82,7 +95,8 @@ class VisionImageRawFrame(ImageRawFrame):
text: str | None
def __str__(self):
return f"{self.name}(text: {self.text}, size: {self.size}, format: {self.format})"
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, text: {self.text}, size: {self.size}, format: {self.format})"
@dataclass
@@ -94,7 +108,8 @@ class UserImageRawFrame(ImageRawFrame):
user_id: str
def __str__(self):
return f"{self.name}(user: {self.user_id}, size: {self.size}, format: {self.format})"
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, user: {self.user_id}, size: {self.size}, format: {self.format})"
@dataclass
@@ -107,7 +122,8 @@ class SpriteFrame(Frame):
images: List[ImageRawFrame]
def __str__(self):
return f"{self.name}(size: {len(self.images)})"
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, size: {len(self.images)})"
@dataclass
@@ -119,7 +135,8 @@ class TextFrame(DataFrame):
text: str
def __str__(self):
return f"{self.name}(text: {self.text})"
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, text: {self.text})"
@dataclass
@@ -130,9 +147,10 @@ class TranscriptionFrame(TextFrame):
"""
user_id: str
timestamp: str
language: Language | None = None
def __str__(self):
return f"{self.name}(user: {self.user_id}, text: {self.text}, timestamp: {self.timestamp})"
return f"{self.name}(user: {self.user_id}, text: {self.text}, language: {self.language}, timestamp: {self.timestamp})"
@dataclass
@@ -141,9 +159,10 @@ class InterimTranscriptionFrame(TextFrame):
the transport's receive queue when a participant speaks."""
user_id: str
timestamp: str
language: Language | None = None
def __str__(self):
return f"{self.name}(user: {self.user_id}, text: {self.text}, timestamp: {self.timestamp})"
return f"{self.name}(user: {self.user_id}, text: {self.text}, language: {self.language}, timestamp: {self.timestamp})"
@dataclass
@@ -322,6 +341,7 @@ class ControlFrame(Frame):
@dataclass
class StartFrame(ControlFrame):
"""This is the first frame that should be pushed down a pipeline."""
clock: BaseClock
allow_interruptions: bool = False
enable_metrics: bool = False
enable_usage_metrics: bool = False
@@ -432,6 +452,13 @@ class LLMModelUpdateFrame(ControlFrame):
model: str
@dataclass
class TTSModelUpdateFrame(ControlFrame):
"""A control frame containing a request to update the TTS model.
"""
model: str
@dataclass
class TTSVoiceUpdateFrame(ControlFrame):
"""A control frame containing a request to update to a new TTS voice.
@@ -439,6 +466,31 @@ class TTSVoiceUpdateFrame(ControlFrame):
voice: str
@dataclass
class TTSLanguageUpdateFrame(ControlFrame):
"""A control frame containing a request to update to a new TTS language and
optional voice.
"""
language: Language
@dataclass
class STTModelUpdateFrame(ControlFrame):
"""A control frame containing a request to update the STT model and optional
language.
"""
model: str
@dataclass
class STTLanguageUpdateFrame(ControlFrame):
"""A control frame containing a request to update to STT language.
"""
language: Language
@dataclass
class FunctionCallInProgressFrame(SystemFrame):
"""A frame signaling that a function call is in progress.
@@ -455,7 +507,7 @@ class FunctionCallResultFrame(DataFrame):
function_name: str
tool_call_id: str
arguments: str
result: any
result: Any
@dataclass
@@ -463,4 +515,4 @@ class VADParamsUpdateFrame(ControlFrame):
"""A control frame containing a request to update VAD params. Intended
to be pushed upstream from RTVI processor.
"""
params: dict
params: VADParams

View File

@@ -10,6 +10,8 @@ from typing import AsyncIterable, Iterable
from pydantic import BaseModel
from pipecat.clocks.base_clock import BaseClock
from pipecat.clocks.system_clock import SystemClock
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
@@ -60,11 +62,16 @@ class Source(FrameProcessor):
class PipelineTask:
def __init__(self, pipeline: BasePipeline, params: PipelineParams = PipelineParams()):
def __init__(
self,
pipeline: BasePipeline,
params: PipelineParams = PipelineParams(),
clock: BaseClock = SystemClock()):
self.id: int = obj_id()
self.name: str = f"{self.__class__.__name__}#{obj_count(self)}"
self._pipeline = pipeline
self._clock = clock
self._params = params
self._finished = False
@@ -116,11 +123,14 @@ class PipelineTask:
return MetricsFrame(ttfb=ttfb, processing=processing)
async def _process_down_queue(self):
self._clock.start()
start_frame = StartFrame(
allow_interruptions=self._params.allow_interruptions,
enable_metrics=self._params.enable_metrics,
enable_usage_metrics=self._params.enable_metrics,
report_only_initial_ttfb=self._params.report_only_initial_ttfb
report_only_initial_ttfb=self._params.report_only_initial_ttfb,
clock=self._clock
)
await self._source.process_frame(start_frame, FrameDirection.DOWNSTREAM)

View File

@@ -109,7 +109,7 @@ class LLMResponseAggregator(FrameProcessor):
await self.push_frame(frame, direction)
elif isinstance(frame, self._accumulator_frame):
if self._aggregating:
self._aggregation += f" {frame.text}"
self._aggregation += f" {frame.text}" if self._aggregation else frame.text
# We have recevied a complete sentence, so if we have seen the
# end frame and we were still aggregating, it means we should
# send the aggregation.

View File

@@ -9,6 +9,7 @@ import time
from enum import Enum
from pipecat.clocks.base_clock import BaseClock
from pipecat.frames.frames import (
ErrorFrame,
Frame,
@@ -96,6 +97,9 @@ class FrameProcessor:
self._next: "FrameProcessor" | None = None
self._loop: asyncio.AbstractEventLoop = loop or asyncio.get_running_loop()
# Clock
self._clock: BaseClock | None = None
# Properties
self._allow_interruptions = False
self._enable_metrics = False
@@ -177,8 +181,12 @@ class FrameProcessor:
def get_parent(self) -> "FrameProcessor":
return self._parent
def get_clock(self) -> BaseClock:
return self._clock
async def process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, StartFrame):
self._clock = frame.clock
self._allow_interruptions = frame.allow_interruptions
self._enable_metrics = frame.enable_metrics
self._enable_usage_metrics = frame.enable_usage_metrics

View File

@@ -81,11 +81,6 @@ class RTVIAction(BaseModel):
return super().model_post_init(__context)
#
# Client -> Pipecat messages.
#
class RTVIServiceOptionConfig(BaseModel):
name: str
value: Any
@@ -100,6 +95,16 @@ class RTVIConfig(BaseModel):
config: List[RTVIServiceConfig]
#
# Client -> Pipecat messages.
#
class RTVIUpdateConfig(BaseModel):
config: List[RTVIServiceConfig]
interrupt: bool = False
class RTVIActionRunArgument(BaseModel):
name: str
value: Any
@@ -222,7 +227,7 @@ class RTVILLMFunctionCallResultData(BaseModel):
function_name: str
tool_call_id: str
arguments: dict
result: dict
result: dict | str
class RTVITranscriptionMessageData(BaseModel):
@@ -351,8 +356,10 @@ class RTVIProcessor(FrameProcessor):
await self.push_frame(frame, direction)
# Control frames
elif isinstance(frame, StartFrame):
await self._start(frame)
# Push StartFrame before start(), because we want StartFrame to be
# processed by every processor before any other frame is processed.
await self.push_frame(frame, direction)
await self._start(frame)
elif isinstance(frame, EndFrame):
# Push EndFrame before stop(), because stop() waits on the task to
# finish and the task finishes when EndFrame is processed.
@@ -489,8 +496,8 @@ class RTVIProcessor(FrameProcessor):
case "get-config":
await self._handle_get_config(message.id)
case "update-config":
config = RTVIConfig.model_validate(message.data)
await self._handle_update_config(message.id, config)
update_config = RTVIUpdateConfig.model_validate(message.data)
await self._handle_update_config(message.id, update_config)
case "action":
action = RTVIActionRun.model_validate(message.data)
await self._handle_action(message.id, action)
@@ -545,17 +552,14 @@ class RTVIProcessor(FrameProcessor):
await handler(self, service.name, option)
self._update_config_option(service.name, option)
async def _update_config(self, data: RTVIConfig):
async def _update_config(self, data: RTVIConfig, interrupt: bool):
if interrupt:
await self.interrupt_bot()
for service_config in data.config:
await self._update_service_config(service_config)
async def _handle_update_config(self, request_id: str, data: RTVIConfig):
# NOTE(aleix): The bot might be talking while we receive a new
# config. Let's interrupt it for now and update the config. Another
# solution is to wait until the bot stops speaking and then apply the
# config, but this definitely is more complicated to achieve.
await self.interrupt_bot()
await self._update_config(data)
async def _handle_update_config(self, request_id: str, data: RTVIUpdateConfig):
await self._update_config(RTVIConfig(config=data.config), data.interrupt)
await self._handle_get_config(request_id)
async def _handle_function_call_result(self, data):
@@ -583,7 +587,7 @@ class RTVIProcessor(FrameProcessor):
async def _maybe_send_bot_ready(self):
if self._pipeline_started and self._client_ready:
await self._send_bot_ready()
await self._update_config(self._config)
await self._update_config(self._config, False)
async def _send_bot_ready(self):
if not self._params.send_bot_ready:

View File

@@ -78,8 +78,10 @@ class GStreamerPipelineSource(FrameProcessor):
await self.push_frame(frame, direction)
# Control frames
elif isinstance(frame, StartFrame):
await self._start(frame)
# Push StartFrame before start(), because we want StartFrame to be
# processed by every processor before any other frame is processed.
await self._internal_push_frame(frame, direction)
await self._start(frame)
elif isinstance(frame, EndFrame):
# Push EndFrame before stop(), because stop() waits on the task to
# finish and the task finishes when EndFrame is processed.

View File

@@ -0,0 +1,46 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import ctypes
import pickle
from pipecat.frames.frames import AudioRawFrame, Frame
from pipecat.serializers.base_serializer import FrameSerializer
from loguru import logger
try:
from livekit.rtc import AudioFrame
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use LiveKit, you need to `pip install pipecat-ai[livekit]`.")
raise Exception(f"Missing module: {e}")
class LivekitFrameSerializer(FrameSerializer):
SERIALIZABLE_TYPES = {
AudioRawFrame: "audio",
}
def serialize(self, frame: Frame) -> str | bytes | None:
if not isinstance(frame, AudioRawFrame):
return None
audio_frame = AudioFrame(
data=frame.audio,
sample_rate=frame.sample_rate,
num_channels=frame.num_channels,
samples_per_channel=len(frame.audio) // ctypes.sizeof(ctypes.c_int16),
)
return pickle.dumps(audio_frame)
def deserialize(self, data: str | bytes) -> Frame | None:
audio_frame: AudioFrame = pickle.loads(data)['frame']
return AudioRawFrame(
audio=bytes(audio_frame.data),
sample_rate=audio_frame.sample_rate,
num_channels=audio_frame.num_channels,
)

View File

@@ -9,7 +9,7 @@ import json
from pydantic import BaseModel
from pipecat.frames.frames import AudioRawFrame, Frame
from pipecat.frames.frames import AudioRawFrame, Frame, StartInterruptionFrame
from pipecat.serializers.base_serializer import FrameSerializer
from pipecat.utils.audio import ulaw_to_pcm, pcm_to_ulaw
@@ -28,22 +28,25 @@ class TwilioFrameSerializer(FrameSerializer):
self._params = params
def serialize(self, frame: Frame) -> str | bytes | None:
if not isinstance(frame, AudioRawFrame):
return None
if isinstance(frame, AudioRawFrame):
data = frame.audio
data = frame.audio
serialized_data = pcm_to_ulaw(data, frame.sample_rate, self._params.twilio_sample_rate)
payload = base64.b64encode(serialized_data).decode("utf-8")
answer = {
"event": "media",
"streamSid": self._stream_sid,
"media": {
"payload": payload
serialized_data = pcm_to_ulaw(
data, frame.sample_rate, self._params.twilio_sample_rate)
payload = base64.b64encode(serialized_data).decode("utf-8")
answer = {
"event": "media",
"streamSid": self._stream_sid,
"media": {
"payload": payload
}
}
}
return json.dumps(answer)
return json.dumps(answer)
if isinstance(frame, StartInterruptionFrame):
answer = {"event": "clear", "streamSid": self._stream_sid}
return json.dumps(answer)
def deserialize(self, data: str | bytes) -> Frame | None:
message = json.loads(data)

View File

@@ -4,11 +4,12 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import io
import wave
from abc import abstractmethod
from typing import AsyncGenerator
from typing import AsyncGenerator, List, Optional, Tuple
from pipecat.frames.frames import (
AudioRawFrame,
@@ -17,9 +18,15 @@ from pipecat.frames.frames import (
ErrorFrame,
Frame,
LLMFullResponseEndFrame,
STTLanguageUpdateFrame,
STTModelUpdateFrame,
StartFrame,
StartInterruptionFrame,
TTSLanguageUpdateFrame,
TTSModelUpdateFrame,
TTSSpeakFrame,
TTSStartedFrame,
TTSStoppedFrame,
TTSVoiceUpdateFrame,
TextFrame,
UserImageRequestFrame,
@@ -27,11 +34,15 @@ from pipecat.frames.frames import (
)
from pipecat.processors.async_frame_processor import AsyncFrameProcessor
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transcriptions.language import Language
from pipecat.utils.audio import calculate_audio_volume
from pipecat.utils.string import match_endofsentence
from pipecat.utils.time import seconds_to_nanoseconds
from pipecat.utils.utils import exp_smoothing
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from loguru import logger
class AIService(FrameProcessor):
def __init__(self, **kwargs):
@@ -156,16 +167,32 @@ class TTSService(AIService):
aggregate_sentences: bool = True,
# if True, subclass is responsible for pushing TextFrames and LLMFullResponseEndFrames
push_text_frames: bool = True,
# if True, TTSService will push TTSStoppedFrames, otherwise subclass must do it
push_stop_frames: bool = False,
# if push_stop_frames is True, wait for this idle period before pushing TTSStoppedFrame
stop_frame_timeout_s: float = 1.0,
**kwargs):
super().__init__(**kwargs)
self._aggregate_sentences: bool = aggregate_sentences
self._push_text_frames: bool = push_text_frames
self._push_stop_frames: bool = push_stop_frames
self._stop_frame_timeout_s: float = stop_frame_timeout_s
self._stop_frame_task: Optional[asyncio.Task] = None
self._stop_frame_queue: asyncio.Queue = asyncio.Queue()
self._current_sentence: str = ""
@abstractmethod
async def set_model(self, model: str):
pass
@abstractmethod
async def set_voice(self, voice: str):
pass
@abstractmethod
async def set_language(self, language: Language):
pass
# Converts the text to audio.
@abstractmethod
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
@@ -222,15 +249,175 @@ class TTSService(AIService):
await self.push_frame(frame, direction)
elif isinstance(frame, TTSSpeakFrame):
await self._push_tts_frames(frame.text, False)
elif isinstance(frame, TTSModelUpdateFrame):
await self.set_model(frame.model)
elif isinstance(frame, TTSVoiceUpdateFrame):
await self.set_voice(frame.voice)
elif isinstance(frame, TTSLanguageUpdateFrame):
await self.set_language(frame.language)
else:
await self.push_frame(frame, direction)
async def start(self, frame: StartFrame):
await super().start(frame)
if self._push_stop_frames:
self._stop_frame_task = self.get_event_loop().create_task(self._stop_frame_handler())
async def stop(self, frame: EndFrame):
await super().stop(frame)
if self._stop_frame_task:
self._stop_frame_task.cancel()
await self._stop_frame_task
self._stop_frame_task = None
async def cancel(self, frame: CancelFrame):
await super().cancel(frame)
if self._stop_frame_task:
self._stop_frame_task.cancel()
await self._stop_frame_task
self._stop_frame_task = None
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
await super().push_frame(frame, direction)
if self._push_stop_frames and (
isinstance(frame, StartInterruptionFrame) or
isinstance(frame, TTSStartedFrame) or
isinstance(frame, AudioRawFrame) or
isinstance(frame, TTSStoppedFrame)):
await self._stop_frame_queue.put(frame)
async def _stop_frame_handler(self):
try:
has_started = False
while True:
try:
frame = await asyncio.wait_for(self._stop_frame_queue.get(),
self._stop_frame_timeout_s)
if isinstance(frame, TTSStartedFrame):
has_started = True
elif isinstance(frame, (TTSStoppedFrame, StartInterruptionFrame)):
has_started = False
except asyncio.TimeoutError:
if has_started:
await self.push_frame(TTSStoppedFrame())
has_started = False
except asyncio.CancelledError:
pass
class AsyncTTSService(TTSService):
def __init__(self, **kwargs):
super().__init__(**kwargs)
@abstractmethod
async def flush_audio(self):
pass
class AsyncWordTTSService(AsyncTTSService):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._initial_word_timestamp = -1
self._words_queue = asyncio.Queue()
self._words_task = self.get_event_loop().create_task(self._words_task_handler())
def start_word_timestamps(self):
if self._initial_word_timestamp == -1:
self._initial_word_timestamp = self.get_clock().get_time()
def reset_word_timestamps(self):
self._initial_word_timestamp = -1
self._word_timestamps = []
async def add_word_timestamps(self, word_times: List[Tuple[str, float]]):
for (word, timestamp) in word_times:
await self._words_queue.put((word, seconds_to_nanoseconds(timestamp)))
async def stop(self, frame: EndFrame):
await super().stop(frame)
await self._stop_words_task()
async def cancel(self, frame: CancelFrame):
await super().cancel(frame)
await self._stop_words_task()
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, LLMFullResponseEndFrame) or isinstance(frame, EndFrame):
await self.flush_audio()
async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection):
await super()._handle_interruption(frame, direction)
self.reset_word_timestamps()
async def _stop_words_task(self):
if self._words_task:
self._words_task.cancel()
await self._words_task
async def _words_task_handler(self):
while True:
try:
(word, timestamp) = await self._words_queue.get()
if word == "LLMFullResponseEndFrame" and timestamp == 0:
await self.push_frame(LLMFullResponseEndFrame())
else:
frame = TextFrame(word)
frame.pts = self._initial_word_timestamp + timestamp
await self.push_frame(frame)
self._words_queue.task_done()
except asyncio.CancelledError:
break
except Exception as e:
logger.exception(f"{self} exception: {e}")
class STTService(AIService):
"""STTService is a base class for speech-to-text services."""
def __init__(self, **kwargs):
super().__init__(**kwargs)
@abstractmethod
async def set_model(self, model: str):
pass
@abstractmethod
async def set_language(self, language: Language):
pass
@abstractmethod
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
"""Returns transcript as a string"""
pass
async def process_audio_frame(self, frame: AudioRawFrame):
await self.process_generator(self.run_stt(frame.audio))
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Processes a frame of audio data, either buffering or transcribing it."""
await super().process_frame(frame, direction)
if isinstance(frame, AudioRawFrame):
# In this service we accumulate audio internally and at the end we
# push a TextFrame. We don't really want to push audio frames down.
await self.process_audio_frame(frame)
elif isinstance(frame, STTModelUpdateFrame):
await self.set_model(frame.model)
elif isinstance(frame, STTLanguageUpdateFrame):
await self.set_language(frame.language)
else:
await self.push_frame(frame, direction)
class SegmentedSTTService(STTService):
"""SegmentedSTTService is an STTService that will detect speech and will run
speech-to-text on speech segments only, instead of a continous stream.
"""
def __init__(self,
*,
min_volume: float = 0.6,
@@ -251,24 +438,7 @@ class STTService(AIService):
self._smoothing_factor = 0.2
self._prev_volume = 0
@abstractmethod
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
"""Returns transcript as a string"""
pass
def _new_wave(self):
content = io.BytesIO()
ww = wave.open(content, "wb")
ww.setsampwidth(2)
ww.setnchannels(self._num_channels)
ww.setframerate(self._sample_rate)
return (content, ww)
def _get_smoothed_volume(self, frame: AudioRawFrame) -> float:
volume = calculate_audio_volume(frame.audio, frame.sample_rate)
return exp_smoothing(volume, self._prev_volume, self._smoothing_factor)
async def _append_audio(self, frame: AudioRawFrame):
async def process_audio_frame(self, frame: AudioRawFrame):
# Try to filter out empty background noise
volume = self._get_smoothed_volume(frame)
if volume >= self._min_volume:
@@ -288,9 +458,7 @@ class STTService(AIService):
self._silence_num_frames = 0
self._wave.close()
self._content.seek(0)
await self.start_processing_metrics()
await self.process_generator(self.run_stt(self._content.read()))
await self.stop_processing_metrics()
(self._content, self._wave) = self._new_wave()
async def stop(self, frame: EndFrame):
@@ -299,16 +467,17 @@ class STTService(AIService):
async def cancel(self, frame: CancelFrame):
self._wave.close()
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Processes a frame of audio data, either buffering or transcribing it."""
await super().process_frame(frame, direction)
def _new_wave(self):
content = io.BytesIO()
ww = wave.open(content, "wb")
ww.setsampwidth(2)
ww.setnchannels(self._num_channels)
ww.setframerate(self._sample_rate)
return (content, ww)
if isinstance(frame, AudioRawFrame):
# In this service we accumulate audio internally and at the end we
# push a TextFrame. We don't really want to push audio frames down.
await self._append_audio(frame)
else:
await self.push_frame(frame, direction)
def _get_smoothed_volume(self, frame: AudioRawFrame) -> float:
volume = calculate_audio_volume(frame.audio, frame.sample_rate)
return exp_smoothing(volume, self._prev_volume, self._smoothing_factor)
class ImageGenService(AIService):

View File

@@ -43,7 +43,7 @@ from pipecat.processors.aggregators.llm_response import (
from loguru import logger
try:
from anthropic import AsyncAnthropic
from anthropic import AsyncAnthropic, NOT_GIVEN, NotGiven
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
@@ -135,7 +135,7 @@ class AnthropicLLMService(LLMService):
response = await api_call(
tools=context.tools or [],
system=context.system or [],
system=context.system,
messages=messages,
model=self._model,
max_tokens=self._max_tokens,
@@ -171,7 +171,8 @@ class AnthropicLLMService(LLMService):
await self.call_function(context=context,
tool_call_id=tool_use_block.id,
function_name=tool_use_block.name,
arguments=json.loads(json_accumulator))
arguments=json.loads(json_accumulator) if json_accumulator else dict()
)
# Calculate usage. Do this here in its own if statement, because there may be usage
# data embedded in messages that we do other processing for, above.
@@ -269,7 +270,7 @@ class AnthropicLLMContext(OpenAILLMContext):
tools: list[dict] | None = None,
tool_choice: dict | None = None,
*,
system: List | None = None
system: str | NotGiven = NOT_GIVEN
):
super().__init__(messages=messages, tools=tools, tool_choice=tool_choice)
self._user_image_request_context = {}

View File

@@ -12,7 +12,6 @@ import time
from typing import AsyncGenerator
from pipecat.processors.frame_processor import FrameDirection
from pipecat.frames.frames import (
CancelFrame,
ErrorFrame,
@@ -26,7 +25,9 @@ from pipecat.frames.frames import (
TextFrame,
LLMFullResponseEndFrame
)
from pipecat.services.ai_services import TTSService
from pipecat.processors.frame_processor import FrameDirection
from pipecat.transcriptions.language import Language
from pipecat.services.ai_services import AsyncWordTTSService
from loguru import logger
@@ -40,7 +41,26 @@ except ModuleNotFoundError as e:
raise Exception(f"Missing module: {e}")
class CartesiaTTSService(TTSService):
def language_to_cartesia_language(language: Language) -> str | None:
match language:
case Language.DE:
return "de"
case Language.EN:
return "en"
case Language.ES:
return "es"
case Language.FR:
return "fr"
case Language.JA:
return "ja"
case Language.PT:
return "pt"
case Language.ZH:
return "zh"
return None
class CartesiaTTSService(AsyncWordTTSService):
def __init__(
self,
@@ -54,19 +74,17 @@ class CartesiaTTSService(TTSService):
sample_rate: int = 16000,
language: str = "en",
**kwargs):
super().__init__(**kwargs)
# Aggregating sentences still gives cleaner-sounding results and fewer
# artifacts than streaming one word at a time. On average, waiting for
# a full sentence should only "cost" us 15ms or so with GPT-4o or a Llama 3
# model, and it's worth it for the better audio quality.
self._aggregate_sentences = True
# we don't want to automatically push LLM response text frames, because the
# context aggregators will add them to the LLM context even if we're
# interrupted. cartesia gives us word-by-word timestamps. we can use those
# to generate text frames ourselves aligned with the playout timing of the audio!
self._push_text_frames = False
# artifacts than streaming one word at a time. On average, waiting for a
# full sentence should only "cost" us 15ms or so with GPT-4o or a Llama
# 3 model, and it's worth it for the better audio quality.
#
# We also don't want to automatically push LLM response text frames,
# because the context aggregators will add them to the LLM context even
# if we're interrupted. Cartesia gives us word-by-word timestamps. We
# can use those to generate text frames ourselves aligned with the
# playout timing of the audio!
super().__init__(aggregate_sentences=True, push_text_frames=False, **kwargs)
self._api_key = api_key
self._cartesia_version = cartesia_version
@@ -82,18 +100,23 @@ class CartesiaTTSService(TTSService):
self._websocket = None
self._context_id = None
self._context_id_start_timestamp = None
self._timestamped_words_buffer = []
self._receive_task = None
self._context_appending_task = None
def can_generate_metrics(self) -> bool:
return True
async def set_model(self, model: str):
logger.debug(f"Switching TTS model to: [{model}]")
self._model_id = model
async def set_voice(self, voice: str):
logger.debug(f"Switching TTS voice to: [{voice}]")
self._voice_id = voice
async def set_language(self, language: Language):
logger.debug(f"Switching TTS language to: [{language}]")
self._language = language_to_cartesia_language(language)
async def start(self, frame: StartFrame):
await super().start(frame)
await self._connect()
@@ -112,7 +135,6 @@ class CartesiaTTSService(TTSService):
f"{self._url}?api_key={self._api_key}&cartesia_version={self._cartesia_version}"
)
self._receive_task = self.get_event_loop().create_task(self._receive_task_handler())
self._context_appending_task = self.get_event_loop().create_task(self._context_appending_task_handler())
except Exception as e:
logger.exception(f"{self} initialization error: {e}")
self._websocket = None
@@ -121,10 +143,6 @@ class CartesiaTTSService(TTSService):
try:
await self.stop_all_metrics()
if self._context_appending_task:
self._context_appending_task.cancel()
await self._context_appending_task
self._context_appending_task = None
if self._receive_task:
self._receive_task.cancel()
await self._receive_task
@@ -134,18 +152,33 @@ class CartesiaTTSService(TTSService):
self._websocket = None
self._context_id = None
self._context_id_start_timestamp = None
self._timestamped_words_buffer = []
except Exception as e:
logger.exception(f"{self} error closing websocket: {e}")
async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection):
await super()._handle_interruption(frame, direction)
self._context_id = None
self._context_id_start_timestamp = None
self._timestamped_words_buffer = []
await self.stop_all_metrics()
await self.push_frame(LLMFullResponseEndFrame())
self._context_id = None
async def flush_audio(self):
if not self._context_id or not self._websocket:
return
logger.debug("Flushing audio")
msg = {
"transcript": "",
"continue": False,
"context_id": self._context_id,
"model_id": self._model_id,
"voice": {
"mode": "id",
"id": self._voice_id
},
"output_format": self._output_format,
"language": self._language,
"add_timestamps": True,
}
await self._websocket.send(json.dumps(msg))
async def _receive_task_handler(self):
try:
@@ -160,16 +193,14 @@ class CartesiaTTSService(TTSService):
# because we are likely still playing out audio and need the
# timestamp to set send context frames.
self._context_id = None
self._timestamped_words_buffer.append(("LLMFullResponseEndFrame", 0))
await self.add_word_timestamps([("LLMFullResponseEndFrame", 0)])
elif msg["type"] == "timestamps":
# logger.debug(f"TIMESTAMPS: {msg}")
self._timestamped_words_buffer.extend(
list(zip(msg["word_timestamps"]["words"], msg["word_timestamps"]["end"]))
await self.add_word_timestamps(
list(zip(msg["word_timestamps"]["words"], msg["word_timestamps"]["start"]))
)
elif msg["type"] == "chunk":
await self.stop_ttfb_metrics()
if not self._context_id_start_timestamp:
self._context_id_start_timestamp = time.time()
self.start_word_timestamps()
frame = AudioRawFrame(
audio=base64.b64decode(msg["data"]),
sample_rate=self._output_format["sample_rate"],
@@ -188,27 +219,6 @@ class CartesiaTTSService(TTSService):
except Exception as e:
logger.exception(f"{self} exception: {e}")
async def _context_appending_task_handler(self):
try:
while True:
await asyncio.sleep(0.1)
if not self._context_id_start_timestamp:
continue
elapsed_seconds = time.time() - self._context_id_start_timestamp
# Pop all words from self._timestamped_words_buffer that are
# older than the elapsed time and print a message about them to
# the console.
while self._timestamped_words_buffer and self._timestamped_words_buffer[0][1] <= elapsed_seconds:
word, timestamp = self._timestamped_words_buffer.pop(0)
if word == "LLMFullResponseEndFrame" and timestamp == 0:
await self.push_frame(LLMFullResponseEndFrame())
continue
await self.push_frame(TextFrame(word))
except asyncio.CancelledError:
pass
except Exception as e:
logger.exception(f"{self} exception: {e}")
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: [{text}]")

View File

@@ -16,12 +16,11 @@ from pipecat.frames.frames import (
Frame,
InterimTranscriptionFrame,
StartFrame,
SystemFrame,
TTSStartedFrame,
TTSStoppedFrame,
TranscriptionFrame)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import AsyncAIService, TTSService
from pipecat.services.ai_services import STTService, TTSService
from pipecat.transcriptions.language import Language
from pipecat.utils.time import time_now_iso8601
from loguru import logger
@@ -30,10 +29,12 @@ from loguru import logger
# See .env.example for Deepgram configuration needed
try:
from deepgram import (
AsyncListenWebSocketClient,
DeepgramClient,
DeepgramClientOptions,
LiveTranscriptionEvents,
LiveOptions,
LiveResultResponse
)
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
@@ -107,7 +108,7 @@ class DeepgramTTSService(TTSService):
logger.exception(f"{self} exception: {e}")
class DeepgramSTTService(AsyncAIService):
class DeepgramSTTService(STTService):
def __init__(self,
*,
api_key: str,
@@ -120,6 +121,8 @@ class DeepgramSTTService(AsyncAIService):
channels=1,
interim_results=True,
smart_format=True,
punctuate=True,
profanity_filter=True,
),
**kwargs):
super().__init__(**kwargs)
@@ -128,40 +131,62 @@ class DeepgramSTTService(AsyncAIService):
self._client = DeepgramClient(
api_key, config=DeepgramClientOptions(url=url, options={"keepalive": "true"}))
self._connection = self._client.listen.asynclive.v("1")
self._connection: AsyncListenWebSocketClient = self._client.listen.asyncwebsocket.v("1")
self._connection.on(LiveTranscriptionEvents.Transcript, self._on_message)
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
async def set_model(self, model: str):
logger.debug(f"Switching STT model to: [{model}]")
self._live_options.model = model
await self._disconnect()
await self._connect()
if isinstance(frame, SystemFrame):
await self.push_frame(frame, direction)
elif isinstance(frame, AudioRawFrame):
await self._connection.send(frame.audio)
else:
await self.queue_frame(frame, direction)
async def set_language(self, language: Language):
logger.debug(f"Switching STT language to: [{language}]")
self._live_options.language = language
await self._disconnect()
await self._connect()
async def start(self, frame: StartFrame):
await super().start(frame)
await self._connect()
async def stop(self, frame: EndFrame):
await super().stop(frame)
await self._disconnect()
async def cancel(self, frame: CancelFrame):
await super().cancel(frame)
await self._disconnect()
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
await self.start_processing_metrics()
await self._connection.send(audio)
yield None
await self.stop_processing_metrics()
async def _connect(self):
if await self._connection.start(self._live_options):
logger.debug(f"{self}: Connected to Deepgram")
else:
logger.error(f"{self}: Unable to connect to Deepgram")
async def stop(self, frame: EndFrame):
await super().stop(frame)
await self._connection.finish()
async def cancel(self, frame: CancelFrame):
await super().cancel(frame)
await self._connection.finish()
async def _disconnect(self):
if self._connection.is_connected:
await self._connection.finish()
logger.debug(f"{self}: Disconnected from Deepgram")
async def _on_message(self, *args, **kwargs):
result = kwargs["result"]
result: LiveResultResponse = kwargs["result"]
if len(result.channel.alternatives) == 0:
return
is_final = result.is_final
transcript = result.channel.alternatives[0].transcript
language = None
if result.channel.alternatives[0].languages:
language = result.channel.alternatives[0].languages[0]
language = Language(language)
if len(transcript) > 0:
if is_final:
await self.queue_frame(TranscriptionFrame(transcript, "", time_now_iso8601()))
await self.push_frame(TranscriptionFrame(transcript, "", time_now_iso8601(), language))
else:
await self.queue_frame(InterimTranscriptionFrame(transcript, "", time_now_iso8601()))
await self.push_frame(InterimTranscriptionFrame(transcript, "", time_now_iso8601(), language))

View File

@@ -4,18 +4,72 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import asyncio
import base64
import json
from typing import AsyncGenerator, Literal
from typing import Any, AsyncGenerator, List, Literal, Mapping, Tuple
from pydantic import BaseModel
from pipecat.frames.frames import AudioRawFrame, ErrorFrame, Frame, TTSStartedFrame, TTSStoppedFrame
from pipecat.services.ai_services import TTSService
from pipecat.frames.frames import (
AudioRawFrame,
CancelFrame,
EndFrame,
Frame,
StartFrame,
StartInterruptionFrame,
TTSStartedFrame,
TTSStoppedFrame)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import AsyncWordTTSService
from loguru import logger
# See .env.example for ElevenLabs configuration needed
try:
import websockets
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use ElevenLabs, you need to `pip install pipecat-ai[elevenlabs]`. Also, set `ELEVENLABS_API_KEY` environment variable.")
raise Exception(f"Missing module: {e}")
class ElevenLabsTTSService(TTSService):
def sample_rate_from_output_format(output_format: str) -> int:
match output_format:
case "pcm_16000":
return 16000
case "pcm_22050":
return 22050
case "pcm_24000":
return 24000
case "pcm_44100":
return 44100
return 16000
def calculate_word_times(
alignment_info: Mapping[str, Any], cumulative_time: float
) -> List[Tuple[str, float]]:
zipped_times = list(zip(alignment_info["chars"], alignment_info["charStartTimesMs"]))
words = "".join(alignment_info["chars"]).split(" ")
# Calculate start time for each word. We do this by finding a space character
# and using the previous word time, also taking into account there might not
# be a space at the end.
times = []
for (i, (a, b)) in enumerate(zipped_times):
if a == " " or i == len(zipped_times) - 1:
t = cumulative_time + (zipped_times[i - 1][1] / 1000.0)
times.append(t)
word_times = list(zip(words, times))
return word_times
class ElevenLabsTTSService(AsyncWordTTSService):
class InputParams(BaseModel):
output_format: Literal["pcm_16000", "pcm_22050", "pcm_24000", "pcm_44100"] = "pcm_16000"
@@ -24,56 +78,186 @@ class ElevenLabsTTSService(TTSService):
*,
api_key: str,
voice_id: str,
aiohttp_session: aiohttp.ClientSession,
model: str = "eleven_turbo_v2_5",
url: str = "wss://api.elevenlabs.io",
params: InputParams = InputParams(),
**kwargs):
super().__init__(**kwargs)
# Aggregating sentences still gives cleaner-sounding results and fewer
# artifacts than streaming one word at a time. On average, waiting for a
# full sentence should only "cost" us 15ms or so with GPT-4o or a Llama
# 3 model, and it's worth it for the better audio quality.
#
# We also don't want to automatically push LLM response text frames,
# because the context aggregators will add them to the LLM context even
# if we're interrupted. ElevenLabs gives us word-by-word timestamps. We
# can use those to generate text frames ourselves aligned with the
# playout timing of the audio!
#
# Finally, ElevenLabs doesn't provide information on when the bot stops
# speaking for a while, so we want the parent class to send TTSStopFrame
# after a short period not receiving any audio.
super().__init__(
aggregate_sentences=True,
push_text_frames=False,
push_stop_frames=True,
stop_frame_timeout_s=2.0,
**kwargs
)
self._api_key = api_key
self._voice_id = voice_id
self._model = model
self._url = url
self._params = params
self._aiohttp_session = aiohttp_session
self._sample_rate = sample_rate_from_output_format(params.output_format)
# Websocket connection to ElevenLabs.
self._websocket = None
# Indicates if we have sent TTSStartedFrame. It will reset to False when
# there's an interruption or TTSStoppedFrame.
self._started = False
self._cumulative_time = 0
def can_generate_metrics(self) -> bool:
return True
async def set_model(self, model: str):
logger.debug(f"Switching TTS model to: [{model}]")
self._model = model
await self._disconnect()
await self._connect()
async def set_voice(self, voice: str):
logger.debug(f"Switching TTS voice to: [{voice}]")
self._voice_id = voice
await self._disconnect()
await self._connect()
async def start(self, frame: StartFrame):
await super().start(frame)
await self._connect()
async def stop(self, frame: EndFrame):
await super().stop(frame)
await self._disconnect()
async def cancel(self, frame: CancelFrame):
await super().cancel(frame)
await self._disconnect()
async def flush_audio(self):
if self._websocket:
msg = {"text": " ", "flush": True}
await self._websocket.send(json.dumps(msg))
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
await super().push_frame(frame, direction)
if isinstance(frame, (TTSStoppedFrame, StartInterruptionFrame)):
self._started = False
if isinstance(frame, TTSStoppedFrame):
await self.add_word_timestamps([("LLMFullResponseEndFrame", 0)])
async def _connect(self):
try:
voice_id = self._voice_id
model = self._model
output_format = self._params.output_format
url = f"{self._url}/v1/text-to-speech/{voice_id}/stream-input?model_id={model}&output_format={output_format}"
self._websocket = await websockets.connect(url)
self._receive_task = self.get_event_loop().create_task(self._receive_task_handler())
self._keepalive_task = self.get_event_loop().create_task(self._keepalive_task_handler())
# According to ElevenLabs, we should always start with a single space.
msg = {
"text": " ",
"xi_api_key": self._api_key,
}
await self._websocket.send(json.dumps(msg))
except Exception as e:
logger.exception(f"{self} initialization error: {e}")
self._websocket = None
async def _disconnect(self):
try:
await self.stop_all_metrics()
if self._receive_task:
self._receive_task.cancel()
await self._receive_task
self._receive_task = None
if self._keepalive_task:
self._keepalive_task.cancel()
await self._keepalive_task
self._keepalive_task = None
if self._websocket:
await self._websocket.close()
self._websocket = None
self._started = False
except Exception as e:
logger.exception(f"{self} error closing websocket: {e}")
async def _receive_task_handler(self):
try:
async for message in self._websocket:
msg = json.loads(message)
if msg.get("audio"):
await self.stop_ttfb_metrics()
self.start_word_timestamps()
audio = base64.b64decode(msg["audio"])
frame = AudioRawFrame(audio, self._sample_rate, 1)
await self.push_frame(frame)
if msg.get("alignment"):
word_times = calculate_word_times(msg["alignment"], self._cumulative_time)
await self.add_word_timestamps(word_times)
self._cumulative_time = word_times[-1][1]
except asyncio.CancelledError:
pass
except Exception as e:
logger.exception(f"{self} exception: {e}")
async def _keepalive_task_handler(self):
while True:
try:
await asyncio.sleep(10)
await self._send_text("")
except asyncio.CancelledError:
break
except Exception as e:
logger.exception(f"{self} exception: {e}")
async def _send_text(self, text: str):
if self._websocket:
msg = {"text": text + " "}
await self._websocket.send(json.dumps(msg))
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: [{text}]")
url = f"https://api.elevenlabs.io/v1/text-to-speech/{self._voice_id}/stream"
try:
if not self._websocket:
await self._connect()
payload = {"text": text, "model_id": self._model}
try:
if not self._started:
await self.push_frame(TTSStartedFrame())
await self.start_ttfb_metrics()
self._started = True
self._cumulative_time = 0
querystring = {
"output_format": self._params.output_format
}
headers = {
"xi-api-key": self._api_key,
"Content-Type": "application/json",
}
await self.start_ttfb_metrics()
async with self._aiohttp_session.post(url, json=payload, headers=headers, params=querystring) as r:
if r.status != 200:
text = await r.text()
logger.error(f"{self} error getting audio (status: {r.status}, error: {text})")
yield ErrorFrame(f"Error getting audio (status: {r.status}, error: {text})")
await self._send_text(text)
await self.start_tts_usage_metrics(text)
except Exception as e:
logger.error(f"{self} error sending message: {e}")
await self.push_frame(TTSStoppedFrame())
await self._disconnect()
await self._connect()
return
await self.start_tts_usage_metrics(text)
await self.push_frame(TTSStartedFrame())
async for chunk in r.content:
if len(chunk) > 0:
await self.stop_ttfb_metrics()
frame = AudioRawFrame(chunk, 16000, 1)
yield frame
await self.push_frame(TTSStoppedFrame())
yield None
except Exception as e:
logger.exception(f"{self} exception: {e}")

View File

@@ -0,0 +1,166 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
from typing import AsyncGenerator
from pipecat.processors.frame_processor import FrameDirection
from pipecat.frames.frames import (
AudioRawFrame,
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
StartFrame,
StartInterruptionFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
from pipecat.services.ai_services import AsyncTTSService
from loguru import logger
# See .env.example for LMNT configuration needed
try:
from lmnt.api import Speech
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use LMNT, you need to `pip install pipecat-ai[lmnt]`. Also, set `LMNT_API_KEY` environment variable.")
raise Exception(f"Missing module: {e}")
class LmntTTSService(AsyncTTSService):
def __init__(
self,
*,
api_key: str,
voice_id: str,
sample_rate: int = 24000,
language: str = "en",
**kwargs):
# Let TTSService produce TTSStoppedFrames after a short delay of
# no activity.
super().__init__(push_stop_frames=True, **kwargs)
self._api_key = api_key
self._voice_id = voice_id
self._output_format = {
"container": "raw",
"encoding": "pcm_s16le",
"sample_rate": sample_rate,
}
self._language = language
self._speech = None
self._connection = None
self._receive_task = None
# Indicates if we have sent TTSStartedFrame. It will reset to False when
# there's an interruption or TTSStoppedFrame.
self._started = False
def can_generate_metrics(self) -> bool:
return True
async def set_voice(self, voice: str):
logger.debug(f"Switching TTS voice to: [{voice}]")
self._voice_id = voice
async def start(self, frame: StartFrame):
await super().start(frame)
await self._connect()
async def stop(self, frame: EndFrame):
await super().stop(frame)
await self._disconnect()
async def cancel(self, frame: CancelFrame):
await super().cancel(frame)
await self._disconnect()
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
await super().push_frame(frame, direction)
if isinstance(frame, (TTSStoppedFrame, StartInterruptionFrame)):
self._started = False
async def _connect(self):
try:
self._speech = Speech()
self._connection = await self._speech.synthesize_streaming(
self._voice_id, format="raw", sample_rate=self._output_format["sample_rate"])
self._receive_task = self.get_event_loop().create_task(self._receive_task_handler())
except Exception as e:
logger.exception(f"{self} initialization error: {e}")
self._connection = None
async def _disconnect(self):
try:
await self.stop_all_metrics()
if self._receive_task:
self._receive_task.cancel()
await self._receive_task
self._receive_task = None
if self._connection:
await self._connection.socket.close()
self._connection = None
if self._speech:
await self._speech.close()
self._speech = None
self._started = False
except Exception as e:
logger.exception(f"{self} error closing websocket: {e}")
async def _receive_task_handler(self):
try:
async for msg in self._connection:
if "error" in msg:
logger.error(f'{self} error: {msg["error"]}')
await self.push_frame(TTSStoppedFrame())
await self.stop_all_metrics()
await self.push_error(ErrorFrame(f'{self} error: {msg["error"]}'))
elif "audio" in msg:
await self.stop_ttfb_metrics()
frame = AudioRawFrame(
audio=msg["audio"],
sample_rate=self._output_format["sample_rate"],
num_channels=1
)
await self.push_frame(frame)
else:
logger.error(f"LMNT error, unknown message type: {msg}")
except asyncio.CancelledError:
pass
except Exception as e:
logger.exception(f"{self} exception: {e}")
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: [{text}]")
try:
if not self._connection:
await self._connect()
if not self._started:
await self.push_frame(TTSStartedFrame())
await self.start_ttfb_metrics()
self._started = True
try:
await self._connection.append_text(text)
await self._connection.flush()
await self.start_tts_usage_metrics(text)
except Exception as e:
logger.error(f"{self} error sending message: {e}")
await self.push_frame(TTSStoppedFrame())
await self._disconnect()
await self._connect()
return
yield None
except Exception as e:
logger.exception(f"{self} exception: {e}")

View File

@@ -173,7 +173,7 @@ class TogetherLLMService(LLMService):
try:
arguments = json.loads(args_string)
await self.call_function(context=context,
tool_call_id=uuid.uuid4(),
tool_call_id=str(uuid.uuid4()),
function_name=function_name,
arguments=arguments)
return
@@ -301,7 +301,8 @@ class TogetherAssistantContextAggregator(LLMAssistantContextAggregator):
self._function_call_result = None
self._context.add_message({
"role": "tool",
"content": frame.result
# Together expects the content here to be a string, so stringify it
"content": str(frame.result)
})
run_llm = True
else:

View File

@@ -14,7 +14,7 @@ from typing import AsyncGenerator
import numpy as np
from pipecat.frames.frames import ErrorFrame, Frame, TranscriptionFrame
from pipecat.services.ai_services import STTService
from pipecat.services.ai_services import SegmentedSTTService
from pipecat.utils.time import time_now_iso8601
from loguru import logger
@@ -38,7 +38,7 @@ class Model(Enum):
DISTIL_MEDIUM_EN = "Systran/faster-distil-whisper-medium.en"
class WhisperSTTService(STTService):
class WhisperSTTService(SegmentedSTTService):
"""Class to transcribe audio with a locally-downloaded Whisper model"""
def __init__(self,
@@ -77,6 +77,7 @@ class WhisperSTTService(STTService):
yield ErrorFrame("Whisper model not available")
return
await self.start_processing_metrics()
await self.start_ttfb_metrics()
# Divide by 32768 because we have signed 16-bit data.
@@ -88,7 +89,9 @@ class WhisperSTTService(STTService):
if segment.no_speech_prob < self._no_speech_prob:
text += f"{segment.text} "
await self.stop_ttfb_metrics()
await self.stop_processing_metrics()
if text:
await self.stop_ttfb_metrics()
logger.debug(f"Transcription: [{text}]")
yield TranscriptionFrame(text, "", time_now_iso8601())

View File

View File

@@ -0,0 +1,64 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import sys
from enum import Enum
if sys.version_info < (3, 11):
class StrEnum(str, Enum):
def __new__(cls, value):
obj = str.__new__(cls, value)
obj._value_ = value
return obj
else:
from enum import StrEnum
class Language(StrEnum):
BG = "bg" # Bulgarian
CA = "ca" # Catalan
ZH = "zh" # Chinese simplified
ZH_TW = "zh-TW" # Chinese traditional
CS = "cs" # Czech
DA = "da" # Danish
NL = "nl" # Dutch
EN = "en" # English
EN_US = "en-US" # English (USA)
EN_AU = "en-AU" # English (Australia)
EN_GB = "en-GB" # English (Great Britain)
EN_NZ = "en-NZ" # English (New Zealand)
EN_IN = "en-IN" # English (India)
ET = "et" # Estonian
FI = "fi" # Finnish
NL_BE = "nl-BE" # Flemmish
FR = "fr" # French
FR_CA = "fr-CA" # French (Canada)
DE = "de" # German
DE_CH = "de-CH" # German (Switzerland)
EL = "el" # Greek
HI = "hi" # Hindi
HU = "hu" # Hungarian
ID = "id" # Indonesian
IT = "it" # Italian
JA = "ja" # Japanese
KO = "ko" # Korean
LV = "lv" # Latvian
LT = "lt" # Lithuanian
MS = "ms" # Malay
NO = "no" # Norwegian
PL = "pl" # Polish
PT = "pt" # Portuguese
PT_BR = "pt-BR" # Portuguese (Brazil)
RO = "ro" # Romanian
RU = "ru" # Russian
SK = "sk" # Slovak
ES = "es" # Spanish
SV = "sv" # Swedish
TH = "th" # Thai
TR = "tr" # Turkish
UK = "uk" # Ukrainian
VI = "vi" # Vietnamese

View File

@@ -23,7 +23,7 @@ from pipecat.frames.frames import (
UserStoppedSpeakingFrame,
VADParamsUpdateFrame)
from pipecat.transports.base_transport import TransportParams
from pipecat.vad.vad_analyzer import VADAnalyzer, VADParams, VADState
from pipecat.vad.vad_analyzer import VADAnalyzer, VADState
from loguru import logger
@@ -96,8 +96,10 @@ class BaseInputTransport(FrameProcessor):
await self.push_frame(frame, direction)
# Control frames
elif isinstance(frame, StartFrame):
await self.start(frame)
# Push StartFrame before start(), because we want StartFrame to be
# processed by every processor before any other frame is processed.
await self._internal_push_frame(frame, direction)
await self.start(frame)
elif isinstance(frame, EndFrame):
# Push EndFrame before stop(), because stop() waits on the task to
# finish and the task finishes when EndFrame is processed.
@@ -105,9 +107,8 @@ class BaseInputTransport(FrameProcessor):
await self.stop(frame)
elif isinstance(frame, VADParamsUpdateFrame):
vad_analyzer = self.vad_analyzer()
if not vad_analyzer:
pass
vad_analyzer.set_params(frame.params)
if vad_analyzer:
vad_analyzer.set_params(frame.params)
# Other frames
else:
await self._internal_push_frame(frame, direction)

View File

@@ -8,6 +8,7 @@
import asyncio
import itertools
import time
import sys
from PIL import Image
from typing import List
@@ -30,11 +31,14 @@ from pipecat.frames.frames import (
SystemFrame,
TTSStartedFrame,
TTSStoppedFrame,
TextFrame,
TransportMessageFrame)
from pipecat.transports.base_transport import TransportParams
from loguru import logger
from pipecat.utils.time import nanoseconds_to_seconds
class BaseOutputTransport(FrameProcessor):
@@ -64,7 +68,7 @@ class BaseOutputTransport(FrameProcessor):
# Create sink frame task. This is the task that will actually write
# audio or video frames. We write audio/video in a task so we can keep
# generating frames upstream while, for example, the audio is playing.
self._create_sink_task()
self._create_sink_tasks()
# Create push frame task. This is the task that will push frames in
# order. We also guarantee that all frames are pushed in the same task.
@@ -134,8 +138,8 @@ class BaseOutputTransport(FrameProcessor):
# queue.
#
if isinstance(frame, CancelFrame):
await self.push_frame(frame, direction)
await self.cancel(frame)
await self.push_frame(frame, direction)
elif isinstance(frame, StartInterruptionFrame) or isinstance(frame, StopInterruptionFrame):
await self.push_frame(frame, direction)
await self._handle_interruptions(frame)
@@ -149,6 +153,7 @@ class BaseOutputTransport(FrameProcessor):
await self._sink_queue.put(frame)
await self.start(frame)
elif isinstance(frame, EndFrame):
await self._sink_clock_queue.put((sys.maxsize, frame.id, frame))
await self._sink_queue.put(frame)
await self.stop(frame)
# Other frames.
@@ -158,6 +163,9 @@ class BaseOutputTransport(FrameProcessor):
await self._handle_image(frame)
elif isinstance(frame, TransportMessageFrame) and frame.urgent:
await self.send_message(frame)
# TODO(aleix): Images and audio should support presentation timestamps.
elif frame.pts:
await self._sink_clock_queue.put((frame.pts, frame.id, frame))
else:
await self._sink_queue.put(frame)
@@ -166,10 +174,14 @@ class BaseOutputTransport(FrameProcessor):
return
if isinstance(frame, StartInterruptionFrame):
# Stop sink task.
# Stop sink tasks.
self._sink_task.cancel()
await self._sink_task
self._create_sink_task()
# Stop sink clock tasks.
self._sink_clock_task.cancel()
await self._sink_clock_task
# Create sink tasks.
self._create_sink_tasks()
# Stop push task.
self._push_frame_task.cancel()
await self._push_frame_task
@@ -201,48 +213,90 @@ class BaseOutputTransport(FrameProcessor):
else:
await self._sink_queue.put(frame)
def _create_sink_task(self):
#
# Sink tasks
#
def _create_sink_tasks(self):
loop = self.get_event_loop()
self._sink_queue = asyncio.Queue()
self._sink_task = loop.create_task(self._sink_task_handler())
self._sink_clock_queue = asyncio.PriorityQueue()
self._sink_clock_task = loop.create_task(self._sink_clock_task_handler())
async def _sink_frame_handler(self, frame: Frame):
if isinstance(frame, AudioRawFrame):
await self.write_raw_audio_frames(frame.audio)
await self._internal_push_frame(frame)
await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM)
elif isinstance(frame, ImageRawFrame):
await self._set_camera_image(frame)
elif isinstance(frame, SpriteFrame):
await self._set_camera_images(frame.images)
elif isinstance(frame, TransportMessageFrame):
await self.send_message(frame)
elif isinstance(frame, TTSStartedFrame):
await self._bot_started_speaking()
await self._internal_push_frame(frame)
elif isinstance(frame, TTSStoppedFrame):
await self._bot_stopped_speaking()
await self._internal_push_frame(frame)
else:
await self._internal_push_frame(frame)
async def _sink_task_handler(self):
running = True
while running:
try:
frame = await self._sink_queue.get()
if isinstance(frame, AudioRawFrame):
await self.write_raw_audio_frames(frame.audio)
await self._internal_push_frame(frame)
await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM)
elif isinstance(frame, ImageRawFrame):
await self._set_camera_image(frame)
elif isinstance(frame, SpriteFrame):
await self._set_camera_images(frame.images)
elif isinstance(frame, TransportMessageFrame):
await self.send_message(frame)
elif isinstance(frame, TTSStartedFrame):
await self._bot_started_speaking()
await self._internal_push_frame(frame)
elif isinstance(frame, TTSStoppedFrame):
await self._bot_stopped_speaking()
await self._internal_push_frame(frame)
else:
await self._internal_push_frame(frame)
await self._sink_frame_handler(frame)
running = not isinstance(frame, EndFrame)
self._sink_queue.task_done()
except asyncio.CancelledError:
break
except Exception as e:
logger.exception(f"{self} error processing sink queue: {e}")
async def _sink_clock_frame_handler(self, frame: Frame):
# TODO(aleix): For now we just process TextFrame. But we should process
# audio and video as well.
if isinstance(frame, TextFrame):
await self._internal_push_frame(frame)
async def _sink_clock_task_handler(self):
running = True
while running:
try:
timestamp, _, frame = await self._sink_clock_queue.get()
# If we hit an EndFrame, we cna finish right away.
running = not isinstance(frame, EndFrame)
# If we have a frame we check it's presentation timestamp. If it
# has already passed we process it, otherwise we wait until it's
# time to process it.
if running:
current_time = self.get_clock().get_time()
if timestamp <= current_time:
await self._sink_clock_frame_handler(frame)
else:
wait_time = nanoseconds_to_seconds(timestamp - current_time)
await asyncio.sleep(wait_time)
await self._sink_frame_handler(frame)
self._sink_clock_queue.task_done()
except asyncio.CancelledError:
break
except Exception as e:
logger.exception(f"{self} error processing sink clock queue: {e}")
async def _bot_started_speaking(self):
logger.debug("Bot started speaking")
self._bot_speaking = True
await self._internal_push_frame(BotStartedSpeakingFrame(), FrameDirection.UPSTREAM)
async def _bot_stopped_speaking(self):
logger.debug("Bot stopped speaking")
self._bot_speaking = False
await self._internal_push_frame(BotStoppedSpeakingFrame(), FrameDirection.UPSTREAM)

View File

@@ -32,6 +32,7 @@ class TransportParams(BaseModel):
audio_out_is_live: bool = False
audio_out_sample_rate: int = 16000
audio_out_channels: int = 1
audio_out_bitrate: int = 96000
audio_in_enabled: bool = False
audio_in_sample_rate: int = 16000
audio_in_channels: int = 1

View File

@@ -0,0 +1,117 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import json
import io
import wave
from typing import Awaitable, Callable
from pydantic.main import BaseModel
from pipecat.frames.frames import AudioRawFrame, CancelFrame, EndFrame, Frame, StartFrame, StartInterruptionFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.serializers.base_serializer import FrameSerializer
from pipecat.transports.base_input import BaseInputTransport
from pipecat.transports.base_output import BaseOutputTransport
from pipecat.transports.base_transport import BaseTransport, TransportParams
from loguru import logger
try:
from fastapi import Request, Response
from starlette.background import BackgroundTask
from sse_starlette.sse import EventSourceResponse
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use FastAPI HTTP SSE, you need to `pip install pipecat-ai[http]`.")
raise Exception(f"Missing module: {e}")
class FastAPIHTTPParams(TransportParams):
serializer: FrameSerializer
class FastAPIHTTPInputTransport(BaseInputTransport):
def __init__(
self,
params: FastAPIHTTPParams,
**kwargs):
super().__init__(params, **kwargs)
self._params = params
self._request = None
# todo: this should probably expect a list of frames, not just one frame
async def handle_request(self, request: Request):
self._request = request
frames_list = await request.json()
logger.debug(f"Received frames: {frames_list}")
for frame in frames_list:
logger.debug(f"Received frame: {frame}")
frame = self._params.serializer.deserialize(frame)
if frame and isinstance(frame, AudioRawFrame):
await self.push_audio_frame(frame)
else:
await self.push_frame(frame)
class FastAPIHTTPOutputTransport(BaseOutputTransport):
def __init__(self, params: FastAPIHTTPParams, **kwargs):
super().__init__(params, **kwargs)
self._params = params
self._event_queue = asyncio.Queue()
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
await self._write_frame(frame)
async def write_raw_audio_frames(self, frames: bytes):
pass
async def _write_frame(self, frame: Frame):
payload = self._params.serializer.serialize(frame)
await self._event_queue.put(payload)
async def event_generator(self):
while True:
event = await self._event_queue.get()
logger.debug(f"Sending event {event}")
yield event
class FastAPIHTTPTransport(BaseTransport):
def __init__(
self,
params: FastAPIHTTPParams,
input_name: str | None = None,
output_name: str | None = None,
loop: asyncio.AbstractEventLoop | None = None):
super().__init__(input_name=input_name, output_name=output_name, loop=loop)
self._params = params
self._request = None
self._input = FastAPIHTTPInputTransport(
self._params, name=self._input_name)
self._output = FastAPIHTTPOutputTransport(
self._params, name=self._output_name)
def input(self) -> FrameProcessor:
return self._input
def output(self) -> FrameProcessor:
return self._output
async def handle_request(self, request: Request):
self._request = request
await self._input.handle_request(request)
return EventSourceResponse(self._output.event_generator())

View File

@@ -12,8 +12,8 @@ import wave
from typing import Awaitable, Callable
from pydantic.main import BaseModel
from pipecat.frames.frames import AudioRawFrame, CancelFrame, EndFrame, StartFrame
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import AudioRawFrame, CancelFrame, EndFrame, Frame, StartFrame, StartInterruptionFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.serializers.base_serializer import FrameSerializer
from pipecat.transports.base_input import BaseInputTransport
from pipecat.transports.base_output import BaseOutputTransport
@@ -91,13 +91,20 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport):
self._websocket = websocket
self._params = params
self._audio_buffer = bytes()
self._websocket_audio_buffer = bytes()
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, StartInterruptionFrame):
await self._write_frame(frame)
async def write_raw_audio_frames(self, frames: bytes):
self._audio_buffer += frames
while len(self._audio_buffer) >= self._params.audio_frame_size:
self._websocket_audio_buffer += frames
while len(self._websocket_audio_buffer):
frame = AudioRawFrame(
audio=self._audio_buffer[:self._params.audio_frame_size],
audio=self._websocket_audio_buffer[:
self._params.audio_frame_size],
sample_rate=self._params.audio_out_sample_rate,
num_channels=self._params.audio_out_channels
)
@@ -121,7 +128,13 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport):
if payload and self._websocket.client_state == WebSocketState.CONNECTED:
await self._websocket.send_text(payload)
self._audio_buffer = self._audio_buffer[self._params.audio_frame_size:]
self._websocket_audio_buffer = self._websocket_audio_buffer[
self._params.audio_frame_size:]
async def _write_frame(self, frame: Frame):
payload = self._params.serializer.serialize(frame)
if payload and self._websocket.client_state == WebSocketState.CONNECTED:
await self._websocket.send_text(payload)
class FastAPIWebsocketTransport(BaseTransport):

View File

@@ -36,6 +36,7 @@ from pipecat.frames.frames import (
UserImageRawFrame,
UserImageRequestFrame)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transcriptions.language import Language
from pipecat.transports.base_input import BaseInputTransport
from pipecat.transports.base_output import BaseOutputTransport
from pipecat.transports.base_transport import BaseTransport, TransportParams
@@ -305,7 +306,7 @@ class DailyTransportClient(EventHandler):
if self._token and self._params.transcription_enabled:
await self._start_transcription()
await self._callbacks.on_joined(data["participants"]["local"])
await self._callbacks.on_joined(data)
else:
error_msg = f"Error joining {self._room_url}: {error}"
logger.error(error_msg)
@@ -365,6 +366,12 @@ class DailyTransportClient(EventHandler):
}
},
}
},
"microphone": {
"sendSettings": {
"channelConfig": "stereo" if self._params.audio_out_channels == 2 else "mono",
"bitrate": self._params.audio_out_bitrate,
}
}
},
})
@@ -864,8 +871,8 @@ class DailyTransport(BaseTransport):
self._input.capture_participant_video(
participant_id, framerate, video_source, color_format)
async def _on_joined(self, participant):
await self._call_event_handler("on_joined", participant)
async def _on_joined(self, data):
await self._call_event_handler("on_joined", data)
async def _on_left(self):
await self._call_event_handler("on_left")
@@ -950,11 +957,16 @@ class DailyTransport(BaseTransport):
text = message["text"]
timestamp = message["timestamp"]
is_final = message["rawResponse"]["is_final"]
try:
language = message["rawResponse"]["channel"]["alternatives"][0]["languages"][0]
language = Language(language)
except KeyError:
language = None
if is_final:
frame = TranscriptionFrame(text, participant_id, timestamp)
frame = TranscriptionFrame(text, participant_id, timestamp, language)
logger.debug(f"Transcription (from: {participant_id}): [{text}]")
else:
frame = InterimTranscriptionFrame(text, participant_id, timestamp)
frame = InterimTranscriptionFrame(text, participant_id, timestamp, language)
if self._input:
await self._input.push_transcription_frame(frame)

View File

@@ -9,3 +9,20 @@ import datetime
def time_now_iso8601() -> str:
return datetime.datetime.now(datetime.timezone.utc).isoformat(timespec="milliseconds")
def seconds_to_nanoseconds(seconds: float) -> int:
return int(seconds * 1_000_000_000)
def nanoseconds_to_seconds(nanoseconds: int) -> float:
return nanoseconds / 1_000_000_000
def nanoseconds_to_str(nanoseconds: int) -> str:
total_seconds = nanoseconds_to_seconds(nanoseconds)
hours = int(total_seconds // 3600)
minutes = int((total_seconds % 3600) // 60)
seconds = int(total_seconds % 60)
microseconds = int((total_seconds - int(total_seconds)) * 1_000_000)
return f"{hours}:{minutes:02}:{seconds:02}.{microseconds:06}"

View File

@@ -3,32 +3,39 @@
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import collections
import itertools
from threading import Lock
_COUNTS = {}
_COUNTS_MUTEX = Lock()
_ID = 0
_ID_MUTEX = Lock()
_COUNTS = collections.defaultdict(itertools.count)
_ID = itertools.count()
def obj_id() -> int:
global _ID, _ID_MUTEX
with _ID_MUTEX:
_ID += 1
return _ID
"""
Generate a unique id for an object.
>>> obj_id()
0
>>> obj_id()
1
>>> obj_id()
2
"""
return next(_ID)
def obj_count(obj) -> int:
global _COUNTS, COUNTS_MUTEX
name = obj.__class__.__name__
with _COUNTS_MUTEX:
if name not in _COUNTS:
_COUNTS[name] = 0
else:
_COUNTS[name] += 1
return _COUNTS[name]
"""Generate a unique id for an object.
>>> obj_count(object())
0
>>> obj_count(object())
1
>>> new_type = type('NewType', (object,), {})
>>> obj_count(new_type())
0
"""
return next(_COUNTS[obj.__class__.__name__])
def exp_smoothing(value: float, prev_value: float, factor: float) -> float: