Compare commits
76 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
81c2c5adfa | ||
|
|
d9d6571c73 | ||
|
|
540cad4844 | ||
|
|
0a26b650c0 | ||
|
|
adaac003e5 | ||
|
|
3d4f125071 | ||
|
|
bce87f8717 | ||
|
|
1fe940bd6b | ||
|
|
cb36a71381 | ||
|
|
5acc4928fe | ||
|
|
434493b8aa | ||
|
|
f08b25dbb2 | ||
|
|
3665734972 | ||
|
|
a98d78cdea | ||
|
|
80f6d74e80 | ||
|
|
02d926e9bd | ||
|
|
7749692f72 | ||
|
|
7807cbeb39 | ||
|
|
72f231b327 | ||
|
|
3cbe97d346 | ||
|
|
b880e1a60e | ||
|
|
886046e696 | ||
|
|
9106a5f8ae | ||
|
|
98286336bf | ||
|
|
081b001c8b | ||
|
|
c92531a02f | ||
|
|
748a7af602 | ||
|
|
f4a0de6327 | ||
|
|
e405d7af9f | ||
|
|
51cd7fd285 | ||
|
|
aba5f89174 | ||
|
|
5c0f5a1613 | ||
|
|
7c342f7ba2 | ||
|
|
37e2388758 | ||
|
|
05f0492a8d | ||
|
|
c0ac5c6ae8 | ||
|
|
be923687fb | ||
|
|
5f32fb125d | ||
|
|
ae6fbb3146 | ||
|
|
864768635a | ||
|
|
d7c9679977 | ||
|
|
fedfc366f6 | ||
|
|
b3b39626e1 | ||
|
|
4e0ece17b6 | ||
|
|
fd3fdacdee | ||
|
|
a253606d50 | ||
|
|
568d9dc0a3 | ||
|
|
6629b853c5 | ||
|
|
3931cb3235 | ||
|
|
38cd86ad52 | ||
|
|
c0cdabf61d | ||
|
|
51270a96c5 | ||
|
|
84d72c0d5c | ||
|
|
79aca8169a | ||
|
|
b9d362bd62 | ||
|
|
87c4a1bee1 | ||
|
|
c979762b70 | ||
|
|
1d92fc3199 | ||
|
|
8ac7fb1a67 | ||
|
|
60c3d33def | ||
|
|
8a39d3f4eb | ||
|
|
e038767b6f | ||
|
|
0c46b3e481 | ||
|
|
d42f072ff5 | ||
|
|
9b6f29c24a | ||
|
|
873d5dc23f | ||
|
|
6d141fd47f | ||
|
|
c6f6cb2947 | ||
|
|
0eb189ce7f | ||
|
|
f4fd7b7028 | ||
|
|
21de8e0a35 | ||
|
|
6f55d494bd | ||
|
|
d216edc567 | ||
|
|
ec6063ecc4 | ||
|
|
40fe4ce6fb | ||
|
|
9bda09b1a8 |
80
CHANGELOG.md
80
CHANGELOG.md
@@ -5,6 +5,86 @@ All notable changes to **pipecat** will be documented in this file.
|
||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
### Added
|
||||
|
||||
- A clock can now be specified to `PipelineTask` (defaults to
|
||||
`SystemClock`). This clock will be passed to each frame processor via the
|
||||
`StartFrame`.
|
||||
|
||||
- Added pipeline clocks. A pipeline clock is used by the output transport to
|
||||
know when a frame needs to be presented. For that, all frames now have an
|
||||
optional `pts` field (presentation timestamp). There's currently just one
|
||||
clock implementation `SystemClock` and the `pts` field is currently only used
|
||||
for `TextFrame`s (audio and image frames will be next).
|
||||
|
||||
- `DailyTransport` now supports setting the audio bitrate to improve audio
|
||||
quality through the `DailyParams.audio_out_bitrate` parameter. The new
|
||||
default is 96kbps.
|
||||
|
||||
- `DailyTransport` now uses the number of audio output channels (1 or 2) to set
|
||||
mono or stereo audio when needed.
|
||||
|
||||
- Interruptions support has been added to `TwilioFrameSerializer` when using
|
||||
`FastAPIWebsocketTransport`.
|
||||
|
||||
- Added new `LmntTTSService` text-to-speech service.
|
||||
(see https://www.lmnt.com/)
|
||||
|
||||
- Added `TTSModelUpdateFrame`, `TTSLanguageUpdateFrame`, `STTModelUpdateFrame`,
|
||||
and `STTLanguageUpdateFrame` frames to allow you to switch models, language
|
||||
and voices in TTS and STT services.
|
||||
|
||||
- Added new `transcriptions.Language` enum.
|
||||
|
||||
### Changed
|
||||
|
||||
- `CartesiaTTSService` and `ElevenLabsTTSService` now add presentation
|
||||
timestamps to their text output. This allows the output transport to push the
|
||||
text frames downstream at almost the same time the words are spoken. We say
|
||||
"almost" because currently the audio frames don't have presentation timestamp
|
||||
but they should be played at roughly the same time.
|
||||
|
||||
- `DailyTransport.on_joined` event now returns the full session data instead of
|
||||
just the participant.
|
||||
|
||||
- `CartesiaTTSService` is now a subclass of `TTSService`.
|
||||
|
||||
- `DeepgramSTTService` is now a subclass of `STTService`.
|
||||
|
||||
- `WhisperSTTService` is now a subclass of `SegmentedSTTService`. A
|
||||
`SegmentedSTTService` is a `STTService` where the provided audio is given in a
|
||||
big chunk (i.e. from when the user starts speaking until the user stops
|
||||
speaking) instead of a continuous stream.
|
||||
|
||||
### Fixed
|
||||
|
||||
- `StartFrame` should be the first frame every processor receives to avoid
|
||||
situations where things are not initialized (because initialization happens on
|
||||
`StartFrame`) and other frames come in resulting in undesired behavior.
|
||||
|
||||
### Performance
|
||||
|
||||
- `obj_id()` and `obj_count()` now use `itertools.count` avoiding the need of
|
||||
`threading.Lock`.
|
||||
|
||||
## [0.0.41] - 2024-08-22
|
||||
|
||||
### Added
|
||||
|
||||
- Added `LivekitFrameSerializer` audio frame serializer.
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fix `FastAPIWebsocketOutputTransport` variable name clash with subclass.
|
||||
|
||||
- Fix an `AnthropicLLMService` issue with empty arguments in function calling.
|
||||
|
||||
### Other
|
||||
|
||||
- Fixed `studypal` example errors.
|
||||
|
||||
## [0.0.40] - 2024-08-20
|
||||
|
||||
### Added
|
||||
|
||||
@@ -4,8 +4,7 @@
|
||||
|
||||
# Pipecat
|
||||
|
||||
[](https://pypi.org/project/pipecat-ai) [](https://discord.gg/pipecat)
|
||||
[](https://pypi.org/project/pipecat-ai) [](https://discord.gg/pipecat) <a href="https://app.commanddash.io/agent/github_pipecat-ai_pipecat"><img src="https://img.shields.io/badge/AI-Code%20Agent-EB9FDA"></a>
|
||||
|
||||
`pipecat` is a framework for building voice (and multimodal) conversational agents. Things like personal coaches, meeting assistants, [story-telling toys for kids](https://storytelling-chatbot.fly.dev/), customer support bots, [intake flows](https://www.youtube.com/watch?v=lDevgsp9vn0), and snarky social companions.
|
||||
|
||||
@@ -39,7 +38,7 @@ pip install "pipecat-ai[option,...]"
|
||||
|
||||
Your project may or may not need these, so they're made available as optional requirements. Here is a list:
|
||||
|
||||
- **AI services**: `anthropic`, `azure`, `deepgram`, `gladia`, `google`, `fal`, `moondream`, `openai`, `openpipe`, `playht`, `silero`, `whisper`, `xtts`
|
||||
- **AI services**: `anthropic`, `azure`, `deepgram`, `gladia`, `google`, `fal`, `lmnt`, `moondream`, `openai`, `openpipe`, `playht`, `silero`, `whisper`, `xtts`
|
||||
- **Transports**: `local`, `websocket`, `daily`
|
||||
|
||||
## Code examples
|
||||
|
||||
@@ -30,6 +30,10 @@ FIREWORKS_API_KEY=...
|
||||
# Gladia
|
||||
GLADIA_API_KEY=...
|
||||
|
||||
# LMNT
|
||||
LMNT_API_KEY=...
|
||||
LMNT_VOICE_ID=...
|
||||
|
||||
# PlayHT
|
||||
PLAY_HT_USER_ID=...
|
||||
PLAY_HT_API_KEY=...
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
@@ -27,71 +26,69 @@ daily_api_url = os.getenv("DAILY_API_URL", "https://api.daily.co/v1")
|
||||
|
||||
|
||||
async def main(room_url: str, token: str):
|
||||
async with aiohttp.ClientSession() as session:
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
token,
|
||||
"Chatbot",
|
||||
DailyParams(
|
||||
api_url=daily_api_url,
|
||||
api_key=daily_api_key,
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
camera_out_enabled=False,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
transcription_enabled=True,
|
||||
)
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
token,
|
||||
"Chatbot",
|
||||
DailyParams(
|
||||
api_url=daily_api_url,
|
||||
api_key=daily_api_key,
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
camera_out_enabled=False,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
transcription_enabled=True,
|
||||
)
|
||||
)
|
||||
|
||||
tts = ElevenLabsTTSService(
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
|
||||
)
|
||||
tts = ElevenLabsTTSService(
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
model="gpt-4o")
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
model="gpt-4o")
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are Chatbot, a friendly, helpful robot. Your output will be converted to audio so don't include special characters other than '!' or '?' in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying hello.",
|
||||
},
|
||||
]
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are Chatbot, a friendly, helpful robot. Your output will be converted to audio so don't include special characters other than '!' or '?' in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying hello.",
|
||||
},
|
||||
]
|
||||
|
||||
tma_in = LLMUserResponseAggregator(messages)
|
||||
tma_out = LLMAssistantResponseAggregator(messages)
|
||||
tma_in = LLMUserResponseAggregator(messages)
|
||||
tma_out = LLMAssistantResponseAggregator(messages)
|
||||
|
||||
pipeline = Pipeline([
|
||||
transport.input(),
|
||||
tma_in,
|
||||
llm,
|
||||
tts,
|
||||
transport.output(),
|
||||
tma_out,
|
||||
])
|
||||
pipeline = Pipeline([
|
||||
transport.input(),
|
||||
tma_in,
|
||||
llm,
|
||||
tts,
|
||||
transport.output(),
|
||||
tma_out,
|
||||
])
|
||||
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@transport.event_handler("on_participant_left")
|
||||
async def on_participant_left(transport, participant, reason):
|
||||
@transport.event_handler("on_participant_left")
|
||||
async def on_participant_left(transport, participant, reason):
|
||||
await task.queue_frame(EndFrame())
|
||||
|
||||
@transport.event_handler("on_call_state_updated")
|
||||
async def on_call_state_updated(transport, state):
|
||||
if state == "left":
|
||||
await task.queue_frame(EndFrame())
|
||||
|
||||
@transport.event_handler("on_call_state_updated")
|
||||
async def on_call_state_updated(transport, state):
|
||||
if state == "left":
|
||||
await task.queue_frame(EndFrame())
|
||||
runner = PipelineRunner()
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await runner.run(task)
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
@@ -29,75 +28,74 @@ daily_api_url = os.getenv("DAILY_API_URL", "https://api.daily.co/v1")
|
||||
|
||||
|
||||
async def main(room_url: str, token: str, callId: str, callDomain: str):
|
||||
async with aiohttp.ClientSession() as session:
|
||||
# diallin_settings are only needed if Daily's SIP URI is used
|
||||
# If you are handling this via Twilio, Telnyx, set this to None
|
||||
# and handle call-forwarding when on_dialin_ready fires.
|
||||
diallin_settings = DailyDialinSettings(
|
||||
call_id=callId,
|
||||
call_domain=callDomain
|
||||
# diallin_settings are only needed if Daily's SIP URI is used
|
||||
# If you are handling this via Twilio, Telnyx, set this to None
|
||||
# and handle call-forwarding when on_dialin_ready fires.
|
||||
diallin_settings = DailyDialinSettings(
|
||||
call_id=callId,
|
||||
call_domain=callDomain
|
||||
)
|
||||
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
token,
|
||||
"Chatbot",
|
||||
DailyParams(
|
||||
api_url=daily_api_url,
|
||||
api_key=daily_api_key,
|
||||
dialin_settings=diallin_settings,
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
camera_out_enabled=False,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
transcription_enabled=True,
|
||||
)
|
||||
)
|
||||
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
token,
|
||||
"Chatbot",
|
||||
DailyParams(
|
||||
api_url=daily_api_url,
|
||||
api_key=daily_api_key,
|
||||
dialin_settings=diallin_settings,
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
camera_out_enabled=False,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
transcription_enabled=True,
|
||||
)
|
||||
)
|
||||
tts = ElevenLabsTTSService(
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
|
||||
)
|
||||
|
||||
tts = ElevenLabsTTSService(
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
|
||||
)
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
model="gpt-4o"
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
model="gpt-4o")
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying 'Oh, hello! Who dares dial me at this hour?!'.",
|
||||
},
|
||||
]
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying 'Oh, hello! Who dares dial me at this hour?!'.",
|
||||
},
|
||||
]
|
||||
tma_in = LLMUserResponseAggregator(messages)
|
||||
tma_out = LLMAssistantResponseAggregator(messages)
|
||||
|
||||
tma_in = LLMUserResponseAggregator(messages)
|
||||
tma_out = LLMAssistantResponseAggregator(messages)
|
||||
pipeline = Pipeline([
|
||||
transport.input(),
|
||||
tma_in,
|
||||
llm,
|
||||
tts,
|
||||
transport.output(),
|
||||
tma_out,
|
||||
])
|
||||
|
||||
pipeline = Pipeline([
|
||||
transport.input(),
|
||||
tma_in,
|
||||
llm,
|
||||
tts,
|
||||
transport.output(),
|
||||
tma_out,
|
||||
])
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
|
||||
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
@transport.event_handler("on_participant_left")
|
||||
async def on_participant_left(transport, participant, reason):
|
||||
await task.queue_frame(EndFrame())
|
||||
|
||||
@transport.event_handler("on_participant_left")
|
||||
async def on_participant_left(transport, participant, reason):
|
||||
await task.queue_frame(EndFrame())
|
||||
runner = PipelineRunner()
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await runner.run(task)
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
@@ -36,82 +35,81 @@ daily_api_key = os.getenv("DAILY_API_KEY", "")
|
||||
|
||||
|
||||
async def main(room_url: str, token: str, callId: str, sipUri: str):
|
||||
async with aiohttp.ClientSession() as session:
|
||||
# diallin_settings are only needed if Daily's SIP URI is used
|
||||
# If you are handling this via Twilio, Telnyx, set this to None
|
||||
# and handle call-forwarding when on_dialin_ready fires.
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
token,
|
||||
"Chatbot",
|
||||
DailyParams(
|
||||
api_key=daily_api_key,
|
||||
dialin_settings=None, # Not required for Twilio
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
camera_out_enabled=False,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
transcription_enabled=True,
|
||||
# dialin_settings are only needed if Daily's SIP URI is used
|
||||
# If you are handling this via Twilio, Telnyx, set this to None
|
||||
# and handle call-forwarding when on_dialin_ready fires.
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
token,
|
||||
"Chatbot",
|
||||
DailyParams(
|
||||
api_key=daily_api_key,
|
||||
dialin_settings=None, # Not required for Twilio
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
camera_out_enabled=False,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
transcription_enabled=True,
|
||||
)
|
||||
)
|
||||
|
||||
tts = ElevenLabsTTSService(
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
model="gpt-4o"
|
||||
)
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying 'Hello! Who dares dial me at this hour?!'.",
|
||||
},
|
||||
]
|
||||
|
||||
tma_in = LLMUserResponseAggregator(messages)
|
||||
tma_out = LLMAssistantResponseAggregator(messages)
|
||||
|
||||
pipeline = Pipeline([
|
||||
transport.input(),
|
||||
tma_in,
|
||||
llm,
|
||||
tts,
|
||||
transport.output(),
|
||||
tma_out,
|
||||
])
|
||||
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@transport.event_handler("on_participant_left")
|
||||
async def on_participant_left(transport, participant, reason):
|
||||
await task.queue_frame(EndFrame())
|
||||
|
||||
@transport.event_handler("on_dialin_ready")
|
||||
async def on_dialin_ready(transport, cdata):
|
||||
# For Twilio, Telnyx, etc. You need to update the state of the call
|
||||
# and forward it to the sip_uri..
|
||||
print(f"Forwarding call: {callId} {sipUri}")
|
||||
|
||||
try:
|
||||
# The TwiML is updated using Twilio's client library
|
||||
call = twilioclient.calls(callId).update(
|
||||
twiml=f'<Response><Dial><Sip>{sipUri}</Sip></Dial></Response>'
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
raise Exception(f"Failed to forward call: {str(e)}")
|
||||
|
||||
tts = ElevenLabsTTSService(
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
model="gpt-4o")
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying 'Hello! Who dares dial me at this hour?!'.",
|
||||
},
|
||||
]
|
||||
|
||||
tma_in = LLMUserResponseAggregator(messages)
|
||||
tma_out = LLMAssistantResponseAggregator(messages)
|
||||
|
||||
pipeline = Pipeline([
|
||||
transport.input(),
|
||||
tma_in,
|
||||
llm,
|
||||
tts,
|
||||
transport.output(),
|
||||
tma_out,
|
||||
])
|
||||
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@transport.event_handler("on_participant_left")
|
||||
async def on_participant_left(transport, participant, reason):
|
||||
await task.queue_frame(EndFrame())
|
||||
|
||||
@transport.event_handler("on_dialin_ready")
|
||||
async def on_dialin_ready(transport, cdata):
|
||||
# For Twilio, Telnyx, etc. You need to update the state of the call
|
||||
# and forward it to the sip_uri..
|
||||
print(f"Forwarding call: {callId} {sipUri}")
|
||||
|
||||
try:
|
||||
# The TwiML is updated using Twilio's client library
|
||||
call = twilioclient.calls(callId).update(
|
||||
twiml=f'<Response><Dial><Sip>{sipUri}</Sip></Dial></Response>'
|
||||
)
|
||||
except Exception as e:
|
||||
raise Exception(f"Failed to forward call: {str(e)}")
|
||||
|
||||
runner = PipelineRunner()
|
||||
await runner.run(task)
|
||||
runner = PipelineRunner()
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -3,3 +3,4 @@ fastapi
|
||||
uvicorn
|
||||
python-dotenv
|
||||
twilio
|
||||
python-multipart
|
||||
|
||||
@@ -89,7 +89,6 @@ async def main():
|
||||
)
|
||||
|
||||
tts = ElevenLabsTTSService(
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
|
||||
)
|
||||
|
||||
@@ -85,7 +85,6 @@ async def main():
|
||||
model="gpt-4o")
|
||||
|
||||
tts = ElevenLabsTTSService(
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID"))
|
||||
|
||||
|
||||
@@ -79,7 +79,6 @@ async def main():
|
||||
)
|
||||
|
||||
tts = ElevenLabsTTSService(
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
|
||||
)
|
||||
|
||||
@@ -18,7 +18,6 @@ from pipecat.processors.aggregators.llm_response import (
|
||||
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
|
||||
from pipecat.processors.frameworks.langchain import LangchainProcessor
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.services.elevenlabs import ElevenLabsTTSService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
from pipecat.vad.silero import SileroVADAnalyzer
|
||||
|
||||
|
||||
99
examples/foundational/07d-interruptible-elevenlabs.py
Normal file
99
examples/foundational/07d-interruptible-elevenlabs.py
Normal file
@@ -0,0 +1,99 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import os
|
||||
import sys
|
||||
|
||||
from pipecat.frames.frames import LLMMessagesFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_response import (
|
||||
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
|
||||
from pipecat.services.elevenlabs import ElevenLabsTTSService
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
from pipecat.vad.silero import SileroVADAnalyzer
|
||||
|
||||
from runner import configure
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def main():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
(room_url, token) = await configure(session)
|
||||
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
token,
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
transcription_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer()
|
||||
)
|
||||
)
|
||||
|
||||
tts = ElevenLabsTTSService(
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
model="gpt-4o")
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
]
|
||||
|
||||
tma_in = LLMUserResponseAggregator(messages)
|
||||
tma_out = LLMAssistantResponseAggregator(messages)
|
||||
|
||||
pipeline = Pipeline([
|
||||
transport.input(), # Transport user input
|
||||
tma_in, # User responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
transport.output(), # Transport bot output
|
||||
tma_out # Assistant spoken responses
|
||||
])
|
||||
|
||||
task = PipelineTask(pipeline, PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
report_only_initial_ttfb=True,
|
||||
))
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append(
|
||||
{"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
@@ -15,12 +15,11 @@ from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_response import (
|
||||
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.services.lmnt import LmntTTSService
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
from pipecat.vad.silero import SileroVADAnalyzer
|
||||
|
||||
|
||||
from runner import configure
|
||||
|
||||
from loguru import logger
|
||||
@@ -41,18 +40,17 @@ async def main():
|
||||
token,
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_out_sample_rate=44100,
|
||||
audio_out_enabled=True,
|
||||
audio_out_sample_rate=24000,
|
||||
transcription_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer()
|
||||
)
|
||||
)
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="a0e99841-438c-4a64-b679-ae501e7d6091", # Barbershop Man
|
||||
sample_rate=44100,
|
||||
tts = LmntTTSService(
|
||||
api_key=os.getenv("LMNT_API_KEY"),
|
||||
voice_id="morgan"
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(
|
||||
@@ -74,11 +72,11 @@ async def main():
|
||||
tma_in, # User responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
tma_out, # Goes before the transport because cartesia has word-level timestamps!
|
||||
transport.output(), # Transport bot output
|
||||
tma_out # Assistant spoken responses
|
||||
])
|
||||
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
@@ -104,7 +104,6 @@ async def main():
|
||||
model="gpt-4o")
|
||||
|
||||
tts = ElevenLabsTTSService(
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id="ErXwobaYiN019PkySvjV",
|
||||
)
|
||||
|
||||
@@ -111,7 +111,6 @@ async def main():
|
||||
)
|
||||
|
||||
tts = ElevenLabsTTSService(
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
#
|
||||
# English
|
||||
|
||||
@@ -60,7 +60,6 @@ async def main(room_url, token=None):
|
||||
)
|
||||
|
||||
tts_service = ElevenLabsTTSService(
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
|
||||
)
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
beautifulsoup4==4.12.2
|
||||
PyPDF2==3.0.1
|
||||
beautifulsoup4==4.12.3
|
||||
pypdf==4.3.1
|
||||
tiktoken==0.7.0
|
||||
pipecat-ai[daily,cartesia,openai,silero]==0.0.39
|
||||
pipecat-ai[daily,cartesia,openai,silero]==0.0.40
|
||||
python-dotenv==1.0.1
|
||||
|
||||
@@ -50,12 +50,12 @@ async def configure_with_args(
|
||||
daily_rest_helper = DailyRESTHelper(
|
||||
daily_api_key=key,
|
||||
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
|
||||
)
|
||||
aiohttp_session=aiohttp_session)
|
||||
|
||||
# Create a meeting token for the given room with an expiration 1 hour in
|
||||
# the future.
|
||||
expiry_time: float = 60 * 60
|
||||
|
||||
token = daily_rest_helper.get_token(url, expiry_time)
|
||||
token = await daily_rest_helper.get_token(url, expiry_time)
|
||||
|
||||
return (url, token, args)
|
||||
return (url, token, args)
|
||||
|
||||
@@ -5,7 +5,7 @@ import sys
|
||||
import io
|
||||
|
||||
from bs4 import BeautifulSoup
|
||||
from PyPDF2 import PdfReader
|
||||
from pypdf import PdfReader
|
||||
import tiktoken
|
||||
|
||||
from pipecat.frames.frames import LLMMessagesFrame
|
||||
@@ -147,8 +147,8 @@ Your task is to help the user understand and learn from this article in 2 senten
|
||||
tma_in,
|
||||
llm,
|
||||
tts,
|
||||
tma_out,
|
||||
transport.output(),
|
||||
tma_out,
|
||||
])
|
||||
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
|
||||
|
||||
@@ -39,6 +39,7 @@ azure = [ "azure-cognitiveservices-speech~=1.40.0" ]
|
||||
cartesia = [ "websockets~=12.0" ]
|
||||
daily = [ "daily-python~=0.10.1" ]
|
||||
deepgram = [ "deepgram-sdk~=3.5.0" ]
|
||||
elevenlabs = [ "websockets~=12.0" ]
|
||||
examples = [ "python-dotenv~=1.0.1", "flask~=3.0.3", "flask_cors~=4.0.1" ]
|
||||
fal = [ "fal-client~=0.4.1" ]
|
||||
gladia = [ "websockets~=12.0" ]
|
||||
@@ -46,6 +47,8 @@ google = [ "google-generativeai~=0.7.2" ]
|
||||
gstreamer = [ "pygobject~=3.48.2" ]
|
||||
fireworks = [ "openai~=1.37.2" ]
|
||||
langchain = [ "langchain~=0.2.14", "langchain-community~=0.2.12", "langchain-openai~=0.1.20" ]
|
||||
livekit = [ "livekit~=0.13.1" ]
|
||||
lmnt = [ "lmnt~=1.1.4" ]
|
||||
local = [ "pyaudio~=0.2.14" ]
|
||||
moondream = [ "einops~=0.8.0", "timm~=1.0.8", "transformers~=4.44.0" ]
|
||||
openai = [ "openai~=1.37.2" ]
|
||||
|
||||
0
src/pipecat/clocks/__init__.py
Normal file
0
src/pipecat/clocks/__init__.py
Normal file
18
src/pipecat/clocks/base_clock.py
Normal file
18
src/pipecat/clocks/base_clock.py
Normal file
@@ -0,0 +1,18 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
|
||||
class BaseClock(ABC):
    """Abstract interface for a clock.

    Concrete clocks must provide ``start()`` and ``get_time()``.
    """

    @abstractmethod
    def start(self):
        """Start the clock."""
        ...

    @abstractmethod
    def get_time(self) -> int:
        """Return the clock's current reading as an integer.

        NOTE(review): the unit is not fixed by this interface; SystemClock
        reports nanoseconds -- confirm other implementations follow suit.
        """
        ...
|
||||
21
src/pipecat/clocks/system_clock.py
Normal file
21
src/pipecat/clocks/system_clock.py
Normal file
@@ -0,0 +1,21 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import time
|
||||
|
||||
from pipecat.clocks.base_clock import BaseClock
|
||||
|
||||
|
||||
class SystemClock(BaseClock):
    """Clock backed by ``time.monotonic_ns()``.

    Readings are the nanoseconds elapsed since the latest ``start()`` call.
    """

    def __init__(self):
        # 0 doubles as the "never started" marker checked by get_time().
        self._time = 0

    def start(self):
        """Record the current monotonic time as the reference point."""
        self._time = time.monotonic_ns()

    def get_time(self) -> int:
        """Return nanoseconds since ``start()``, or 0 if not started yet."""
        if self._time <= 0:
            return 0
        return time.monotonic_ns() - self._time
|
||||
@@ -8,17 +8,27 @@ from typing import Any, List, Mapping, Optional, Tuple
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
from pipecat.clocks.base_clock import BaseClock
|
||||
from pipecat.transcriptions.language import Language
|
||||
from pipecat.utils.time import nanoseconds_to_str
|
||||
from pipecat.utils.utils import obj_count, obj_id
|
||||
from pipecat.vad.vad_analyzer import VADParams
|
||||
|
||||
|
||||
def format_pts(pts: int | None):
|
||||
return nanoseconds_to_str(pts) if pts else None
|
||||
|
||||
|
||||
@dataclass
class Frame:
    """Base class for frames.

    None of the three fields is a constructor argument (``init=False``);
    ``__post_init__`` fills them in.
    """

    # Identifier obtained from obj_id().
    id: int = field(init=False)
    # "<ClassName>#<n>", where <n> comes from obj_count().
    name: str = field(init=False)
    # Presentation timestamp; None until something sets it.
    pts: Optional[int] = field(init=False)

    def __post_init__(self):
        class_name = self.__class__.__name__
        self.id = obj_id()
        self.name = f"{class_name}#{obj_count(self)}"
        self.pts = None

    def __str__(self):
        return self.name
|
||||
@@ -44,7 +54,8 @@ class AudioRawFrame(DataFrame):
|
||||
self.num_frames = int(len(self.audio) / (self.num_channels * 2))
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.name}(size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
|
||||
pts = format_pts(self.pts)
|
||||
return f"{self.name}(pts: {pts}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -58,7 +69,8 @@ class ImageRawFrame(DataFrame):
|
||||
format: str | None
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.name}(size: {self.size}, format: {self.format})"
|
||||
pts = format_pts(self.pts)
|
||||
return f"{self.name}(pts: {pts}, size: {self.size}, format: {self.format})"
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -70,7 +82,8 @@ class URLImageRawFrame(ImageRawFrame):
|
||||
url: str | None
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.name}(url: {self.url}, size: {self.size}, format: {self.format})"
|
||||
pts = format_pts(self.pts)
|
||||
return f"{self.name}(pts: {pts}, url: {self.url}, size: {self.size}, format: {self.format})"
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -82,7 +95,8 @@ class VisionImageRawFrame(ImageRawFrame):
|
||||
text: str | None
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.name}(text: {self.text}, size: {self.size}, format: {self.format})"
|
||||
pts = format_pts(self.pts)
|
||||
return f"{self.name}(pts: {pts}, text: {self.text}, size: {self.size}, format: {self.format})"
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -94,7 +108,8 @@ class UserImageRawFrame(ImageRawFrame):
|
||||
user_id: str
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.name}(user: {self.user_id}, size: {self.size}, format: {self.format})"
|
||||
pts = format_pts(self.pts)
|
||||
return f"{self.name}(pts: {pts}, user: {self.user_id}, size: {self.size}, format: {self.format})"
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -107,7 +122,8 @@ class SpriteFrame(Frame):
|
||||
images: List[ImageRawFrame]
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.name}(size: {len(self.images)})"
|
||||
pts = format_pts(self.pts)
|
||||
return f"{self.name}(pts: {pts}, size: {len(self.images)})"
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -119,7 +135,8 @@ class TextFrame(DataFrame):
|
||||
text: str
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.name}(text: {self.text})"
|
||||
pts = format_pts(self.pts)
|
||||
return f"{self.name}(pts: {pts}, text: {self.text})"
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -130,9 +147,10 @@ class TranscriptionFrame(TextFrame):
|
||||
"""
|
||||
user_id: str
|
||||
timestamp: str
|
||||
language: Language | None = None
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.name}(user: {self.user_id}, text: {self.text}, timestamp: {self.timestamp})"
|
||||
return f"{self.name}(user: {self.user_id}, text: {self.text}, language: {self.language}, timestamp: {self.timestamp})"
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -141,9 +159,10 @@ class InterimTranscriptionFrame(TextFrame):
|
||||
the transport's receive queue when a participant speaks."""
|
||||
user_id: str
|
||||
timestamp: str
|
||||
language: Language | None = None
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.name}(user: {self.user_id}, text: {self.text}, timestamp: {self.timestamp})"
|
||||
return f"{self.name}(user: {self.user_id}, text: {self.text}, language: {self.language}, timestamp: {self.timestamp})"
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -322,6 +341,7 @@ class ControlFrame(Frame):
|
||||
@dataclass
|
||||
class StartFrame(ControlFrame):
|
||||
"""This is the first frame that should be pushed down a pipeline."""
|
||||
clock: BaseClock
|
||||
allow_interruptions: bool = False
|
||||
enable_metrics: bool = False
|
||||
enable_usage_metrics: bool = False
|
||||
@@ -432,6 +452,13 @@ class LLMModelUpdateFrame(ControlFrame):
|
||||
model: str
|
||||
|
||||
|
||||
@dataclass
class TTSModelUpdateFrame(ControlFrame):
    """Control frame asking the TTS service to switch to ``model``."""

    model: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class TTSVoiceUpdateFrame(ControlFrame):
|
||||
"""A control frame containing a request to update to a new TTS voice.
|
||||
@@ -439,6 +466,31 @@ class TTSVoiceUpdateFrame(ControlFrame):
|
||||
voice: str
|
||||
|
||||
|
||||
@dataclass
class TTSLanguageUpdateFrame(ControlFrame):
    """A control frame containing a request to update to a new TTS language.

    Only the language is carried by this frame; a voice change is requested
    separately with a TTSVoiceUpdateFrame.

    """
    language: Language
|
||||
|
||||
|
||||
@dataclass
class STTModelUpdateFrame(ControlFrame):
    """A control frame containing a request to update the STT model.

    Only the model is carried by this frame; a language change is requested
    separately with an STTLanguageUpdateFrame.

    """
    model: str
|
||||
|
||||
|
||||
@dataclass
class STTLanguageUpdateFrame(ControlFrame):
    """Control frame asking the STT service to switch to ``language``."""

    language: Language
|
||||
|
||||
|
||||
@dataclass
|
||||
class FunctionCallInProgressFrame(SystemFrame):
|
||||
"""A frame signaling that a function call is in progress.
|
||||
@@ -455,7 +507,7 @@ class FunctionCallResultFrame(DataFrame):
|
||||
function_name: str
|
||||
tool_call_id: str
|
||||
arguments: str
|
||||
result: any
|
||||
result: Any
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -463,4 +515,4 @@ class VADParamsUpdateFrame(ControlFrame):
|
||||
"""A control frame containing a request to update VAD params. Intended
|
||||
to be pushed upstream from RTVI processor.
|
||||
"""
|
||||
params: dict
|
||||
params: VADParams
|
||||
|
||||
@@ -10,6 +10,8 @@ from typing import AsyncIterable, Iterable
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from pipecat.clocks.base_clock import BaseClock
|
||||
from pipecat.clocks.system_clock import SystemClock
|
||||
from pipecat.frames.frames import (
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
@@ -60,11 +62,16 @@ class Source(FrameProcessor):
|
||||
|
||||
class PipelineTask:
|
||||
|
||||
def __init__(self, pipeline: BasePipeline, params: PipelineParams = PipelineParams()):
|
||||
def __init__(
|
||||
self,
|
||||
pipeline: BasePipeline,
|
||||
params: PipelineParams = PipelineParams(),
|
||||
clock: BaseClock = SystemClock()):
|
||||
self.id: int = obj_id()
|
||||
self.name: str = f"{self.__class__.__name__}#{obj_count(self)}"
|
||||
|
||||
self._pipeline = pipeline
|
||||
self._clock = clock
|
||||
self._params = params
|
||||
self._finished = False
|
||||
|
||||
@@ -116,11 +123,14 @@ class PipelineTask:
|
||||
return MetricsFrame(ttfb=ttfb, processing=processing)
|
||||
|
||||
async def _process_down_queue(self):
|
||||
self._clock.start()
|
||||
|
||||
start_frame = StartFrame(
|
||||
allow_interruptions=self._params.allow_interruptions,
|
||||
enable_metrics=self._params.enable_metrics,
|
||||
enable_usage_metrics=self._params.enable_metrics,
|
||||
report_only_initial_ttfb=self._params.report_only_initial_ttfb
|
||||
report_only_initial_ttfb=self._params.report_only_initial_ttfb,
|
||||
clock=self._clock
|
||||
)
|
||||
await self._source.process_frame(start_frame, FrameDirection.DOWNSTREAM)
|
||||
|
||||
|
||||
@@ -109,7 +109,7 @@ class LLMResponseAggregator(FrameProcessor):
|
||||
await self.push_frame(frame, direction)
|
||||
elif isinstance(frame, self._accumulator_frame):
|
||||
if self._aggregating:
|
||||
self._aggregation += f" {frame.text}"
|
||||
self._aggregation += f" {frame.text}" if self._aggregation else frame.text
|
||||
# We have received a complete sentence, so if we have seen the
|
||||
# end frame and we were still aggregating, it means we should
|
||||
# send the aggregation.
|
||||
|
||||
@@ -9,6 +9,7 @@ import time
|
||||
|
||||
from enum import Enum
|
||||
|
||||
from pipecat.clocks.base_clock import BaseClock
|
||||
from pipecat.frames.frames import (
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
@@ -96,6 +97,9 @@ class FrameProcessor:
|
||||
self._next: "FrameProcessor" | None = None
|
||||
self._loop: asyncio.AbstractEventLoop = loop or asyncio.get_running_loop()
|
||||
|
||||
# Clock
|
||||
self._clock: BaseClock | None = None
|
||||
|
||||
# Properties
|
||||
self._allow_interruptions = False
|
||||
self._enable_metrics = False
|
||||
@@ -177,8 +181,12 @@ class FrameProcessor:
|
||||
def get_parent(self) -> "FrameProcessor":
|
||||
return self._parent
|
||||
|
||||
def get_clock(self) -> BaseClock:
|
||||
return self._clock
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
if isinstance(frame, StartFrame):
|
||||
self._clock = frame.clock
|
||||
self._allow_interruptions = frame.allow_interruptions
|
||||
self._enable_metrics = frame.enable_metrics
|
||||
self._enable_usage_metrics = frame.enable_usage_metrics
|
||||
|
||||
@@ -81,11 +81,6 @@ class RTVIAction(BaseModel):
|
||||
return super().model_post_init(__context)
|
||||
|
||||
|
||||
#
|
||||
# Client -> Pipecat messages.
|
||||
#
|
||||
|
||||
|
||||
class RTVIServiceOptionConfig(BaseModel):
|
||||
name: str
|
||||
value: Any
|
||||
@@ -100,6 +95,16 @@ class RTVIConfig(BaseModel):
|
||||
config: List[RTVIServiceConfig]
|
||||
|
||||
|
||||
#
|
||||
# Client -> Pipecat messages.
|
||||
#
|
||||
|
||||
|
||||
# Data of an "update-config" client message.
class RTVIUpdateConfig(BaseModel):
    # Service configurations to apply.
    config: List[RTVIServiceConfig]
    # When True the bot is interrupted before the configuration is applied;
    # off unless the client asks for it.
    interrupt: bool = False
|
||||
|
||||
|
||||
class RTVIActionRunArgument(BaseModel):
|
||||
name: str
|
||||
value: Any
|
||||
@@ -222,7 +227,7 @@ class RTVILLMFunctionCallResultData(BaseModel):
|
||||
function_name: str
|
||||
tool_call_id: str
|
||||
arguments: dict
|
||||
result: dict
|
||||
result: dict | str
|
||||
|
||||
|
||||
class RTVITranscriptionMessageData(BaseModel):
|
||||
@@ -351,8 +356,10 @@ class RTVIProcessor(FrameProcessor):
|
||||
await self.push_frame(frame, direction)
|
||||
# Control frames
|
||||
elif isinstance(frame, StartFrame):
|
||||
await self._start(frame)
|
||||
# Push StartFrame before start(), because we want StartFrame to be
|
||||
# processed by every processor before any other frame is processed.
|
||||
await self.push_frame(frame, direction)
|
||||
await self._start(frame)
|
||||
elif isinstance(frame, EndFrame):
|
||||
# Push EndFrame before stop(), because stop() waits on the task to
|
||||
# finish and the task finishes when EndFrame is processed.
|
||||
@@ -489,8 +496,8 @@ class RTVIProcessor(FrameProcessor):
|
||||
case "get-config":
|
||||
await self._handle_get_config(message.id)
|
||||
case "update-config":
|
||||
config = RTVIConfig.model_validate(message.data)
|
||||
await self._handle_update_config(message.id, config)
|
||||
update_config = RTVIUpdateConfig.model_validate(message.data)
|
||||
await self._handle_update_config(message.id, update_config)
|
||||
case "action":
|
||||
action = RTVIActionRun.model_validate(message.data)
|
||||
await self._handle_action(message.id, action)
|
||||
@@ -545,17 +552,14 @@ class RTVIProcessor(FrameProcessor):
|
||||
await handler(self, service.name, option)
|
||||
self._update_config_option(service.name, option)
|
||||
|
||||
async def _update_config(self, data: RTVIConfig):
|
||||
async def _update_config(self, data: RTVIConfig, interrupt: bool):
|
||||
if interrupt:
|
||||
await self.interrupt_bot()
|
||||
for service_config in data.config:
|
||||
await self._update_service_config(service_config)
|
||||
|
||||
async def _handle_update_config(self, request_id: str, data: RTVIConfig):
|
||||
# NOTE(aleix): The bot might be talking while we receive a new
|
||||
# config. Let's interrupt it for now and update the config. Another
|
||||
# solution is to wait until the bot stops speaking and then apply the
|
||||
# config, but this definitely is more complicated to achieve.
|
||||
await self.interrupt_bot()
|
||||
await self._update_config(data)
|
||||
async def _handle_update_config(self, request_id: str, data: RTVIUpdateConfig):
|
||||
await self._update_config(RTVIConfig(config=data.config), data.interrupt)
|
||||
await self._handle_get_config(request_id)
|
||||
|
||||
async def _handle_function_call_result(self, data):
|
||||
@@ -583,7 +587,7 @@ class RTVIProcessor(FrameProcessor):
|
||||
async def _maybe_send_bot_ready(self):
|
||||
if self._pipeline_started and self._client_ready:
|
||||
await self._send_bot_ready()
|
||||
await self._update_config(self._config)
|
||||
await self._update_config(self._config, False)
|
||||
|
||||
async def _send_bot_ready(self):
|
||||
if not self._params.send_bot_ready:
|
||||
|
||||
@@ -78,8 +78,10 @@ class GStreamerPipelineSource(FrameProcessor):
|
||||
await self.push_frame(frame, direction)
|
||||
# Control frames
|
||||
elif isinstance(frame, StartFrame):
|
||||
await self._start(frame)
|
||||
# Push StartFrame before start(), because we want StartFrame to be
|
||||
# processed by every processor before any other frame is processed.
|
||||
await self._internal_push_frame(frame, direction)
|
||||
await self._start(frame)
|
||||
elif isinstance(frame, EndFrame):
|
||||
# Push EndFrame before stop(), because stop() waits on the task to
|
||||
# finish and the task finishes when EndFrame is processed.
|
||||
|
||||
46
src/pipecat/serializers/livekit.py
Normal file
46
src/pipecat/serializers/livekit.py
Normal file
@@ -0,0 +1,46 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import ctypes
|
||||
import pickle
|
||||
|
||||
from pipecat.frames.frames import AudioRawFrame, Frame
|
||||
from pipecat.serializers.base_serializer import FrameSerializer
|
||||
|
||||
from loguru import logger
|
||||
|
||||
try:
|
||||
from livekit.rtc import AudioFrame
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error(
|
||||
"In order to use LiveKit, you need to `pip install pipecat-ai[livekit]`.")
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
class LivekitFrameSerializer(FrameSerializer):
    """Serializer between ``AudioRawFrame`` and pickled LiveKit ``AudioFrame``.

    Only audio frames are handled; any other frame serializes to None.
    """

    SERIALIZABLE_TYPES = {
        AudioRawFrame: "audio",
    }

    def serialize(self, frame: Frame) -> str | bytes | None:
        """Pickle ``frame`` as a LiveKit ``AudioFrame``.

        Args:
            frame: The frame to serialize.

        Returns:
            The pickled ``AudioFrame`` bytes, or None when ``frame`` is not an
            ``AudioRawFrame``.
        """
        if not isinstance(frame, AudioRawFrame):
            return None

        # Samples are 16-bit, so the per-channel sample count is the byte
        # length divided by the sample width AND the channel count (the same
        # arithmetic AudioRawFrame uses for num_frames). Dividing by the
        # sample width alone over-counts for anything but mono audio.
        bytes_per_sample = ctypes.sizeof(ctypes.c_int16)
        # Guard against a zero channel count so the division cannot fail.
        channels = max(frame.num_channels, 1)

        audio_frame = AudioFrame(
            data=frame.audio,
            sample_rate=frame.sample_rate,
            num_channels=frame.num_channels,
            samples_per_channel=len(frame.audio) // (bytes_per_sample * channels),
        )
        return pickle.dumps(audio_frame)

    def deserialize(self, data: str | bytes) -> Frame | None:
        """Rebuild an ``AudioRawFrame`` from pickled LiveKit data.

        The payload must unpickle to a mapping whose ``"frame"`` entry is the
        LiveKit ``AudioFrame``.

        NOTE(review): this is not the inverse of ``serialize()``, which
        pickles a bare ``AudioFrame`` -- confirm against the producer of
        ``data``.
        """
        # SECURITY: pickle.loads() can execute arbitrary code while
        # unpickling. Only pass data that comes from a trusted source.
        audio_frame: AudioFrame = pickle.loads(data)['frame']
        return AudioRawFrame(
            audio=bytes(audio_frame.data),
            sample_rate=audio_frame.sample_rate,
            num_channels=audio_frame.num_channels,
        )
|
||||
@@ -9,7 +9,7 @@ import json
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from pipecat.frames.frames import AudioRawFrame, Frame
|
||||
from pipecat.frames.frames import AudioRawFrame, Frame, StartInterruptionFrame
|
||||
from pipecat.serializers.base_serializer import FrameSerializer
|
||||
from pipecat.utils.audio import ulaw_to_pcm, pcm_to_ulaw
|
||||
|
||||
@@ -28,22 +28,25 @@ class TwilioFrameSerializer(FrameSerializer):
|
||||
self._params = params
|
||||
|
||||
def serialize(self, frame: Frame) -> str | bytes | None:
|
||||
if not isinstance(frame, AudioRawFrame):
|
||||
return None
|
||||
if isinstance(frame, AudioRawFrame):
|
||||
data = frame.audio
|
||||
|
||||
data = frame.audio
|
||||
|
||||
serialized_data = pcm_to_ulaw(data, frame.sample_rate, self._params.twilio_sample_rate)
|
||||
payload = base64.b64encode(serialized_data).decode("utf-8")
|
||||
answer = {
|
||||
"event": "media",
|
||||
"streamSid": self._stream_sid,
|
||||
"media": {
|
||||
"payload": payload
|
||||
serialized_data = pcm_to_ulaw(
|
||||
data, frame.sample_rate, self._params.twilio_sample_rate)
|
||||
payload = base64.b64encode(serialized_data).decode("utf-8")
|
||||
answer = {
|
||||
"event": "media",
|
||||
"streamSid": self._stream_sid,
|
||||
"media": {
|
||||
"payload": payload
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return json.dumps(answer)
|
||||
return json.dumps(answer)
|
||||
|
||||
if isinstance(frame, StartInterruptionFrame):
|
||||
answer = {"event": "clear", "streamSid": self._stream_sid}
|
||||
return json.dumps(answer)
|
||||
|
||||
def deserialize(self, data: str | bytes) -> Frame | None:
|
||||
message = json.loads(data)
|
||||
|
||||
@@ -4,11 +4,12 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import io
|
||||
import wave
|
||||
|
||||
from abc import abstractmethod
|
||||
from typing import AsyncGenerator
|
||||
from typing import AsyncGenerator, List, Optional, Tuple
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
AudioRawFrame,
|
||||
@@ -17,9 +18,15 @@ from pipecat.frames.frames import (
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
LLMFullResponseEndFrame,
|
||||
STTLanguageUpdateFrame,
|
||||
STTModelUpdateFrame,
|
||||
StartFrame,
|
||||
StartInterruptionFrame,
|
||||
TTSLanguageUpdateFrame,
|
||||
TTSModelUpdateFrame,
|
||||
TTSSpeakFrame,
|
||||
TTSStartedFrame,
|
||||
TTSStoppedFrame,
|
||||
TTSVoiceUpdateFrame,
|
||||
TextFrame,
|
||||
UserImageRequestFrame,
|
||||
@@ -27,11 +34,15 @@ from pipecat.frames.frames import (
|
||||
)
|
||||
from pipecat.processors.async_frame_processor import AsyncFrameProcessor
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.transcriptions.language import Language
|
||||
from pipecat.utils.audio import calculate_audio_volume
|
||||
from pipecat.utils.string import match_endofsentence
|
||||
from pipecat.utils.time import seconds_to_nanoseconds
|
||||
from pipecat.utils.utils import exp_smoothing
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
|
||||
from loguru import logger
|
||||
|
||||
|
||||
class AIService(FrameProcessor):
|
||||
def __init__(self, **kwargs):
|
||||
@@ -156,16 +167,32 @@ class TTSService(AIService):
|
||||
aggregate_sentences: bool = True,
|
||||
# if True, subclass is responsible for pushing TextFrames and LLMFullResponseEndFrames
|
||||
push_text_frames: bool = True,
|
||||
# if True, TTSService will push TTSStoppedFrames, otherwise subclass must do it
|
||||
push_stop_frames: bool = False,
|
||||
# if push_stop_frames is True, wait for this idle period before pushing TTSStoppedFrame
|
||||
stop_frame_timeout_s: float = 1.0,
|
||||
**kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._aggregate_sentences: bool = aggregate_sentences
|
||||
self._push_text_frames: bool = push_text_frames
|
||||
self._push_stop_frames: bool = push_stop_frames
|
||||
self._stop_frame_timeout_s: float = stop_frame_timeout_s
|
||||
self._stop_frame_task: Optional[asyncio.Task] = None
|
||||
self._stop_frame_queue: asyncio.Queue = asyncio.Queue()
|
||||
self._current_sentence: str = ""
|
||||
|
||||
@abstractmethod
|
||||
async def set_model(self, model: str):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def set_voice(self, voice: str):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def set_language(self, language: Language):
|
||||
pass
|
||||
|
||||
# Converts the text to audio.
|
||||
@abstractmethod
|
||||
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
|
||||
@@ -222,15 +249,175 @@ class TTSService(AIService):
|
||||
await self.push_frame(frame, direction)
|
||||
elif isinstance(frame, TTSSpeakFrame):
|
||||
await self._push_tts_frames(frame.text, False)
|
||||
elif isinstance(frame, TTSModelUpdateFrame):
|
||||
await self.set_model(frame.model)
|
||||
elif isinstance(frame, TTSVoiceUpdateFrame):
|
||||
await self.set_voice(frame.voice)
|
||||
elif isinstance(frame, TTSLanguageUpdateFrame):
|
||||
await self.set_language(frame.language)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
async def start(self, frame: StartFrame):
    """Start the service and, if stop frames are enabled, the handler task."""
    await super().start(frame)
    if not self._push_stop_frames:
        return
    loop = self.get_event_loop()
    self._stop_frame_task = loop.create_task(self._stop_frame_handler())
|
||||
|
||||
async def stop(self, frame: EndFrame):
    """Stop the service, then cancel and await the stop-frame task, if any."""
    await super().stop(frame)
    if not self._stop_frame_task:
        return
    # The handler swallows CancelledError, so awaiting it returns normally.
    self._stop_frame_task.cancel()
    await self._stop_frame_task
    self._stop_frame_task = None
|
||||
|
||||
async def cancel(self, frame: CancelFrame):
    """Cancel the service, then cancel and await the stop-frame task, if any."""
    await super().cancel(frame)
    if not self._stop_frame_task:
        return
    # The handler swallows CancelledError, so awaiting it returns normally.
    self._stop_frame_task.cancel()
    await self._stop_frame_task
    self._stop_frame_task = None
|
||||
|
||||
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
    """Push ``frame`` and, when stop frames are enabled, queue TTS activity.

    Interruption, TTS started/stopped and raw audio frames are copied to the
    stop-frame queue after being pushed, so the stop-frame handler can track
    whether audio is still being produced.
    """
    await super().push_frame(frame, direction)

    if not self._push_stop_frames:
        return

    tracked_types = (
        StartInterruptionFrame,
        TTSStartedFrame,
        AudioRawFrame,
        TTSStoppedFrame,
    )
    if isinstance(frame, tracked_types):
        await self._stop_frame_queue.put(frame)
|
||||
|
||||
async def _stop_frame_handler(self):
    """Push a ``TTSStoppedFrame`` once TTS output has gone idle.

    Waits up to ``_stop_frame_timeout_s`` for the next frame on the
    stop-frame queue. A ``TTSStartedFrame`` marks TTS as started; a
    ``TTSStoppedFrame`` or ``StartInterruptionFrame`` clears that mark. If
    the wait times out while TTS is marked as started, a ``TTSStoppedFrame``
    is pushed. Runs until the task is cancelled.
    """
    tts_active = False
    try:
        while True:
            try:
                frame = await asyncio.wait_for(
                    self._stop_frame_queue.get(), self._stop_frame_timeout_s)
            except asyncio.TimeoutError:
                # Nothing arrived within the idle period.
                if tts_active:
                    await self.push_frame(TTSStoppedFrame())
                    tts_active = False
                continue

            if isinstance(frame, TTSStartedFrame):
                tts_active = True
            elif isinstance(frame, (TTSStoppedFrame, StartInterruptionFrame)):
                tts_active = False
    except asyncio.CancelledError:
        # Cancellation is the normal way to end this task.
        pass
|
||||
|
||||
|
||||
class AsyncTTSService(TTSService):
    """TTS service base class whose subclasses must provide ``flush_audio()``."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    @abstractmethod
    async def flush_audio(self):
        """Subclass hook; must be implemented by concrete services."""
        ...
|
||||
|
||||
|
||||
class AsyncWordTTSService(AsyncTTSService):
    """Async TTS service that pushes words as timestamped ``TextFrame``s.

    Subclasses report ``(word, seconds)`` pairs through
    ``add_word_timestamps()``. A background task turns each pair into a
    ``TextFrame`` whose ``pts`` is the initial word timestamp plus the word's
    offset in nanoseconds.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # -1 means "no initial timestamp captured yet".
        self._initial_word_timestamp = -1
        # Defined here as well as in reset_word_timestamps() so the attribute
        # exists before the first reset.
        self._word_timestamps = []
        self._words_queue = asyncio.Queue()
        self._words_task = self.get_event_loop().create_task(self._words_task_handler())

    def start_word_timestamps(self):
        """Capture the clock time for word offsets, unless already captured."""
        if self._initial_word_timestamp == -1:
            self._initial_word_timestamp = self.get_clock().get_time()

    def reset_word_timestamps(self):
        """Forget the initial timestamp so the next start captures a new one."""
        self._initial_word_timestamp = -1
        self._word_timestamps = []

    async def add_word_timestamps(self, word_times: List[Tuple[str, float]]):
        """Queue words for emission.

        Args:
            word_times: ``(word, timestamp)`` pairs with the timestamp in
                seconds; it is converted to nanoseconds before queuing.
        """
        for word, timestamp in word_times:
            await self._words_queue.put((word, seconds_to_nanoseconds(timestamp)))

    async def stop(self, frame: EndFrame):
        await super().stop(frame)
        await self._stop_words_task()

    async def cancel(self, frame: CancelFrame):
        await super().cancel(frame)
        await self._stop_words_task()

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Process ``frame``, then flush audio at the end of a response."""
        await super().process_frame(frame, direction)

        if isinstance(frame, (LLMFullResponseEndFrame, EndFrame)):
            await self.flush_audio()

    async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection):
        await super()._handle_interruption(frame, direction)
        self.reset_word_timestamps()

    async def _stop_words_task(self):
        """Cancel and await the words task; safe to call more than once."""
        if self._words_task:
            self._words_task.cancel()
            await self._words_task
            # Drop the reference, as TTSService does for its stop-frame task,
            # so a later stop()/cancel() does not touch the finished task.
            self._words_task = None

    async def _words_task_handler(self):
        """Turn queued ``(word, timestamp)`` entries into frames until cancelled."""
        while True:
            try:
                (word, timestamp) = await self._words_queue.get()
                try:
                    # A ("LLMFullResponseEndFrame", 0) entry is a marker that
                    # is turned into the end-of-response frame, not a word.
                    if word == "LLMFullResponseEndFrame" and timestamp == 0:
                        await self.push_frame(LLMFullResponseEndFrame())
                    else:
                        frame = TextFrame(word)
                        frame.pts = self._initial_word_timestamp + timestamp
                        await self.push_frame(frame)
                finally:
                    # Every successful get() is matched by a task_done(), even
                    # when pushing the frame fails.
                    self._words_queue.task_done()
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.exception(f"{self} exception: {e}")
|
||||
|
||||
|
||||
class STTService(AIService):
    """Base class for speech-to-text services."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    @abstractmethod
    async def set_model(self, model: str):
        """Switch to ``model``; implemented by subclasses."""
        ...

    @abstractmethod
    async def set_language(self, language: Language):
        """Switch to ``language``; implemented by subclasses."""
        ...

    @abstractmethod
    async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
        """Run speech-to-text on ``audio`` and yield the resulting frames."""
        ...

    async def process_audio_frame(self, frame: AudioRawFrame):
        """Run ``run_stt()`` on the frame's audio and process what it yields."""
        stt_frames = self.run_stt(frame.audio)
        await self.process_generator(stt_frames)

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Handle audio and STT update frames; push every other frame on."""
        await super().process_frame(frame, direction)

        if isinstance(frame, AudioRawFrame):
            # Audio is consumed here and deliberately not pushed downstream.
            await self.process_audio_frame(frame)
            return
        if isinstance(frame, STTModelUpdateFrame):
            await self.set_model(frame.model)
            return
        if isinstance(frame, STTLanguageUpdateFrame):
            await self.set_language(frame.language)
            return
        await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
class SegmentedSTTService(STTService):
|
||||
"""SegmentedSTTService is an STTService that will detect speech and will run
|
||||
speech-to-text on speech segments only, instead of a continuous stream.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self,
|
||||
*,
|
||||
min_volume: float = 0.6,
|
||||
@@ -251,24 +438,7 @@ class STTService(AIService):
|
||||
self._smoothing_factor = 0.2
|
||||
self._prev_volume = 0
|
||||
|
||||
@abstractmethod
|
||||
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
|
||||
"""Returns transcript as a string"""
|
||||
pass
|
||||
|
||||
def _new_wave(self):
|
||||
content = io.BytesIO()
|
||||
ww = wave.open(content, "wb")
|
||||
ww.setsampwidth(2)
|
||||
ww.setnchannels(self._num_channels)
|
||||
ww.setframerate(self._sample_rate)
|
||||
return (content, ww)
|
||||
|
||||
def _get_smoothed_volume(self, frame: AudioRawFrame) -> float:
|
||||
volume = calculate_audio_volume(frame.audio, frame.sample_rate)
|
||||
return exp_smoothing(volume, self._prev_volume, self._smoothing_factor)
|
||||
|
||||
async def _append_audio(self, frame: AudioRawFrame):
|
||||
async def process_audio_frame(self, frame: AudioRawFrame):
|
||||
# Try to filter out empty background noise
|
||||
volume = self._get_smoothed_volume(frame)
|
||||
if volume >= self._min_volume:
|
||||
@@ -288,9 +458,7 @@ class STTService(AIService):
|
||||
self._silence_num_frames = 0
|
||||
self._wave.close()
|
||||
self._content.seek(0)
|
||||
await self.start_processing_metrics()
|
||||
await self.process_generator(self.run_stt(self._content.read()))
|
||||
await self.stop_processing_metrics()
|
||||
(self._content, self._wave) = self._new_wave()
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
@@ -299,16 +467,17 @@ class STTService(AIService):
|
||||
async def cancel(self, frame: CancelFrame):
|
||||
self._wave.close()
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
"""Processes a frame of audio data, either buffering or transcribing it."""
|
||||
await super().process_frame(frame, direction)
|
||||
def _new_wave(self):
|
||||
content = io.BytesIO()
|
||||
ww = wave.open(content, "wb")
|
||||
ww.setsampwidth(2)
|
||||
ww.setnchannels(self._num_channels)
|
||||
ww.setframerate(self._sample_rate)
|
||||
return (content, ww)
|
||||
|
||||
if isinstance(frame, AudioRawFrame):
|
||||
# In this service we accumulate audio internally and at the end we
|
||||
# push a TextFrame. We don't really want to push audio frames down.
|
||||
await self._append_audio(frame)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
def _get_smoothed_volume(self, frame: AudioRawFrame) -> float:
|
||||
volume = calculate_audio_volume(frame.audio, frame.sample_rate)
|
||||
return exp_smoothing(volume, self._prev_volume, self._smoothing_factor)
|
||||
|
||||
|
||||
class ImageGenService(AIService):
|
||||
|
||||
@@ -43,7 +43,7 @@ from pipecat.processors.aggregators.llm_response import (
|
||||
from loguru import logger
|
||||
|
||||
try:
|
||||
from anthropic import AsyncAnthropic
|
||||
from anthropic import AsyncAnthropic, NOT_GIVEN, NotGiven
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error(
|
||||
@@ -135,7 +135,7 @@ class AnthropicLLMService(LLMService):
|
||||
|
||||
response = await api_call(
|
||||
tools=context.tools or [],
|
||||
system=context.system or [],
|
||||
system=context.system,
|
||||
messages=messages,
|
||||
model=self._model,
|
||||
max_tokens=self._max_tokens,
|
||||
@@ -171,7 +171,8 @@ class AnthropicLLMService(LLMService):
|
||||
await self.call_function(context=context,
|
||||
tool_call_id=tool_use_block.id,
|
||||
function_name=tool_use_block.name,
|
||||
arguments=json.loads(json_accumulator))
|
||||
arguments=json.loads(json_accumulator) if json_accumulator else dict()
|
||||
)
|
||||
|
||||
# Calculate usage. Do this here in its own if statement, because there may be usage
|
||||
# data embedded in messages that we do other processing for, above.
|
||||
@@ -269,7 +270,7 @@ class AnthropicLLMContext(OpenAILLMContext):
|
||||
tools: list[dict] | None = None,
|
||||
tool_choice: dict | None = None,
|
||||
*,
|
||||
system: List | None = None
|
||||
system: str | NotGiven = NOT_GIVEN
|
||||
):
|
||||
super().__init__(messages=messages, tools=tools, tool_choice=tool_choice)
|
||||
self._user_image_request_context = {}
|
||||
|
||||
@@ -12,7 +12,6 @@ import time
|
||||
|
||||
from typing import AsyncGenerator
|
||||
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.frames.frames import (
|
||||
CancelFrame,
|
||||
ErrorFrame,
|
||||
@@ -26,7 +25,9 @@ from pipecat.frames.frames import (
|
||||
TextFrame,
|
||||
LLMFullResponseEndFrame
|
||||
)
|
||||
from pipecat.services.ai_services import TTSService
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.transcriptions.language import Language
|
||||
from pipecat.services.ai_services import AsyncWordTTSService
|
||||
|
||||
from loguru import logger
|
||||
|
||||
@@ -40,7 +41,26 @@ except ModuleNotFoundError as e:
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
class CartesiaTTSService(TTSService):
|
||||
def language_to_cartesia_language(language: Language) -> str | None:
    """Translate a pipecat ``Language`` into the language code sent to Cartesia.

    Only the base codes this module knows how to map are returned: ``de``,
    ``en``, ``es``, ``fr``, ``ja``, ``pt`` and ``zh``. Regional variants such
    as ``en-US`` or ``pt-BR`` resolve to their base code instead of falling
    through to ``None``, so switching to a variant no longer ends up sending
    ``"language": None`` in the synthesis request.

    Args:
        language: The language to translate. ``Language`` members compare equal
            to their string value, so a plain string code is accepted as well.

    Returns:
        The two-letter code for a mapped language, or ``None`` when the
        language (or its base language) is not one of the mapped codes.
    """
    supported_codes = ("de", "en", "es", "fr", "ja", "pt", "zh")

    # Enum members expose their code through `.value`; plain strings are used
    # as-is. Anything else (e.g. None) cannot be mapped.
    code = getattr(language, "value", language)
    if not isinstance(code, str):
        return None

    # "pt-BR" -> "pt": keep only the part before the region suffix.
    base_code = code.split("-")[0].lower()
    return base_code if base_code in supported_codes else None
|
||||
|
||||
|
||||
class CartesiaTTSService(AsyncWordTTSService):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -54,19 +74,17 @@ class CartesiaTTSService(TTSService):
|
||||
sample_rate: int = 16000,
|
||||
language: str = "en",
|
||||
**kwargs):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
# Aggregating sentences still gives cleaner-sounding results and fewer
|
||||
# artifacts than streaming one word at a time. On average, waiting for
|
||||
# a full sentence should only "cost" us 15ms or so with GPT-4o or a Llama 3
|
||||
# model, and it's worth it for the better audio quality.
|
||||
self._aggregate_sentences = True
|
||||
|
||||
# we don't want to automatically push LLM response text frames, because the
|
||||
# context aggregators will add them to the LLM context even if we're
|
||||
# interrupted. cartesia gives us word-by-word timestamps. we can use those
|
||||
# to generate text frames ourselves aligned with the playout timing of the audio!
|
||||
self._push_text_frames = False
|
||||
# artifacts than streaming one word at a time. On average, waiting for a
|
||||
# full sentence should only "cost" us 15ms or so with GPT-4o or a Llama
|
||||
# 3 model, and it's worth it for the better audio quality.
|
||||
#
|
||||
# We also don't want to automatically push LLM response text frames,
|
||||
# because the context aggregators will add them to the LLM context even
|
||||
# if we're interrupted. Cartesia gives us word-by-word timestamps. We
|
||||
# can use those to generate text frames ourselves aligned with the
|
||||
# playout timing of the audio!
|
||||
super().__init__(aggregate_sentences=True, push_text_frames=False, **kwargs)
|
||||
|
||||
self._api_key = api_key
|
||||
self._cartesia_version = cartesia_version
|
||||
@@ -82,18 +100,23 @@ class CartesiaTTSService(TTSService):
|
||||
|
||||
self._websocket = None
|
||||
self._context_id = None
|
||||
self._context_id_start_timestamp = None
|
||||
self._timestamped_words_buffer = []
|
||||
self._receive_task = None
|
||||
self._context_appending_task = None
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
return True
|
||||
|
||||
async def set_model(self, model: str):
|
||||
logger.debug(f"Switching TTS model to: [{model}]")
|
||||
self._model_id = model
|
||||
|
||||
async def set_voice(self, voice: str):
|
||||
logger.debug(f"Switching TTS voice to: [{voice}]")
|
||||
self._voice_id = voice
|
||||
|
||||
async def set_language(self, language: Language):
|
||||
logger.debug(f"Switching TTS language to: [{language}]")
|
||||
self._language = language_to_cartesia_language(language)
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
await self._connect()
|
||||
@@ -112,7 +135,6 @@ class CartesiaTTSService(TTSService):
|
||||
f"{self._url}?api_key={self._api_key}&cartesia_version={self._cartesia_version}"
|
||||
)
|
||||
self._receive_task = self.get_event_loop().create_task(self._receive_task_handler())
|
||||
self._context_appending_task = self.get_event_loop().create_task(self._context_appending_task_handler())
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} initialization error: {e}")
|
||||
self._websocket = None
|
||||
@@ -121,10 +143,6 @@ class CartesiaTTSService(TTSService):
|
||||
try:
|
||||
await self.stop_all_metrics()
|
||||
|
||||
if self._context_appending_task:
|
||||
self._context_appending_task.cancel()
|
||||
await self._context_appending_task
|
||||
self._context_appending_task = None
|
||||
if self._receive_task:
|
||||
self._receive_task.cancel()
|
||||
await self._receive_task
|
||||
@@ -134,18 +152,33 @@ class CartesiaTTSService(TTSService):
|
||||
self._websocket = None
|
||||
|
||||
self._context_id = None
|
||||
self._context_id_start_timestamp = None
|
||||
self._timestamped_words_buffer = []
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} error closing websocket: {e}")
|
||||
|
||||
async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection):
|
||||
await super()._handle_interruption(frame, direction)
|
||||
self._context_id = None
|
||||
self._context_id_start_timestamp = None
|
||||
self._timestamped_words_buffer = []
|
||||
await self.stop_all_metrics()
|
||||
await self.push_frame(LLMFullResponseEndFrame())
|
||||
self._context_id = None
|
||||
|
||||
async def flush_audio(self):
|
||||
if not self._context_id or not self._websocket:
|
||||
return
|
||||
logger.debug("Flushing audio")
|
||||
msg = {
|
||||
"transcript": "",
|
||||
"continue": False,
|
||||
"context_id": self._context_id,
|
||||
"model_id": self._model_id,
|
||||
"voice": {
|
||||
"mode": "id",
|
||||
"id": self._voice_id
|
||||
},
|
||||
"output_format": self._output_format,
|
||||
"language": self._language,
|
||||
"add_timestamps": True,
|
||||
}
|
||||
await self._websocket.send(json.dumps(msg))
|
||||
|
||||
async def _receive_task_handler(self):
|
||||
try:
|
||||
@@ -160,16 +193,14 @@ class CartesiaTTSService(TTSService):
|
||||
# because we are likely still playing out audio and need the
|
||||
# timestamp to send context frames.
|
||||
self._context_id = None
|
||||
self._timestamped_words_buffer.append(("LLMFullResponseEndFrame", 0))
|
||||
await self.add_word_timestamps([("LLMFullResponseEndFrame", 0)])
|
||||
elif msg["type"] == "timestamps":
|
||||
# logger.debug(f"TIMESTAMPS: {msg}")
|
||||
self._timestamped_words_buffer.extend(
|
||||
list(zip(msg["word_timestamps"]["words"], msg["word_timestamps"]["end"]))
|
||||
await self.add_word_timestamps(
|
||||
list(zip(msg["word_timestamps"]["words"], msg["word_timestamps"]["start"]))
|
||||
)
|
||||
elif msg["type"] == "chunk":
|
||||
await self.stop_ttfb_metrics()
|
||||
if not self._context_id_start_timestamp:
|
||||
self._context_id_start_timestamp = time.time()
|
||||
self.start_word_timestamps()
|
||||
frame = AudioRawFrame(
|
||||
audio=base64.b64decode(msg["data"]),
|
||||
sample_rate=self._output_format["sample_rate"],
|
||||
@@ -188,27 +219,6 @@ class CartesiaTTSService(TTSService):
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} exception: {e}")
|
||||
|
||||
async def _context_appending_task_handler(self):
|
||||
try:
|
||||
while True:
|
||||
await asyncio.sleep(0.1)
|
||||
if not self._context_id_start_timestamp:
|
||||
continue
|
||||
elapsed_seconds = time.time() - self._context_id_start_timestamp
|
||||
# Pop all words from self._timestamped_words_buffer that are
|
||||
# older than the elapsed time and print a message about them to
|
||||
# the console.
|
||||
while self._timestamped_words_buffer and self._timestamped_words_buffer[0][1] <= elapsed_seconds:
|
||||
word, timestamp = self._timestamped_words_buffer.pop(0)
|
||||
if word == "LLMFullResponseEndFrame" and timestamp == 0:
|
||||
await self.push_frame(LLMFullResponseEndFrame())
|
||||
continue
|
||||
await self.push_frame(TextFrame(word))
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} exception: {e}")
|
||||
|
||||
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
|
||||
logger.debug(f"Generating TTS: [{text}]")
|
||||
|
||||
|
||||
@@ -16,12 +16,11 @@ from pipecat.frames.frames import (
|
||||
Frame,
|
||||
InterimTranscriptionFrame,
|
||||
StartFrame,
|
||||
SystemFrame,
|
||||
TTSStartedFrame,
|
||||
TTSStoppedFrame,
|
||||
TranscriptionFrame)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.ai_services import AsyncAIService, TTSService
|
||||
from pipecat.services.ai_services import STTService, TTSService
|
||||
from pipecat.transcriptions.language import Language
|
||||
from pipecat.utils.time import time_now_iso8601
|
||||
|
||||
from loguru import logger
|
||||
@@ -30,10 +29,12 @@ from loguru import logger
|
||||
# See .env.example for Deepgram configuration needed
|
||||
try:
|
||||
from deepgram import (
|
||||
AsyncListenWebSocketClient,
|
||||
DeepgramClient,
|
||||
DeepgramClientOptions,
|
||||
LiveTranscriptionEvents,
|
||||
LiveOptions,
|
||||
LiveResultResponse
|
||||
)
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
@@ -107,7 +108,7 @@ class DeepgramTTSService(TTSService):
|
||||
logger.exception(f"{self} exception: {e}")
|
||||
|
||||
|
||||
class DeepgramSTTService(AsyncAIService):
|
||||
class DeepgramSTTService(STTService):
|
||||
def __init__(self,
|
||||
*,
|
||||
api_key: str,
|
||||
@@ -120,6 +121,8 @@ class DeepgramSTTService(AsyncAIService):
|
||||
channels=1,
|
||||
interim_results=True,
|
||||
smart_format=True,
|
||||
punctuate=True,
|
||||
profanity_filter=True,
|
||||
),
|
||||
**kwargs):
|
||||
super().__init__(**kwargs)
|
||||
@@ -128,40 +131,62 @@ class DeepgramSTTService(AsyncAIService):
|
||||
|
||||
self._client = DeepgramClient(
|
||||
api_key, config=DeepgramClientOptions(url=url, options={"keepalive": "true"}))
|
||||
self._connection = self._client.listen.asynclive.v("1")
|
||||
self._connection: AsyncListenWebSocketClient = self._client.listen.asyncwebsocket.v("1")
|
||||
self._connection.on(LiveTranscriptionEvents.Transcript, self._on_message)
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
async def set_model(self, model: str):
|
||||
logger.debug(f"Switching STT model to: [{model}]")
|
||||
self._live_options.model = model
|
||||
await self._disconnect()
|
||||
await self._connect()
|
||||
|
||||
if isinstance(frame, SystemFrame):
|
||||
await self.push_frame(frame, direction)
|
||||
elif isinstance(frame, AudioRawFrame):
|
||||
await self._connection.send(frame.audio)
|
||||
else:
|
||||
await self.queue_frame(frame, direction)
|
||||
async def set_language(self, language: Language):
|
||||
logger.debug(f"Switching STT language to: [{language}]")
|
||||
self._live_options.language = language
|
||||
await self._disconnect()
|
||||
await self._connect()
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
await self._connect()
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
await super().stop(frame)
|
||||
await self._disconnect()
|
||||
|
||||
async def cancel(self, frame: CancelFrame):
|
||||
await super().cancel(frame)
|
||||
await self._disconnect()
|
||||
|
||||
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
|
||||
await self.start_processing_metrics()
|
||||
await self._connection.send(audio)
|
||||
yield None
|
||||
await self.stop_processing_metrics()
|
||||
|
||||
async def _connect(self):
|
||||
if await self._connection.start(self._live_options):
|
||||
logger.debug(f"{self}: Connected to Deepgram")
|
||||
else:
|
||||
logger.error(f"{self}: Unable to connect to Deepgram")
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
await super().stop(frame)
|
||||
await self._connection.finish()
|
||||
|
||||
async def cancel(self, frame: CancelFrame):
|
||||
await super().cancel(frame)
|
||||
await self._connection.finish()
|
||||
async def _disconnect(self):
|
||||
if self._connection.is_connected:
|
||||
await self._connection.finish()
|
||||
logger.debug(f"{self}: Disconnected from Deepgram")
|
||||
|
||||
async def _on_message(self, *args, **kwargs):
|
||||
result = kwargs["result"]
|
||||
result: LiveResultResponse = kwargs["result"]
|
||||
if len(result.channel.alternatives) == 0:
|
||||
return
|
||||
is_final = result.is_final
|
||||
transcript = result.channel.alternatives[0].transcript
|
||||
language = None
|
||||
if result.channel.alternatives[0].languages:
|
||||
language = result.channel.alternatives[0].languages[0]
|
||||
language = Language(language)
|
||||
if len(transcript) > 0:
|
||||
if is_final:
|
||||
await self.queue_frame(TranscriptionFrame(transcript, "", time_now_iso8601()))
|
||||
await self.push_frame(TranscriptionFrame(transcript, "", time_now_iso8601(), language))
|
||||
else:
|
||||
await self.queue_frame(InterimTranscriptionFrame(transcript, "", time_now_iso8601()))
|
||||
await self.push_frame(InterimTranscriptionFrame(transcript, "", time_now_iso8601(), language))
|
||||
|
||||
@@ -4,18 +4,72 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import aiohttp
|
||||
import asyncio
|
||||
import base64
|
||||
import json
|
||||
|
||||
from typing import AsyncGenerator, Literal
|
||||
from typing import Any, AsyncGenerator, List, Literal, Mapping, Tuple
|
||||
from pydantic import BaseModel
|
||||
|
||||
from pipecat.frames.frames import AudioRawFrame, ErrorFrame, Frame, TTSStartedFrame, TTSStoppedFrame
|
||||
from pipecat.services.ai_services import TTSService
|
||||
from pipecat.frames.frames import (
|
||||
AudioRawFrame,
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
Frame,
|
||||
StartFrame,
|
||||
StartInterruptionFrame,
|
||||
TTSStartedFrame,
|
||||
TTSStoppedFrame)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.ai_services import AsyncWordTTSService
|
||||
|
||||
from loguru import logger
|
||||
|
||||
# See .env.example for ElevenLabs configuration needed
|
||||
try:
|
||||
import websockets
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error(
|
||||
"In order to use ElevenLabs, you need to `pip install pipecat-ai[elevenlabs]`. Also, set `ELEVENLABS_API_KEY` environment variable.")
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
class ElevenLabsTTSService(TTSService):
|
||||
|
||||
def sample_rate_from_output_format(output_format: str) -> int:
    """Return the sample rate (in Hz) encoded in an ElevenLabs PCM format name.

    Args:
        output_format: One of ``"pcm_16000"``, ``"pcm_22050"``,
            ``"pcm_24000"`` or ``"pcm_44100"``.

    Returns:
        The matching sample rate, or 16000 for any unrecognized format.
    """
    known_rates = (
        ("pcm_16000", 16000),
        ("pcm_22050", 22050),
        ("pcm_24000", 24000),
        ("pcm_44100", 44100),
    )
    for format_name, rate in known_rates:
        if output_format == format_name:
            return rate
    # Unknown formats fall back to the default 16 kHz rate.
    return 16000
|
||||
|
||||
|
||||
def calculate_word_times(
    alignment_info: Mapping[str, Any], cumulative_time: float
) -> List[Tuple[str, float]]:
    """Pair each word of an alignment chunk with a timestamp in seconds.

    Args:
        alignment_info: Mapping with a ``"chars"`` sequence of single
            characters and a parallel ``"charStartTimesMs"`` sequence of
            per-character start times in milliseconds.
        cumulative_time: Offset in seconds added to every timestamp.

    Returns:
        A list of ``(word, time_in_seconds)`` tuples. Words are obtained by
        splitting the joined characters on single spaces, so leading or
        repeated spaces yield empty-string words.
    """
    chars = alignment_info["chars"]
    zipped_times = list(zip(chars, alignment_info["charStartTimesMs"]))

    words = "".join(chars).split(" ")

    # A word boundary is either a space or the final character (there might
    # not be a trailing space). The word is stamped with the start time of the
    # character just before the boundary.
    last_index = len(zipped_times) - 1
    times = []
    for i, (char, _) in enumerate(zipped_times):
        if char == " " or i == last_index:
            # Clamp at 0: when the chunk starts with a space, `i - 1` would be
            # -1 and silently pick the *last* character's time, producing a
            # timestamp later than those of the following words.
            previous_ms = zipped_times[max(i - 1, 0)][1]
            times.append(cumulative_time + previous_ms / 1000.0)

    return list(zip(words, times))
|
||||
|
||||
|
||||
class ElevenLabsTTSService(AsyncWordTTSService):
|
||||
class InputParams(BaseModel):
|
||||
output_format: Literal["pcm_16000", "pcm_22050", "pcm_24000", "pcm_44100"] = "pcm_16000"
|
||||
|
||||
@@ -24,56 +78,186 @@ class ElevenLabsTTSService(TTSService):
|
||||
*,
|
||||
api_key: str,
|
||||
voice_id: str,
|
||||
aiohttp_session: aiohttp.ClientSession,
|
||||
model: str = "eleven_turbo_v2_5",
|
||||
url: str = "wss://api.elevenlabs.io",
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs):
|
||||
super().__init__(**kwargs)
|
||||
# Aggregating sentences still gives cleaner-sounding results and fewer
|
||||
# artifacts than streaming one word at a time. On average, waiting for a
|
||||
# full sentence should only "cost" us 15ms or so with GPT-4o or a Llama
|
||||
# 3 model, and it's worth it for the better audio quality.
|
||||
#
|
||||
# We also don't want to automatically push LLM response text frames,
|
||||
# because the context aggregators will add them to the LLM context even
|
||||
# if we're interrupted. ElevenLabs gives us word-by-word timestamps. We
|
||||
# can use those to generate text frames ourselves aligned with the
|
||||
# playout timing of the audio!
|
||||
#
|
||||
# Finally, ElevenLabs doesn't provide information on when the bot stops
|
||||
# speaking for a while, so we want the parent class to send TTSStopFrame
|
||||
# after a short period not receiving any audio.
|
||||
super().__init__(
|
||||
aggregate_sentences=True,
|
||||
push_text_frames=False,
|
||||
push_stop_frames=True,
|
||||
stop_frame_timeout_s=2.0,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
self._api_key = api_key
|
||||
self._voice_id = voice_id
|
||||
self._model = model
|
||||
self._url = url
|
||||
self._params = params
|
||||
self._aiohttp_session = aiohttp_session
|
||||
self._sample_rate = sample_rate_from_output_format(params.output_format)
|
||||
|
||||
# Websocket connection to ElevenLabs.
|
||||
self._websocket = None
|
||||
# Indicates if we have sent TTSStartedFrame. It will reset to False when
|
||||
# there's an interruption or TTSStoppedFrame.
|
||||
self._started = False
|
||||
self._cumulative_time = 0
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
return True
|
||||
|
||||
async def set_model(self, model: str):
|
||||
logger.debug(f"Switching TTS model to: [{model}]")
|
||||
self._model = model
|
||||
await self._disconnect()
|
||||
await self._connect()
|
||||
|
||||
async def set_voice(self, voice: str):
|
||||
logger.debug(f"Switching TTS voice to: [{voice}]")
|
||||
self._voice_id = voice
|
||||
await self._disconnect()
|
||||
await self._connect()
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
await self._connect()
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
await super().stop(frame)
|
||||
await self._disconnect()
|
||||
|
||||
async def cancel(self, frame: CancelFrame):
|
||||
await super().cancel(frame)
|
||||
await self._disconnect()
|
||||
|
||||
async def flush_audio(self):
|
||||
if self._websocket:
|
||||
msg = {"text": " ", "flush": True}
|
||||
await self._websocket.send(json.dumps(msg))
|
||||
|
||||
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
|
||||
await super().push_frame(frame, direction)
|
||||
if isinstance(frame, (TTSStoppedFrame, StartInterruptionFrame)):
|
||||
self._started = False
|
||||
if isinstance(frame, TTSStoppedFrame):
|
||||
await self.add_word_timestamps([("LLMFullResponseEndFrame", 0)])
|
||||
|
||||
async def _connect(self):
|
||||
try:
|
||||
voice_id = self._voice_id
|
||||
model = self._model
|
||||
output_format = self._params.output_format
|
||||
url = f"{self._url}/v1/text-to-speech/{voice_id}/stream-input?model_id={model}&output_format={output_format}"
|
||||
self._websocket = await websockets.connect(url)
|
||||
self._receive_task = self.get_event_loop().create_task(self._receive_task_handler())
|
||||
self._keepalive_task = self.get_event_loop().create_task(self._keepalive_task_handler())
|
||||
|
||||
# According to ElevenLabs, we should always start with a single space.
|
||||
msg = {
|
||||
"text": " ",
|
||||
"xi_api_key": self._api_key,
|
||||
}
|
||||
await self._websocket.send(json.dumps(msg))
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} initialization error: {e}")
|
||||
self._websocket = None
|
||||
|
||||
async def _disconnect(self):
|
||||
try:
|
||||
await self.stop_all_metrics()
|
||||
|
||||
if self._receive_task:
|
||||
self._receive_task.cancel()
|
||||
await self._receive_task
|
||||
self._receive_task = None
|
||||
|
||||
if self._keepalive_task:
|
||||
self._keepalive_task.cancel()
|
||||
await self._keepalive_task
|
||||
self._keepalive_task = None
|
||||
|
||||
if self._websocket:
|
||||
await self._websocket.close()
|
||||
self._websocket = None
|
||||
|
||||
self._started = False
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} error closing websocket: {e}")
|
||||
|
||||
async def _receive_task_handler(self):
|
||||
try:
|
||||
async for message in self._websocket:
|
||||
msg = json.loads(message)
|
||||
if msg.get("audio"):
|
||||
await self.stop_ttfb_metrics()
|
||||
self.start_word_timestamps()
|
||||
|
||||
audio = base64.b64decode(msg["audio"])
|
||||
frame = AudioRawFrame(audio, self._sample_rate, 1)
|
||||
await self.push_frame(frame)
|
||||
|
||||
if msg.get("alignment"):
|
||||
word_times = calculate_word_times(msg["alignment"], self._cumulative_time)
|
||||
await self.add_word_timestamps(word_times)
|
||||
self._cumulative_time = word_times[-1][1]
|
||||
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} exception: {e}")
|
||||
|
||||
async def _keepalive_task_handler(self):
|
||||
while True:
|
||||
try:
|
||||
await asyncio.sleep(10)
|
||||
await self._send_text("")
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} exception: {e}")
|
||||
|
||||
async def _send_text(self, text: str):
|
||||
if self._websocket:
|
||||
msg = {"text": text + " "}
|
||||
await self._websocket.send(json.dumps(msg))
|
||||
|
||||
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
|
||||
logger.debug(f"Generating TTS: [{text}]")
|
||||
|
||||
url = f"https://api.elevenlabs.io/v1/text-to-speech/{self._voice_id}/stream"
|
||||
try:
|
||||
if not self._websocket:
|
||||
await self._connect()
|
||||
|
||||
payload = {"text": text, "model_id": self._model}
|
||||
try:
|
||||
if not self._started:
|
||||
await self.push_frame(TTSStartedFrame())
|
||||
await self.start_ttfb_metrics()
|
||||
self._started = True
|
||||
self._cumulative_time = 0
|
||||
|
||||
querystring = {
|
||||
"output_format": self._params.output_format
|
||||
}
|
||||
|
||||
headers = {
|
||||
"xi-api-key": self._api_key,
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
await self.start_ttfb_metrics()
|
||||
|
||||
async with self._aiohttp_session.post(url, json=payload, headers=headers, params=querystring) as r:
|
||||
if r.status != 200:
|
||||
text = await r.text()
|
||||
logger.error(f"{self} error getting audio (status: {r.status}, error: {text})")
|
||||
yield ErrorFrame(f"Error getting audio (status: {r.status}, error: {text})")
|
||||
await self._send_text(text)
|
||||
await self.start_tts_usage_metrics(text)
|
||||
except Exception as e:
|
||||
logger.error(f"{self} error sending message: {e}")
|
||||
await self.push_frame(TTSStoppedFrame())
|
||||
await self._disconnect()
|
||||
await self._connect()
|
||||
return
|
||||
|
||||
await self.start_tts_usage_metrics(text)
|
||||
|
||||
await self.push_frame(TTSStartedFrame())
|
||||
async for chunk in r.content:
|
||||
if len(chunk) > 0:
|
||||
await self.stop_ttfb_metrics()
|
||||
frame = AudioRawFrame(chunk, 16000, 1)
|
||||
yield frame
|
||||
await self.push_frame(TTSStoppedFrame())
|
||||
yield None
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} exception: {e}")
|
||||
|
||||
166
src/pipecat/services/lmnt.py
Normal file
166
src/pipecat/services/lmnt.py
Normal file
@@ -0,0 +1,166 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
|
||||
from typing import AsyncGenerator
|
||||
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.frames.frames import (
|
||||
AudioRawFrame,
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
StartFrame,
|
||||
StartInterruptionFrame,
|
||||
TTSStartedFrame,
|
||||
TTSStoppedFrame,
|
||||
)
|
||||
from pipecat.services.ai_services import AsyncTTSService
|
||||
|
||||
from loguru import logger
|
||||
|
||||
# See .env.example for LMNT configuration needed
|
||||
try:
|
||||
from lmnt.api import Speech
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error(
|
||||
"In order to use LMNT, you need to `pip install pipecat-ai[lmnt]`. Also, set `LMNT_API_KEY` environment variable.")
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
class LmntTTSService(AsyncTTSService):
    """Text-to-speech service that streams audio from LMNT.

    A streaming synthesis connection is opened on start and audio messages
    received from it are pushed downstream as ``AudioRawFrame``s by a
    background receive task.
    """

    def __init__(
            self,
            *,
            api_key: str,
            voice_id: str,
            sample_rate: int = 24000,
            language: str = "en",
            **kwargs):
        """Create the service without connecting yet.

        Args:
            api_key: LMNT API key used when opening the connection.
            voice_id: Identifier of the voice to synthesize with.
            sample_rate: Sample rate requested from LMNT and used for the
                audio frames pushed downstream.
            language: Language code; stored on the instance.
            **kwargs: Forwarded to the parent service.
        """
        # Let TTSService produce TTSStoppedFrames after a short delay of
        # no activity.
        super().__init__(push_stop_frames=True, **kwargs)

        self._api_key = api_key
        self._voice_id = voice_id
        self._output_format = {
            "container": "raw",
            "encoding": "pcm_s16le",
            "sample_rate": sample_rate,
        }
        self._language = language

        self._speech = None
        self._connection = None
        self._receive_task = None
        # Indicates if we have sent TTSStartedFrame. It will reset to False when
        # there's an interruption or TTSStoppedFrame.
        self._started = False

    def can_generate_metrics(self) -> bool:
        """Report that this service produces metrics."""
        return True

    async def set_voice(self, voice: str):
        """Switch to another voice.

        The voice is bound when the streaming connection is opened, so an
        existing connection is re-established to make the change effective.
        If there is no connection yet, the new voice is simply used by the
        next connect.
        """
        logger.debug(f"Switching TTS voice to: [{voice}]")
        self._voice_id = voice
        if self._connection:
            await self._disconnect()
            await self._connect()

    async def start(self, frame: StartFrame):
        """Start the parent service, then open the LMNT connection."""
        await super().start(frame)
        await self._connect()

    async def stop(self, frame: EndFrame):
        """Stop the parent service, then close the LMNT connection."""
        await super().stop(frame)
        await self._disconnect()

    async def cancel(self, frame: CancelFrame):
        """Cancel the parent service, then close the LMNT connection."""
        await super().cancel(frame)
        await self._disconnect()

    async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
        """Push a frame and track whether a new TTSStartedFrame is needed."""
        await super().push_frame(frame, direction)
        # After a stop or an interruption the next run_tts() call must announce
        # a new TTS segment.
        if isinstance(frame, (TTSStoppedFrame, StartInterruptionFrame)):
            self._started = False

    async def _connect(self):
        """Open the streaming synthesis connection and start the receive task.

        Errors are logged rather than raised; on failure ``_connection`` is
        left as ``None`` so that run_tts() retries the connection.
        """
        try:
            # Pass the key given to the constructor explicitly instead of
            # relying on the client finding one in the environment.
            self._speech = Speech(api_key=self._api_key)
            self._connection = await self._speech.synthesize_streaming(
                self._voice_id, format="raw", sample_rate=self._output_format["sample_rate"])
            self._receive_task = self.get_event_loop().create_task(self._receive_task_handler())
        except Exception as e:
            logger.exception(f"{self} initialization error: {e}")
            self._connection = None

    async def _disconnect(self):
        """Stop metrics, cancel the receive task and close the connection.

        Errors are logged rather than raised.
        """
        try:
            await self.stop_all_metrics()

            if self._receive_task:
                self._receive_task.cancel()
                # The handler swallows CancelledError, so awaiting it here
                # just waits for the task to finish.
                await self._receive_task
                self._receive_task = None
            if self._connection:
                await self._connection.socket.close()
                self._connection = None
            if self._speech:
                await self._speech.close()
                self._speech = None
            self._started = False
        except Exception as e:
            logger.exception(f"{self} error closing websocket: {e}")

    async def _receive_task_handler(self):
        """Forward messages from the LMNT connection as frames.

        Audio messages become ``AudioRawFrame``s; error messages end the
        current TTS segment and are reported through ``push_error``.
        """
        try:
            async for msg in self._connection:
                if "error" in msg:
                    logger.error(f'{self} error: {msg["error"]}')
                    await self.push_frame(TTSStoppedFrame())
                    await self.stop_all_metrics()
                    await self.push_error(ErrorFrame(f'{self} error: {msg["error"]}'))
                elif "audio" in msg:
                    await self.stop_ttfb_metrics()
                    frame = AudioRawFrame(
                        audio=msg["audio"],
                        sample_rate=self._output_format["sample_rate"],
                        num_channels=1
                    )
                    await self.push_frame(frame)
                else:
                    logger.error(f"LMNT error, unknown message type: {msg}")
        except asyncio.CancelledError:
            # Normal shutdown path triggered by _disconnect().
            pass
        except Exception as e:
            logger.exception(f"{self} exception: {e}")

    async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
        """Send text to LMNT for synthesis.

        Audio is not yielded from here; it arrives asynchronously through the
        receive task. If sending fails, the current TTS segment is ended and
        the connection is re-established.

        Args:
            text: The text to synthesize.

        Yields:
            ``None`` once the text has been handed to LMNT.
        """
        logger.debug(f"Generating TTS: [{text}]")

        try:
            if not self._connection:
                await self._connect()

            if not self._started:
                await self.push_frame(TTSStartedFrame())
                await self.start_ttfb_metrics()
                self._started = True

            try:
                await self._connection.append_text(text)
                await self._connection.flush()
                await self.start_tts_usage_metrics(text)
            except Exception as e:
                logger.error(f"{self} error sending message: {e}")
                await self.push_frame(TTSStoppedFrame())
                await self._disconnect()
                await self._connect()
                return
            yield None
        except Exception as e:
            logger.exception(f"{self} exception: {e}")
|
||||
@@ -173,7 +173,7 @@ class TogetherLLMService(LLMService):
|
||||
try:
|
||||
arguments = json.loads(args_string)
|
||||
await self.call_function(context=context,
|
||||
tool_call_id=uuid.uuid4(),
|
||||
tool_call_id=str(uuid.uuid4()),
|
||||
function_name=function_name,
|
||||
arguments=arguments)
|
||||
return
|
||||
@@ -301,7 +301,8 @@ class TogetherAssistantContextAggregator(LLMAssistantContextAggregator):
|
||||
self._function_call_result = None
|
||||
self._context.add_message({
|
||||
"role": "tool",
|
||||
"content": frame.result
|
||||
# Together expects the content here to be a string, so stringify it
|
||||
"content": str(frame.result)
|
||||
})
|
||||
run_llm = True
|
||||
else:
|
||||
|
||||
@@ -14,7 +14,7 @@ from typing import AsyncGenerator
|
||||
import numpy as np
|
||||
|
||||
from pipecat.frames.frames import ErrorFrame, Frame, TranscriptionFrame
|
||||
from pipecat.services.ai_services import STTService
|
||||
from pipecat.services.ai_services import SegmentedSTTService
|
||||
from pipecat.utils.time import time_now_iso8601
|
||||
|
||||
from loguru import logger
|
||||
@@ -38,7 +38,7 @@ class Model(Enum):
|
||||
DISTIL_MEDIUM_EN = "Systran/faster-distil-whisper-medium.en"
|
||||
|
||||
|
||||
class WhisperSTTService(STTService):
|
||||
class WhisperSTTService(SegmentedSTTService):
|
||||
"""Class to transcribe audio with a locally-downloaded Whisper model"""
|
||||
|
||||
def __init__(self,
|
||||
@@ -77,6 +77,7 @@ class WhisperSTTService(STTService):
|
||||
yield ErrorFrame("Whisper model not available")
|
||||
return
|
||||
|
||||
await self.start_processing_metrics()
|
||||
await self.start_ttfb_metrics()
|
||||
|
||||
# Divide by 32768 because we have signed 16-bit data.
|
||||
@@ -88,7 +89,9 @@ class WhisperSTTService(STTService):
|
||||
if segment.no_speech_prob < self._no_speech_prob:
|
||||
text += f"{segment.text} "
|
||||
|
||||
await self.stop_ttfb_metrics()
|
||||
await self.stop_processing_metrics()
|
||||
|
||||
if text:
|
||||
await self.stop_ttfb_metrics()
|
||||
logger.debug(f"Transcription: [{text}]")
|
||||
yield TranscriptionFrame(text, "", time_now_iso8601())
|
||||
|
||||
0
src/pipecat/transcriptions/__init__.py
Normal file
0
src/pipecat/transcriptions/__init__.py
Normal file
64
src/pipecat/transcriptions/language.py
Normal file
64
src/pipecat/transcriptions/language.py
Normal file
@@ -0,0 +1,64 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import sys
|
||||
|
||||
from enum import Enum
|
||||
|
||||
if sys.version_info < (3, 11):
    class StrEnum(str, Enum):
        """Backport of ``enum.StrEnum`` for Python versions older than 3.11.

        Members are real ``str`` instances, so they compare equal to their
        plain string values and can be used wherever a string is expected.
        """

        def __new__(cls, value):
            # Build the member as a genuine ``str`` and record it as the
            # enum value so lookups by value keep working.
            obj = str.__new__(cls, value)
            obj._value_ = value
            return obj

        def __str__(self):
            # The stdlib ``StrEnum`` renders as the bare value. Without this
            # override a ``(str, Enum)`` mix-in renders as
            # ``ClassName.MEMBER``, so ``str(member)`` would differ between
            # Python versions.
            return str.__str__(self)
else:
    from enum import StrEnum
|
||||
|
||||
|
||||
class Language(StrEnum):
|
||||
BG = "bg" # Bulgarian
|
||||
CA = "ca" # Catalan
|
||||
ZH = "zh" # Chinese simplified
|
||||
ZH_TW = "zh-TW" # Chinese traditional
|
||||
CS = "cs" # Czech
|
||||
DA = "da" # Danish
|
||||
NL = "nl" # Dutch
|
||||
EN = "en" # English
|
||||
EN_US = "en-US" # English (USA)
|
||||
EN_AU = "en-AU" # English (Australia)
|
||||
EN_GB = "en-GB" # English (Great Britain)
|
||||
EN_NZ = "en-NZ" # English (New Zealand)
|
||||
EN_IN = "en-IN" # English (India)
|
||||
ET = "et" # Estonian
|
||||
FI = "fi" # Finnish
|
||||
NL_BE = "nl-BE" # Flemish
|
||||
FR = "fr" # French
|
||||
FR_CA = "fr-CA" # French (Canada)
|
||||
DE = "de" # German
|
||||
DE_CH = "de-CH" # German (Switzerland)
|
||||
EL = "el" # Greek
|
||||
HI = "hi" # Hindi
|
||||
HU = "hu" # Hungarian
|
||||
ID = "id" # Indonesian
|
||||
IT = "it" # Italian
|
||||
JA = "ja" # Japanese
|
||||
KO = "ko" # Korean
|
||||
LV = "lv" # Latvian
|
||||
LT = "lt" # Lithuanian
|
||||
MS = "ms" # Malay
|
||||
NO = "no" # Norwegian
|
||||
PL = "pl" # Polish
|
||||
PT = "pt" # Portuguese
|
||||
PT_BR = "pt-BR" # Portuguese (Brazil)
|
||||
RO = "ro" # Romanian
|
||||
RU = "ru" # Russian
|
||||
SK = "sk" # Slovak
|
||||
ES = "es" # Spanish
|
||||
SV = "sv" # Swedish
|
||||
TH = "th" # Thai
|
||||
TR = "tr" # Turkish
|
||||
UK = "uk" # Ukrainian
|
||||
VI = "vi" # Vietnamese
|
||||
@@ -23,7 +23,7 @@ from pipecat.frames.frames import (
|
||||
UserStoppedSpeakingFrame,
|
||||
VADParamsUpdateFrame)
|
||||
from pipecat.transports.base_transport import TransportParams
|
||||
from pipecat.vad.vad_analyzer import VADAnalyzer, VADParams, VADState
|
||||
from pipecat.vad.vad_analyzer import VADAnalyzer, VADState
|
||||
|
||||
from loguru import logger
|
||||
|
||||
@@ -96,8 +96,10 @@ class BaseInputTransport(FrameProcessor):
|
||||
await self.push_frame(frame, direction)
|
||||
# Control frames
|
||||
elif isinstance(frame, StartFrame):
|
||||
await self.start(frame)
|
||||
# Push StartFrame before start(), because we want StartFrame to be
|
||||
# processed by every processor before any other frame is processed.
|
||||
await self._internal_push_frame(frame, direction)
|
||||
await self.start(frame)
|
||||
elif isinstance(frame, EndFrame):
|
||||
# Push EndFrame before stop(), because stop() waits on the task to
|
||||
# finish and the task finishes when EndFrame is processed.
|
||||
@@ -105,9 +107,8 @@ class BaseInputTransport(FrameProcessor):
|
||||
await self.stop(frame)
|
||||
elif isinstance(frame, VADParamsUpdateFrame):
|
||||
vad_analyzer = self.vad_analyzer()
|
||||
if not vad_analyzer:
|
||||
pass
|
||||
vad_analyzer.set_params(frame.params)
|
||||
if vad_analyzer:
|
||||
vad_analyzer.set_params(frame.params)
|
||||
# Other frames
|
||||
else:
|
||||
await self._internal_push_frame(frame, direction)
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
import asyncio
|
||||
import itertools
|
||||
import time
|
||||
import sys
|
||||
|
||||
from PIL import Image
|
||||
from typing import List
|
||||
@@ -30,11 +31,14 @@ from pipecat.frames.frames import (
|
||||
SystemFrame,
|
||||
TTSStartedFrame,
|
||||
TTSStoppedFrame,
|
||||
TextFrame,
|
||||
TransportMessageFrame)
|
||||
from pipecat.transports.base_transport import TransportParams
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.utils.time import nanoseconds_to_seconds
|
||||
|
||||
|
||||
class BaseOutputTransport(FrameProcessor):
|
||||
|
||||
@@ -64,7 +68,7 @@ class BaseOutputTransport(FrameProcessor):
|
||||
# Create sink frame task. This is the task that will actually write
|
||||
# audio or video frames. We write audio/video in a task so we can keep
|
||||
# generating frames upstream while, for example, the audio is playing.
|
||||
self._create_sink_task()
|
||||
self._create_sink_tasks()
|
||||
|
||||
# Create push frame task. This is the task that will push frames in
|
||||
# order. We also guarantee that all frames are pushed in the same task.
|
||||
@@ -134,8 +138,8 @@ class BaseOutputTransport(FrameProcessor):
|
||||
# queue.
|
||||
#
|
||||
if isinstance(frame, CancelFrame):
|
||||
await self.push_frame(frame, direction)
|
||||
await self.cancel(frame)
|
||||
await self.push_frame(frame, direction)
|
||||
elif isinstance(frame, StartInterruptionFrame) or isinstance(frame, StopInterruptionFrame):
|
||||
await self.push_frame(frame, direction)
|
||||
await self._handle_interruptions(frame)
|
||||
@@ -149,6 +153,7 @@ class BaseOutputTransport(FrameProcessor):
|
||||
await self._sink_queue.put(frame)
|
||||
await self.start(frame)
|
||||
elif isinstance(frame, EndFrame):
|
||||
await self._sink_clock_queue.put((sys.maxsize, frame.id, frame))
|
||||
await self._sink_queue.put(frame)
|
||||
await self.stop(frame)
|
||||
# Other frames.
|
||||
@@ -158,6 +163,9 @@ class BaseOutputTransport(FrameProcessor):
|
||||
await self._handle_image(frame)
|
||||
elif isinstance(frame, TransportMessageFrame) and frame.urgent:
|
||||
await self.send_message(frame)
|
||||
# TODO(aleix): Images and audio should support presentation timestamps.
|
||||
elif frame.pts:
|
||||
await self._sink_clock_queue.put((frame.pts, frame.id, frame))
|
||||
else:
|
||||
await self._sink_queue.put(frame)
|
||||
|
||||
@@ -166,10 +174,14 @@ class BaseOutputTransport(FrameProcessor):
|
||||
return
|
||||
|
||||
if isinstance(frame, StartInterruptionFrame):
|
||||
# Stop sink task.
|
||||
# Stop sink tasks.
|
||||
self._sink_task.cancel()
|
||||
await self._sink_task
|
||||
self._create_sink_task()
|
||||
# Stop sink clock tasks.
|
||||
self._sink_clock_task.cancel()
|
||||
await self._sink_clock_task
|
||||
# Create sink tasks.
|
||||
self._create_sink_tasks()
|
||||
# Stop push task.
|
||||
self._push_frame_task.cancel()
|
||||
await self._push_frame_task
|
||||
@@ -201,48 +213,90 @@ class BaseOutputTransport(FrameProcessor):
|
||||
else:
|
||||
await self._sink_queue.put(frame)
|
||||
|
||||
def _create_sink_task(self):
|
||||
#
|
||||
# Sink tasks
|
||||
#
|
||||
|
||||
def _create_sink_tasks(self):
|
||||
loop = self.get_event_loop()
|
||||
self._sink_queue = asyncio.Queue()
|
||||
self._sink_task = loop.create_task(self._sink_task_handler())
|
||||
self._sink_clock_queue = asyncio.PriorityQueue()
|
||||
self._sink_clock_task = loop.create_task(self._sink_clock_task_handler())
|
||||
|
||||
async def _sink_frame_handler(self, frame: Frame):
|
||||
if isinstance(frame, AudioRawFrame):
|
||||
await self.write_raw_audio_frames(frame.audio)
|
||||
await self._internal_push_frame(frame)
|
||||
await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM)
|
||||
elif isinstance(frame, ImageRawFrame):
|
||||
await self._set_camera_image(frame)
|
||||
elif isinstance(frame, SpriteFrame):
|
||||
await self._set_camera_images(frame.images)
|
||||
elif isinstance(frame, TransportMessageFrame):
|
||||
await self.send_message(frame)
|
||||
elif isinstance(frame, TTSStartedFrame):
|
||||
await self._bot_started_speaking()
|
||||
await self._internal_push_frame(frame)
|
||||
elif isinstance(frame, TTSStoppedFrame):
|
||||
await self._bot_stopped_speaking()
|
||||
await self._internal_push_frame(frame)
|
||||
else:
|
||||
await self._internal_push_frame(frame)
|
||||
|
||||
async def _sink_task_handler(self):
|
||||
running = True
|
||||
while running:
|
||||
try:
|
||||
frame = await self._sink_queue.get()
|
||||
if isinstance(frame, AudioRawFrame):
|
||||
await self.write_raw_audio_frames(frame.audio)
|
||||
await self._internal_push_frame(frame)
|
||||
await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM)
|
||||
elif isinstance(frame, ImageRawFrame):
|
||||
await self._set_camera_image(frame)
|
||||
elif isinstance(frame, SpriteFrame):
|
||||
await self._set_camera_images(frame.images)
|
||||
elif isinstance(frame, TransportMessageFrame):
|
||||
await self.send_message(frame)
|
||||
elif isinstance(frame, TTSStartedFrame):
|
||||
await self._bot_started_speaking()
|
||||
await self._internal_push_frame(frame)
|
||||
elif isinstance(frame, TTSStoppedFrame):
|
||||
await self._bot_stopped_speaking()
|
||||
await self._internal_push_frame(frame)
|
||||
else:
|
||||
await self._internal_push_frame(frame)
|
||||
|
||||
await self._sink_frame_handler(frame)
|
||||
running = not isinstance(frame, EndFrame)
|
||||
|
||||
self._sink_queue.task_done()
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} error processing sink queue: {e}")
|
||||
|
||||
async def _sink_clock_frame_handler(self, frame: Frame):
|
||||
# TODO(aleix): For now we just process TextFrame. But we should process
|
||||
# audio and video as well.
|
||||
if isinstance(frame, TextFrame):
|
||||
await self._internal_push_frame(frame)
|
||||
|
||||
async def _sink_clock_task_handler(self):
    """Process frames queued with a presentation timestamp, in timestamp order.

    Entries are read from ``self._sink_clock_queue`` as
    ``(timestamp, id, frame)`` tuples. A frame whose timestamp has already
    passed is handled immediately; otherwise the task sleeps until the
    timestamp is due. The loop ends when an ``EndFrame`` is dequeued or the
    task is cancelled. Any other exception is logged and the loop continues.
    """
    running = True
    while running:
        try:
            timestamp, _, frame = await self._sink_clock_queue.get()

            # If we hit an EndFrame, we can finish right away.
            running = not isinstance(frame, EndFrame)

            # For any other frame, check its presentation timestamp. If it
            # is still in the future, wait until it is due. Either way the
            # frame then goes through the clock frame handler, so late and
            # on-time frames are treated the same.
            if running:
                current_time = self.get_clock().get_time()
                if timestamp > current_time:
                    wait_time = nanoseconds_to_seconds(timestamp - current_time)
                    await asyncio.sleep(wait_time)
                await self._sink_clock_frame_handler(frame)

            self._sink_clock_queue.task_done()
        except asyncio.CancelledError:
            break
        except Exception as e:
            logger.exception(f"{self} error processing sink clock queue: {e}")
|
||||
|
||||
async def _bot_started_speaking(self):
|
||||
logger.debug("Bot started speaking")
|
||||
self._bot_speaking = True
|
||||
await self._internal_push_frame(BotStartedSpeakingFrame(), FrameDirection.UPSTREAM)
|
||||
|
||||
async def _bot_stopped_speaking(self):
|
||||
logger.debug("Bot stopped speaking")
|
||||
self._bot_speaking = False
|
||||
await self._internal_push_frame(BotStoppedSpeakingFrame(), FrameDirection.UPSTREAM)
|
||||
|
||||
|
||||
@@ -32,6 +32,7 @@ class TransportParams(BaseModel):
|
||||
audio_out_is_live: bool = False
|
||||
audio_out_sample_rate: int = 16000
|
||||
audio_out_channels: int = 1
|
||||
audio_out_bitrate: int = 96000
|
||||
audio_in_enabled: bool = False
|
||||
audio_in_sample_rate: int = 16000
|
||||
audio_in_channels: int = 1
|
||||
|
||||
117
src/pipecat/transports/network/fastapi_http.py
Normal file
117
src/pipecat/transports/network/fastapi_http.py
Normal file
@@ -0,0 +1,117 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import io
|
||||
import wave
|
||||
|
||||
from typing import Awaitable, Callable
|
||||
from pydantic.main import BaseModel
|
||||
|
||||
from pipecat.frames.frames import AudioRawFrame, CancelFrame, EndFrame, Frame, StartFrame, StartInterruptionFrame
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.serializers.base_serializer import FrameSerializer
|
||||
from pipecat.transports.base_input import BaseInputTransport
|
||||
from pipecat.transports.base_output import BaseOutputTransport
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
|
||||
from loguru import logger
|
||||
|
||||
try:
|
||||
from fastapi import Request, Response
|
||||
from starlette.background import BackgroundTask
|
||||
from sse_starlette.sse import EventSourceResponse
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error(
|
||||
"In order to use FastAPI HTTP SSE, you need to `pip install pipecat-ai[http]`.")
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
class FastAPIHTTPParams(TransportParams):
|
||||
serializer: FrameSerializer
|
||||
|
||||
|
||||
class FastAPIHTTPInputTransport(BaseInputTransport):
    """Input side of the FastAPI HTTP transport.

    Deserializes the frames carried in an HTTP request body with the
    configured serializer and pushes them into the pipeline.
    """

    def __init__(
            self,
            params: FastAPIHTTPParams,
            **kwargs):
        super().__init__(params, **kwargs)

        self._params = params
        # Most recent request passed to handle_request().
        self._request = None

    async def handle_request(self, request: Request):
        """Deserialize every frame in the request's JSON body and push it.

        The JSON body is iterated as a sequence of serialized frames. Audio
        frames are pushed with ``push_audio_frame``; every other frame is
        pushed with ``push_frame``. Entries the serializer turns into
        ``None`` are skipped.
        """
        self._request = request
        frames_list = await request.json()
        logger.debug(f"Received frames: {frames_list}")
        for serialized in frames_list:
            logger.debug(f"Received frame: {serialized}")
            frame = self._params.serializer.deserialize(serialized)
            if frame is None:
                # Nothing was deserialized, so there is no frame to push.
                # Pushing None downstream would hand processors a non-frame.
                continue
            if isinstance(frame, AudioRawFrame):
                await self.push_audio_frame(frame)
            else:
                await self.push_frame(frame)
|
||||
|
||||
|
||||
class FastAPIHTTPOutputTransport(BaseOutputTransport):
    """Output side of the FastAPI HTTP transport.

    Each processed frame is serialized and placed on an internal queue;
    ``event_generator`` drains that queue so the payloads can be streamed
    back to the HTTP client.
    """

    def __init__(self, params: FastAPIHTTPParams, **kwargs):
        super().__init__(params, **kwargs)

        self._params = params
        # Serialized payloads waiting to be yielded by event_generator().
        self._event_queue = asyncio.Queue()

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Run the base processing, then queue the frame for the client."""
        await super().process_frame(frame, direction)
        await self._write_frame(frame)

    async def write_raw_audio_frames(self, frames: bytes):
        """Do nothing: frames are queued for the client in process_frame()."""
        pass

    async def _write_frame(self, frame: Frame):
        """Serialize ``frame`` and enqueue the payload, if there is one."""
        payload = self._params.serializer.serialize(frame)
        # Skip frames that serialize to nothing, so the event stream never
        # carries an empty event.
        if payload:
            await self._event_queue.put(payload)

    async def event_generator(self):
        """Yield queued payloads forever, waiting whenever the queue is empty."""
        while True:
            event = await self._event_queue.get()
            logger.debug(f"Sending event {event}")
            yield event
|
||||
|
||||
|
||||
class FastAPIHTTPTransport(BaseTransport):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
params: FastAPIHTTPParams,
|
||||
input_name: str | None = None,
|
||||
output_name: str | None = None,
|
||||
loop: asyncio.AbstractEventLoop | None = None):
|
||||
super().__init__(input_name=input_name, output_name=output_name, loop=loop)
|
||||
self._params = params
|
||||
self._request = None
|
||||
|
||||
self._input = FastAPIHTTPInputTransport(
|
||||
self._params, name=self._input_name)
|
||||
self._output = FastAPIHTTPOutputTransport(
|
||||
self._params, name=self._output_name)
|
||||
|
||||
def input(self) -> FrameProcessor:
|
||||
return self._input
|
||||
|
||||
def output(self) -> FrameProcessor:
|
||||
return self._output
|
||||
|
||||
async def handle_request(self, request: Request):
|
||||
self._request = request
|
||||
await self._input.handle_request(request)
|
||||
return EventSourceResponse(self._output.event_generator())
|
||||
@@ -12,8 +12,8 @@ import wave
|
||||
from typing import Awaitable, Callable
|
||||
from pydantic.main import BaseModel
|
||||
|
||||
from pipecat.frames.frames import AudioRawFrame, CancelFrame, EndFrame, StartFrame
|
||||
from pipecat.processors.frame_processor import FrameProcessor
|
||||
from pipecat.frames.frames import AudioRawFrame, CancelFrame, EndFrame, Frame, StartFrame, StartInterruptionFrame
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.serializers.base_serializer import FrameSerializer
|
||||
from pipecat.transports.base_input import BaseInputTransport
|
||||
from pipecat.transports.base_output import BaseOutputTransport
|
||||
@@ -91,13 +91,20 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport):
|
||||
|
||||
self._websocket = websocket
|
||||
self._params = params
|
||||
self._audio_buffer = bytes()
|
||||
self._websocket_audio_buffer = bytes()
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, StartInterruptionFrame):
|
||||
await self._write_frame(frame)
|
||||
|
||||
async def write_raw_audio_frames(self, frames: bytes):
|
||||
self._audio_buffer += frames
|
||||
while len(self._audio_buffer) >= self._params.audio_frame_size:
|
||||
self._websocket_audio_buffer += frames
|
||||
while len(self._websocket_audio_buffer):
|
||||
frame = AudioRawFrame(
|
||||
audio=self._audio_buffer[:self._params.audio_frame_size],
|
||||
audio=self._websocket_audio_buffer[:
|
||||
self._params.audio_frame_size],
|
||||
sample_rate=self._params.audio_out_sample_rate,
|
||||
num_channels=self._params.audio_out_channels
|
||||
)
|
||||
@@ -121,7 +128,13 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport):
|
||||
if payload and self._websocket.client_state == WebSocketState.CONNECTED:
|
||||
await self._websocket.send_text(payload)
|
||||
|
||||
self._audio_buffer = self._audio_buffer[self._params.audio_frame_size:]
|
||||
self._websocket_audio_buffer = self._websocket_audio_buffer[
|
||||
self._params.audio_frame_size:]
|
||||
|
||||
async def _write_frame(self, frame: Frame):
|
||||
payload = self._params.serializer.serialize(frame)
|
||||
if payload and self._websocket.client_state == WebSocketState.CONNECTED:
|
||||
await self._websocket.send_text(payload)
|
||||
|
||||
|
||||
class FastAPIWebsocketTransport(BaseTransport):
|
||||
|
||||
@@ -36,6 +36,7 @@ from pipecat.frames.frames import (
|
||||
UserImageRawFrame,
|
||||
UserImageRequestFrame)
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.transcriptions.language import Language
|
||||
from pipecat.transports.base_input import BaseInputTransport
|
||||
from pipecat.transports.base_output import BaseOutputTransport
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
@@ -305,7 +306,7 @@ class DailyTransportClient(EventHandler):
|
||||
if self._token and self._params.transcription_enabled:
|
||||
await self._start_transcription()
|
||||
|
||||
await self._callbacks.on_joined(data["participants"]["local"])
|
||||
await self._callbacks.on_joined(data)
|
||||
else:
|
||||
error_msg = f"Error joining {self._room_url}: {error}"
|
||||
logger.error(error_msg)
|
||||
@@ -365,6 +366,12 @@ class DailyTransportClient(EventHandler):
|
||||
}
|
||||
},
|
||||
}
|
||||
},
|
||||
"microphone": {
|
||||
"sendSettings": {
|
||||
"channelConfig": "stereo" if self._params.audio_out_channels == 2 else "mono",
|
||||
"bitrate": self._params.audio_out_bitrate,
|
||||
}
|
||||
}
|
||||
},
|
||||
})
|
||||
@@ -864,8 +871,8 @@ class DailyTransport(BaseTransport):
|
||||
self._input.capture_participant_video(
|
||||
participant_id, framerate, video_source, color_format)
|
||||
|
||||
async def _on_joined(self, participant):
|
||||
await self._call_event_handler("on_joined", participant)
|
||||
async def _on_joined(self, data):
|
||||
await self._call_event_handler("on_joined", data)
|
||||
|
||||
async def _on_left(self):
|
||||
await self._call_event_handler("on_left")
|
||||
@@ -950,11 +957,16 @@ class DailyTransport(BaseTransport):
|
||||
text = message["text"]
|
||||
timestamp = message["timestamp"]
|
||||
is_final = message["rawResponse"]["is_final"]
|
||||
try:
|
||||
language = message["rawResponse"]["channel"]["alternatives"][0]["languages"][0]
|
||||
language = Language(language)
|
||||
except KeyError:
|
||||
language = None
|
||||
if is_final:
|
||||
frame = TranscriptionFrame(text, participant_id, timestamp)
|
||||
frame = TranscriptionFrame(text, participant_id, timestamp, language)
|
||||
logger.debug(f"Transcription (from: {participant_id}): [{text}]")
|
||||
else:
|
||||
frame = InterimTranscriptionFrame(text, participant_id, timestamp)
|
||||
frame = InterimTranscriptionFrame(text, participant_id, timestamp, language)
|
||||
|
||||
if self._input:
|
||||
await self._input.push_transcription_frame(frame)
|
||||
|
||||
@@ -9,3 +9,20 @@ import datetime
|
||||
|
||||
def time_now_iso8601() -> str:
|
||||
return datetime.datetime.now(datetime.timezone.utc).isoformat(timespec="milliseconds")
|
||||
|
||||
|
||||
def seconds_to_nanoseconds(seconds: float) -> int:
|
||||
return int(seconds * 1_000_000_000)
|
||||
|
||||
|
||||
def nanoseconds_to_seconds(nanoseconds: int) -> float:
|
||||
return nanoseconds / 1_000_000_000
|
||||
|
||||
|
||||
def nanoseconds_to_str(nanoseconds: int) -> str:
    """Format a duration in nanoseconds as ``H:MM:SS.ffffff``.

    Hours are not zero-padded; minutes and seconds use two digits and the
    fractional part is given in microseconds (six digits). Sub-microsecond
    precision is truncated.

    >>> nanoseconds_to_str(4_100_000_000)
    '0:00:04.100000'
    """
    # Work in whole microseconds with integer arithmetic. Taking the
    # fraction from a float number of seconds loses precision
    # (e.g. 4.1 - 4 == 0.0999999...), which truncates to the wrong microsecond.
    total_microseconds = int(nanoseconds) // 1_000
    total_seconds, microseconds = divmod(total_microseconds, 1_000_000)
    total_minutes, seconds = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours}:{minutes:02}:{seconds:02}.{microseconds:06}"
|
||||
|
||||
@@ -3,32 +3,39 @@
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
import collections
|
||||
import itertools
|
||||
|
||||
from threading import Lock
|
||||
|
||||
_COUNTS = {}
|
||||
_COUNTS_MUTEX = Lock()
|
||||
|
||||
_ID = 0
|
||||
_ID_MUTEX = Lock()
|
||||
_COUNTS = collections.defaultdict(itertools.count)
|
||||
_ID = itertools.count()
|
||||
|
||||
|
||||
def obj_id() -> int:
|
||||
global _ID, _ID_MUTEX
|
||||
with _ID_MUTEX:
|
||||
_ID += 1
|
||||
return _ID
|
||||
"""
|
||||
Generate a unique id for an object.
|
||||
|
||||
>>> obj_id()
|
||||
0
|
||||
>>> obj_id()
|
||||
1
|
||||
>>> obj_id()
|
||||
2
|
||||
"""
|
||||
return next(_ID)
|
||||
|
||||
|
||||
def obj_count(obj) -> int:
|
||||
global _COUNTS, COUNTS_MUTEX
|
||||
name = obj.__class__.__name__
|
||||
with _COUNTS_MUTEX:
|
||||
if name not in _COUNTS:
|
||||
_COUNTS[name] = 0
|
||||
else:
|
||||
_COUNTS[name] += 1
|
||||
return _COUNTS[name]
|
||||
"""Generate a running, zero-based count of objects that share a class name.
|
||||
|
||||
>>> obj_count(object())
|
||||
0
|
||||
>>> obj_count(object())
|
||||
1
|
||||
>>> new_type = type('NewType', (object,), {})
|
||||
>>> obj_count(new_type())
|
||||
0
|
||||
"""
|
||||
return next(_COUNTS[obj.__class__.__name__])
|
||||
|
||||
|
||||
def exp_smoothing(value: float, prev_value: float, factor: float) -> float:
|
||||
|
||||
Reference in New Issue
Block a user