Compare commits

...

67 Commits

Author SHA1 Message Date
Filipi Fuchter
37d15588b8 Starting the base output transport only after starting the small webrtc transport. 2025-05-05 17:12:31 -03:00
Filipi Fuchter
d2bf8321a0 Mentioning the fix in the changelog. 2025-05-05 07:03:08 -03:00
Filipi Fuchter
781df7ac77 Merge branch 'main' into fixing_sound_mixer 2025-05-05 06:58:02 -03:00
Filipi Fuchter
1d69a92ea1 Adding real-time throttling to the audio mixer. 2025-05-05 06:55:34 -03:00
Aleix Conchillo Flaqué
75d261639f Merge pull request #1726 from pipecat-ai/aleix/pipecat-0.0.66
update CHANGELOG for pipecat 0.0.66
2025-05-02 20:54:57 -07:00
Aleix Conchillo Flaqué
f720d795d0 update CHANGELOG for pipecat 0.0.66 2025-05-02 20:29:51 -07:00
Aleix Conchillo Flaqué
f6fe83e358 Merge pull request #1725 from pipecat-ai/aleix/update-daily-python-0.18.1
update to daily-python 0.18.1
2025-05-02 20:27:50 -07:00
Aleix Conchillo Flaqué
54971a0735 update to daily-python 0.18.1 2025-05-02 17:47:44 -07:00
Mark Backman
4513e81e13 Merge pull request #1723 from pipecat-ai/mb/base-output-bot-speaking-log
Only display the destination in the bot started/stopped speaking log …
2025-05-02 17:32:47 -04:00
Mark Backman
872204b795 Only display the destination in the bot started/stopped speaking log when there is a desintation 2025-05-02 17:29:28 -04:00
Aleix Conchillo Flaqué
a94cbfe6f5 Merge pull request #1722 from pipecat-ai/aleix/base-output-transport-audio-task-fix
BaseOutputTransport: always initialize audio task
2025-05-02 14:26:30 -07:00
Aleix Conchillo Flaqué
7152faafb2 BaseOutputTransport: always initialize audio task
We also use the audio task to also send synchronized images with audio.
2025-05-02 14:23:15 -07:00
Mark Backman
e6aadaccd8 Merge pull request #1721 from pipecat-ai/mb/simli-silent-frames
Fix: SimliVideoService was continuously emitting audio, preventing Bo…
2025-05-02 16:44:39 -04:00
Mark Backman
3a73aa71b8 Merge pull request #1613 from pipecat-ai/mb/improve-storybot-readme
demo: Restructure storytelling-chatbot directory, update README steps…
2025-05-02 16:39:59 -04:00
Mark Backman
814e7509e1 demo: Restructure storytelling-chatbot directory, update README steps, link to vercel demo 2025-05-02 16:37:37 -04:00
Vanessa Pyne
e0cf5ec016 Merge pull request #1705 from pipecat-ai/vp-update-nvidia-models
Riva Service: add magpie-tts-multilingual model
2025-05-02 15:34:23 -05:00
vipyne
667bd32e6a Riva: remove deprecated lines in example 2025-05-02 15:33:10 -05:00
vipyne
b2ecd83706 update CHANGELOG with Riva details 2025-05-02 15:33:10 -05:00
vipyne
b2754117c8 Riva: refactor function_id and model_name 2025-05-02 15:33:10 -05:00
vipyne
6c428c303b update magpie voice 2025-05-02 15:33:10 -05:00
Mark Backman
e7d889a143 Update RivaSTTService to use by default 2025-05-02 15:33:10 -05:00
Mark Backman
da60e7069b Update pyproject.toml to use nvidia-riva-client 2.19.1 2025-05-02 15:33:10 -05:00
Mark Backman
c14406a3b9 Demos use the latest services 2025-05-02 15:33:10 -05:00
Mark Backman
725ab5ec21 Small fixes: No default api_key of None, ParakeetSTTService uses RivaSTTService.InputParams 2025-05-02 15:33:10 -05:00
Mark Backman
daf9d47e58 Update RivaSegmentedSTTService 2025-05-02 15:33:10 -05:00
vipyne
63a65627a2 Riva Service: add magpie-tts-multilingual model 2025-05-02 15:33:10 -05:00
Mark Backman
02c07755b0 Add Changelog entry for PR 1707 2025-05-02 15:33:10 -05:00
Matt Kim
15cbd18acc [Rime] Add phonemizeBetweenBrackets and pauseBetweenBrackets to RimeTTSService (ws)
There is a fix incoming in
2025-05-02 15:33:10 -05:00
Kwindla Hultman Kramer
93c40b87dc small groq updates 2025-05-02 15:33:10 -05:00
Mark Backman
eeaa9f67a1 Fix: SimliVideoService was continuously emitting audio, preventing BotStoppedSpeakingFrame from being sent 2025-05-02 16:32:42 -04:00
Mark Backman
b60691c7b2 Merge pull request #1720 from pipecat-ai/mb/changelog-pr-1707
Add Changelog entry for PR 1707
2025-05-02 16:13:40 -04:00
Mark Backman
2bb1b0b343 Add Changelog entry for PR 1707 2025-05-02 16:09:50 -04:00
Mark Backman
047ef9f86c Merge pull request #1707 from rimelabs/matt/rime/url_param_serialization
[Rime] Add new params to RimeTTSService
2025-05-02 16:08:01 -04:00
Kwindla Hultman Kramer
9a2c603c91 Merge pull request #1711 from pipecat-ai/khk/groq-updates 2025-05-02 12:21:15 -07:00
Filipi da Silva Fuchter
94c4169407 Merge pull request #1717 from pipecat-ai/local_smart_turn_torch
Local smart turn torch
2025-05-02 15:53:30 -03:00
Filipi Fuchter
cb8a551db8 Mentioning the new LocalSmartTurnAnalyzer in the changelog. 2025-05-02 14:32:18 -03:00
Filipi Fuchter
779f09af70 Fixing lint. 2025-05-02 14:22:38 -03:00
Filipi Fuchter
19dc0f2bfb New example using the local smart turn 2025-05-02 14:21:42 -03:00
Filipi Fuchter
f0709e22ba Creating a local smart turn using torch. 2025-05-02 14:21:29 -03:00
Mark Backman
8250736f5e Merge pull request #1708 from pipecat-ai/mb/gemini-user-context
Push GeminiMultimodalLiveLLMService TranscriptionFrame Upstream, remo…
2025-05-02 13:10:27 -04:00
Mark Backman
83348a9f93 Merge pull request #1714 from pipecat-ai/mb/fix-gemini-text-modality
Restore TEXT modalities support to GeminiMultimodalLiveLLMService
2025-05-02 10:41:05 -04:00
Mark Backman
96d40903a9 Only send TTSStoppedFrame from Gemini when in AUDIO mode, only send one LLMFullResponseEndFrame 2025-05-02 10:18:53 -04:00
Aleix Conchillo Flaqué
2560811805 Merge pull request #1697 from pipecat-ai/aleix/daily-custom-audio-tracks
add support for multiple transport destinations
2025-05-02 06:34:09 -07:00
Mark Backman
2b8c44c008 Merge pull request #1710 from pipecat-ai/mb/openai-context-aggregation
fix: OpenAIRealtimeBetaLLMService writes two assistant messages to th…
2025-05-02 07:43:35 -04:00
Mark Backman
38e2d37674 Restore TEXT modalities support to GeminiMultimodalLiveLLMService 2025-05-02 07:36:12 -04:00
Vanessa Pyne
6278561f88 Merge pull request #1709 from pipecat-ai/vp-fix-fastpitch-params-update
Riva TTS: update FastPitch params
2025-05-01 21:23:10 -05:00
Aleix Conchillo Flaqué
750e79c1ce DailyParams: rename to camera/microphone_out_enabled 2025-05-01 19:17:14 -07:00
Aleix Conchillo Flaqué
71eb2963c5 examples: added daily-custom-tracks 2025-05-01 19:17:14 -07:00
Aleix Conchillo Flaqué
f44e2c86ea BaseOutputTransport: compute sample_rate and audio_chunk_size in main class 2025-05-01 19:17:14 -07:00
Aleix Conchillo Flaqué
afe1f0df8c DailyTransport: make sure we can write audio frames to destination 2025-05-01 19:17:14 -07:00
Aleix Conchillo Flaqué
458fddfb48 update CHANGELOG with new Daily and Transport features 2025-05-01 19:17:14 -07:00
Aleix Conchillo Flaqué
8d915c5ccb DailyParams: allow enabling/disabling camera/microphone tracks 2025-05-01 19:17:14 -07:00
Aleix Conchillo Flaqué
304153dd03 TTSService: set transport destination to all TTS frames 2025-05-01 19:17:14 -07:00
Aleix Conchillo Flaqué
a6781b7352 rename destination to transport_destination 2025-05-01 19:17:14 -07:00
Aleix Conchillo Flaqué
5ad0058303 update CHANGELOG with frame source/destination support 2025-05-01 19:11:13 -07:00
Aleix Conchillo Flaqué
75c039de33 examples: add daily-multi-translation 2025-05-01 19:11:13 -07:00
Aleix Conchillo Flaqué
74e3c3677e DailyTransport: fix audio/video renderers registration 2025-05-01 18:58:44 -07:00
Aleix Conchillo Flaqué
dc20327f10 DailyTransport: register audio destination and use custom tracks 2025-05-01 18:58:44 -07:00
Aleix Conchillo Flaqué
e738affd29 BaseOutputTransport: allow sending audio/video to multiple destinations 2025-05-01 18:58:44 -07:00
Aleix Conchillo Flaqué
ef3d732607 DailyTransport: allow capturing multiple simultaneous audio/video sources 2025-05-01 18:58:44 -07:00
Aleix Conchillo Flaqué
6d63cff1bf DailyTransport: custom audio tracks support 2025-05-01 18:58:44 -07:00
Aleix Conchillo Flaqué
12f42605a1 pyproject: update daily-python to 0.18.0 2025-05-01 18:58:44 -07:00
Kwindla Hultman Kramer
fac3337927 small groq updates 2025-05-01 17:09:15 -07:00
Mark Backman
76d198151c Push GeminiMultimodalLiveLLMService TranscriptionFrame Upstream, remove direct context addition 2025-05-01 15:41:04 -04:00
Mark Backman
6a907058de fix: OpenAIRealtimeBetaLLMService writes two assistant messages to the context 2025-05-01 15:37:39 -04:00
vipyne
6e1f531f64 Riva TTS: update FastPitch params
91138c3f66 (diff-ece228577b1d233ce600a948243f90cece53e3a9b89554a0b27a48bc4d6e0fdfR45)
2025-05-01 11:14:41 -05:00
Matt Kim
4232cca5b6 [Rime] Add phonemizeBetweenBrackets and pauseBetweenBrackets to RimeTTSService (ws)
There is a fix incoming in
2025-04-30 18:09:22 -07:00
100 changed files with 2612 additions and 481 deletions

View File

@@ -7,8 +7,55 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Fixed
- Added real-time throttling to the `BaseOutputTransport` when using audio mixer
to ensure audio frames (including silence) are emitted at the correct intervals.
This prevents unnecessary CPU usage and avoids flooding the output with silent
frames when no new audio is available.
## [0.0.66] - 2025-05-02
### Added
- Added two new input parameters to `RimeTTSService`: `pause_between_brackets`
and `phonemize_between_brackets`.
- Added support for cross-platform local smart turn detection. You can use
`LocalSmartTurnAnalyzer` for on-device inference using Torch.
- `BaseOutputTransport` now allows multiple destinations if the transport
implementation supports it (e.g. Daily's custom tracks). With multiple
destinations it is possible to send different audio or video tracks with a
single transport simultaneously. To do that, you need to set the new
`Frame.transport_destination` field with your desired transport destination
(e.g. custom track name), tell the transport you want a new destination with
`TransportParams.audio_out_destinations` or
`TransportParams.video_out_destinations` and the transport should take care of
the rest.
- Similar to the new `Frame.transport_destination`, there's a new
`Frame.transport_source` field which is set by the `BaseInputTransport` if the
incoming data comes from a non-default source (e.g. custom tracks).
- `TTSService` has a new `transport_destination` constructor parameter. This
parameter will be used to update the `Frame.transport_destination` field for
each generated `TTSAudioRawFrame`. This allows sending multiple bots' audio to
multiple destinations in the same pipeline.
- Added `DailyTransportParams.camera_out_enabled` and
`DailyTransportParams.microphone_out_enabled` which allows you to
enable/disable the main output camera or microphone tracks. This is useful if
you only want to use custom tracks and not send the main tracks. Note that you
still need `audio_out_enabled=True` or `video_out_enabled=True`.
- Added `DailyTransport.capture_participant_audio()` which allows you to capture
an audio source (e.g. "microphone", "screenAudio" or a custom track name) from
a remote participant.
- Added `DailyTransport.update_publishing()` which allows you to update the call
video and audio publishing settings (e.g. audio and video quality).
- Added `RTVIObserverParams` which allows you to configure what RTVI messages
are sent to the clients.
@@ -37,6 +84,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- `TransportParams.audio_out_mixer` now supports a string and also a dictionary to
provide a mixer per destination. For example:
```python
audio_out_mixer={
"track-1": SoundfileMixer(...),
"track-2": SoundfileMixer(...),
"track-N": SoundfileMixer(...),
},
```
- The `STTMuteFilter` now mutes `InterimTranscriptionFrame` and
`TranscriptionFrame` which allows the `STTMuteFilter` to be used in
conjunction with transports that generate transcripts, e.g. `DailyTransport`.
@@ -70,6 +128,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
case there's no need to push audio to the rest of the pipeline, but this is
not a very common case.
- Added `RivaSegmentedSTTService`, which allows Riva offline/batch models, such
as "canary-1b-asr", to be used in Pipecat.
### Deprecated
- Function calls with parameters
@@ -85,8 +146,20 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `TransportParams.vad_audio_passthrough` parameter is now deprecated, use
`TransportParams.audio_in_passthrough` instead.
- `ParakeetSTTService` is now deprecated, use `RivaSTTService` instead, which uses
the model "parakeet-ctc-1.1b-asr" by default.
- `FastPitchTTSService` is now deprecated, use `RivaTTSService` instead, which uses
the model "magpie-tts-multilingual" by default.
### Fixed
- Fixed an issue with `SimliVideoService` where the bot was continuously outputting
audio, which prevents the `BotStoppedSpeakingFrame` from being emitted.
- Fixed an issue where `OpenAIRealtimeBetaLLMService` would add two assistant
messages to the context.
- Fixed an issue with `GeminiMultimodalLiveLLMService` where the context
contained tokens instead of words.
@@ -102,6 +175,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Other
- Added `examples/daily-custom-tracks` to show how to send and receive Daily
custom tracks.
- Added `examples/daily-multi-translation` to showcase how to send multiple
simultaneous translations with the same transport.
- Added 04 foundational examples for client/server transports. Also, renamed
`29-livekit-audio-chat.py` to `04b-transports-livekit.py`.

View File

@@ -53,4 +53,3 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
token = await daily_rest_helper.get_token(url, expiry_time)
return (url, token)
return (url, token)

View File

@@ -0,0 +1,39 @@
# Daily Custom Tracks
This example shows how to send and receive Daily custom tracks. We will run a simple `daily-python` application to send an audio file with a custom track (named "pipecat") to a room. Then, the Pipecat bot will mirror that custom track into another custom track (named "pipecat-mirror") in the same room.
## Get started
```bash
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
```
## Run the bot
Start the bot by giving it a Daily room URL.
```bash
python bot.py -u ROOM_URL
```
The bot will wait for the first participant to join. Then, it will mirror a custom track named "pipecat" into a new custom track named "pipecat-mirror".
## Run the sender
Now, run the custom track sender. This is a simple `daily-python` application that opens an audio file and sends it as a custom track to the same Daily room.
```bash
python custom_track_sender.py -u ROOM_URL -i office-ambience-mono-16000.mp3
```
## Open client
Finally, open the client so you can hear both custom tracks.
```bash
open index.html
```
Once the client is opened, copy the URL of the Daily room and join it. You should be able to select which custom track you want to hear.

View File

@@ -0,0 +1,87 @@
#
# Copyright (c) 2024-2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import sys
import aiohttp
from loguru import logger
from runner import configure
from pipecat.frames.frames import Frame, InputAudioRawFrame, OutputAudioRawFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transports.services.daily import DailyParams, DailyTransport
# Drop handler id 0 (loguru's pre-configured default sink, per the loguru docs)
# and log to stderr at DEBUG level instead.
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
class CustomTrackMirrorProcessor(FrameProcessor):
    """Mirror incoming custom-track audio onto a transport destination.

    Every `InputAudioRawFrame` whose `transport_source` is set is converted
    into an `OutputAudioRawFrame` tagged with the configured destination.
    All other frames are forwarded untouched in their original direction.
    """

    def __init__(self, transport_destination: str, **kwargs):
        """Store the destination that mirrored audio frames are tagged with.

        Args:
            transport_destination: Value assigned to the
                `transport_destination` field of every mirrored frame.
            **kwargs: Forwarded to `FrameProcessor.__init__`.
        """
        super().__init__(**kwargs)
        self._transport_destination = transport_destination

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Mirror sourced input audio; pass every other frame through."""
        await super().process_frame(frame, direction)

        # Only input audio that carries a transport source gets mirrored.
        if not (isinstance(frame, InputAudioRawFrame) and frame.transport_source):
            await self.push_frame(frame, direction)
            return

        mirrored = OutputAudioRawFrame(
            audio=frame.audio,
            sample_rate=frame.sample_rate,
            num_channels=frame.num_channels,
        )
        mirrored.transport_destination = self._transport_destination
        # The mirrored frame is pushed with the default direction; the
        # original input frame is not forwarded.
        await self.push_frame(mirrored)
async def main():
    """Run a bot that mirrors the "pipecat" custom track into "pipecat-mirror"."""
    mirror_track = "pipecat-mirror"

    async with aiohttp.ClientSession() as session:
        # Only the room URL is needed; the bot joins without a token.
        room_url, _ = await configure(session)

        daily_params = DailyParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            microphone_out_enabled=False,  # Disable since we just use custom tracks
            audio_out_destinations=[mirror_track],
        )
        transport = DailyTransport(room_url, None, "Custom tracks mirror", daily_params)

        processors = [
            transport.input(),  # Transport user input
            CustomTrackMirrorProcessor(mirror_track),
            transport.output(),  # Transport bot output
        ]
        pipeline = Pipeline(processors)

        task_params = PipelineParams(
            audio_in_sample_rate=16000,
            audio_out_sample_rate=16000,
        )
        task = PipelineTask(pipeline, params=task_params)

        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant):
            # Start capturing the "pipecat" custom audio track of the first
            # participant that joins the room.
            await transport.capture_participant_audio(participant["id"], audio_source="pipecat")

        await PipelineRunner().run(task)


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -0,0 +1,74 @@
#
# Copyright (c) 2024-2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import time
from daily import CallClient, CustomAudioSource, Daily
from pydub import AudioSegment
# Command line: the Daily room to join and the audio file to send.
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument("-u", "--url", type=str, required=True, help="URL of the Daily room to join")
parser.add_argument(
    "-i", "--input", type=str, required=True, help="Input audio file (needs 16000 sample rate)"
)
args, _ = parser.parse_known_args()

# Decode the whole MP3 into raw PCM bytes up front.
audio = AudioSegment.from_mp3(args.input)
raw_bytes = audio.raw_data
sample_rate = audio.frame_rate
channels = audio.channels

print(f"Length: {len(raw_bytes)} bytes")
print(f"Sample rate: {sample_rate}, Channels: {channels}")

# Initialize the Daily context & create call client
Daily.init()
client = CallClient()

# Join the room and indicate we have a custom track named "pipecat".
client.join(
    args.url,
    client_settings={
        "publishing": {
            "camera": False,
            "microphone": False,
            "customAudio": {"pipecat": True},
        },
    },
)

# Just sleep for a couple of seconds. To do this well we should really use
# completions.
time.sleep(2)

# Create the custom audio source. This is where we will write our audio.
audio_source = CustomAudioSource(sample_rate, channels)

# Create an audio track and assign it our audio source.
client.add_custom_audio_track("pipecat", audio_source)

# Just sleep for a second. To do this well we should really use completions.
time.sleep(1)

try:
    # Write the file one second of audio at a time. The factor 2 is the
    # number of bytes per sample (assumes 16-bit PCM -- TODO confirm against
    # the decoded file's sample width).
    chunk_size = sample_rate * channels * 2
    # Step through the buffer by offset instead of re-slicing the remainder
    # on every iteration, which copied the whole tail of the file each time.
    for offset in range(0, len(raw_bytes), chunk_size):
        audio_source.write_frames(raw_bytes[offset : offset + chunk_size])
except KeyboardInterrupt:
    # NOTE(review): leave() is only called on Ctrl-C; on normal completion the
    # script goes straight to release() below -- confirm that is intended.
    client.leave()

# Just sleep for a second. To do this well we should really use completions.
time.sleep(1)

client.release()

View File

@@ -0,0 +1,173 @@
<html>
<head>
<title>daily custom tracks</title>
</head>
<script crossorigin src="https://unpkg.com/@daily-co/daily-js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/fomantic-ui/2.8.6/semantic.min.js"></script>
<link
rel="stylesheet"
type="text/css"
href="https://cdnjs.cloudflare.com/ajax/libs/fomantic-ui/2.8.6/semantic.min.css"
/>
<script>
// Enable (enable === true) or disable the button with the given element id.
function enableButton(buttonId, enable) {
  document.getElementById(buttonId).disabled = !enable;
}

// Enable or disable the "Join" button.
function enableJoinButton(enable) {
  enableButton("join-button", enable);
}

// Enable or disable the "Leave" button.
function enableLeaveButton(enable) {
  enableButton("leave-button", enable);
}
// Remove every element matching the given CSS selector from the document.
function destroyPlayers(query) {
  document.querySelectorAll(query).forEach((element) => element.remove());
}

// Remove the audio players and control buttons created for a participant.
function destroyParticipantPlayers(participantId) {
  for (const tag of ["audio", "button"]) {
    destroyPlayers(`${tag}[data-participant-id="${participantId}"]`);
  }
}
// Unmute the player, turn on autoplay and, when a track is given, attach it
// to the player as a single-track MediaStream.
async function startPlayer(player, track) {
  player.muted = false;
  player.autoplay = true;
  if (track == null) {
    // No track (null or undefined): leave srcObject untouched.
    return;
  }
  player.srcObject = new MediaStream([track]);
}
// Create an <audio> player for the track plus a button that toggles playback.
// Both elements are tagged with the participant id and appended to
// #audio-container. The player is returned in the paused state.
async function buildAudioPlayer(track, participantId) {
  const pausedClass = "ui primary green button";
  const playingClass = "ui primary red button";

  const container = document.getElementById("audio-container");

  const player = document.createElement("audio");
  player.dataset.participantId = participantId;

  // Button controlling this player; labelled with the track's media tag
  // ("cam-audio" is shown as "english").
  const toggleButton = document.createElement("button");
  toggleButton.className = pausedClass;
  toggleButton.innerText = track._mediaTag == "cam-audio" ? "english" : track._mediaTag;
  toggleButton.dataset.participantId = participantId;
  toggleButton.onclick = () => {
    const wasPaused = player.paused;
    if (wasPaused) {
      player.play();
    } else {
      player.pause();
    }
    toggleButton.className = wasPaused ? playingClass : pausedClass;
  };

  container.appendChild(player);
  container.appendChild(toggleButton);

  await startPlayer(player, track);
  // Start paused; playback begins when the button is clicked.
  player.pause();
  return player;
}
// Subscribe to a remote participant's audio and custom tracks (no video).
// Nothing is subscribed for the "local" participant id.
function subscribeToTracks(participantId) {
  console.log(`subscribing to track`);
  if (participantId !== "local") {
    const subscribedTracks = { audio: true, video: false, custom: true };
    callObject.updateParticipant(participantId, {
      setSubscribedTracks: subscribedTracks,
    });
  }
}
// Page entry point: reset the buttons, create the global Daily call object
// (window.callObject) and register the participant/track event handlers.
function startDaily() {
  enableJoinButton(true);
  enableLeaveButton(false);

  window.callObject = window.DailyIframe.createCallObject({});

  // Subscribe to the tracks of every remote participant that joins.
  const onParticipantJoined = (event) => {
    if (event.participant.local) {
      return;
    }
    console.log("participant-joined", event.participant);
    subscribeToTracks(event.participant.session_id);
  };

  // Tear down the players of a participant that left.
  const onParticipantLeft = (event) => {
    const sessionId = event.participant.session_id;
    console.log("participant-left", sessionId);
    destroyParticipantPlayers(sessionId);
  };

  // Build a player for every audio track that starts.
  const onTrackStarted = async (event) => {
    console.log("track-started", event.track);
    if (event.track.kind === "audio") {
      await buildAudioPlayer(event.track, event.participant.session_id);
    }
  };

  callObject.on("participant-joined", onParticipantJoined);
  callObject.on("participant-left", onParticipantLeft);
  callObject.on("track-started", onTrackStarted);
}
// Join the room whose URL is typed in #meeting-url, with local audio and
// video off and automatic track subscription disabled.
async function joinRoom() {
  enableJoinButton(false);
  enableLeaveButton(true);

  const joinOptions = {
    url: document.getElementById("meeting-url").value,
    startVideoOff: true,
    startAudioOff: true,
    subscribeToTracksAutomatically: false,
    receiveSettings: {
      base: { video: { layer: 0 } },
    },
  };
  // The join call is started but not awaited.
  callObject.join(joinOptions);
}
// Leave the call, reset the buttons and clear every player/button that was
// added to #audio-container.
async function leaveRoom() {
  enableJoinButton(true);
  enableLeaveButton(false);

  callObject.leave();

  document.getElementById("audio-container").replaceChildren();
}
</script>
<body onload="startDaily()">
  <!-- Room URL entry -->
  <div class="ui centered page grid" style="margin-top: 30px">
    <div class="ten wide column">
      <div class="ui form" style="margin-top: 30px">
        <div class="field">
          <label>Meeting URL</label>
          <input id="meeting-url" value="" />
        </div>
      </div>
    </div>
  </div>
  <!-- Join / leave controls -->
  <div class="ui centered aligned header" style="margin-top: 30px">
    <button id="join-button" class="ui primary button" onclick="joinRoom()">
      Join
    </button>
    <button id="leave-button" class="ui button" onclick="leaveRoom()">
      Leave
    </button>
  </div>
  <!-- Audio players and their toggle buttons are appended to #audio-container.
       The inner grid previously repeated id="tile"; ids must be unique within
       a document, so only the outer container keeps it. -->
  <div id="tile" class="ui container" style="margin-top: 30px">
    <div class="ui center aligned grid">
      <div id="audio-container"></div><br/>
    </div>
  </div>
</body>
</html>

View File

@@ -0,0 +1,2 @@
pydub
pipecat-ai[daily]

View File

@@ -0,0 +1,55 @@
#
# Copyright (c) 2024-2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import os
import aiohttp
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper
async def configure(aiohttp_session: aiohttp.ClientSession):
    """Resolve the Daily room URL and create a meeting token for it.

    The room URL comes from `-u/--url` or the `DAILY_SAMPLE_ROOM_URL`
    environment variable; the API key from `-k/--apikey` or `DAILY_API_KEY`.
    Command-line arguments this parser does not know are ignored.

    Args:
        aiohttp_session: Session handed to `DailyRESTHelper` for its requests.

    Returns:
        A `(url, token)` tuple.

    Raises:
        Exception: If no room URL or no API key could be determined.
    """
    cli = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
    cli.add_argument("-u", "--url", type=str, required=False, help="URL of the Daily room to join")
    cli.add_argument(
        "-k",
        "--apikey",
        type=str,
        required=False,
        help="Daily API Key (needed to create an owner token for the room)",
    )
    options, _ = cli.parse_known_args()

    # Command-line values win over the environment.
    room_url = options.url or os.getenv("DAILY_SAMPLE_ROOM_URL")
    api_key = options.apikey or os.getenv("DAILY_API_KEY")

    if not room_url:
        raise Exception(
            "No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL."
        )
    if not api_key:
        raise Exception(
            "No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
        )

    rest_helper = DailyRESTHelper(
        daily_api_key=api_key,
        daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
        aiohttp_session=aiohttp_session,
    )

    # Create a meeting token for the given room with an expiration 1 hour in
    # the future.
    one_hour: float = 60 * 60
    token = await rest_helper.get_token(room_url, one_hour)

    return (room_url, token)

View File

@@ -0,0 +1,15 @@
# Python 3.10 on Debian bullseye.
FROM python:3.10-bullseye

# Create the application directory together with its assets/ and utils/
# subdirectories.
RUN mkdir -p /app/assets /app/utils

# Copy the Python sources and the dependency list into the image.
COPY *.py /app/
COPY requirements.txt /app/

# Install the dependencies from inside the application directory.
WORKDIR /app
RUN pip3 install -r requirements.txt

# Port used by the server (publish it with -p 7860:7860 at run time).
EXPOSE 7860

CMD ["python3", "server.py"]

View File

@@ -0,0 +1,39 @@
# Daily Multi Translation
This example shows how to use Daily to stream multiple simultaneous translations using a single transport. Daily provides custom tracks and in this example we will simultaneously translate incoming audio in English to Spanish, French and German, each of them being sent to a custom track.
## Get started
```bash
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
cp env.example .env # and add your credentials
```
## Run the server
```bash
python server.py
```
Then, visit `http://localhost:7860/` in your browser. This will open a Daily Prebuilt room where you will speak in English (make sure you are not muted).
## Open client
Next, you need to open the client that will listen to the translations.
```bash
open index.html
```
Once the client is opened, copy the URL of the Daily room created above and join it. You should be able to select which translation you want to hear.
## Build and test the Docker image
```
docker build -t daily-multi-translation .
docker run --env-file .env -p 7860:7860 daily-multi-translation
```

View File

@@ -0,0 +1,165 @@
#
# Copyright (c) 2024-2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.mixers.soundfile_mixer import SoundfileMixer
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.observers.loggers.transcription_log_observer import TranscriptionLogObserver
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
# Load settings from a .env file; with override=True, values from the file
# replace variables that are already set in the environment.
load_dotenv(override=True)
# Drop handler id 0 (loguru's pre-configured default sink, per the loguru docs)
# and log to stderr at DEBUG level instead.
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
# Sound file registered as the "office" sound of every SoundfileMixer in main().
BACKGROUND_SOUND_FILE = "office-ambience-mono-16000.mp3"
async def main():
    """Run a bot that translates English speech into several languages at once.

    Each target language gets its own LLM -> TTS branch, and every branch
    sends its audio to a transport destination named after the language
    ("spanish", "french", "german").
    """
    # (transport destination, language name used in the prompt, Cartesia voice id)
    translations = (
        ("spanish", "Spanish", "cefcb124-080b-4655-b31f-932f3ee743de"),
        ("french", "French", "8832a0b5-47b2-4751-bb22-6a8e2149303d"),
        ("german", "German", "38aabb6a-f52b-4fb0-a3d1-988518f4dc06"),
    )
    destinations = [destination for destination, _, _ in translations]

    async with aiohttp.ClientSession() as session:
        room_url, token = await configure(session)

        transport = DailyTransport(
            room_url,
            token,
            "Multi translation bot",
            DailyParams(
                audio_in_enabled=True,
                audio_out_enabled=True,
                # One background-sound mixer per destination.
                audio_out_mixer={
                    destination: SoundfileMixer(
                        sound_files={"office": BACKGROUND_SOUND_FILE}, default_sound="office"
                    )
                    for destination in destinations
                },
                audio_out_destinations=list(destinations),
                microphone_out_enabled=False,  # Disable since we just use custom tracks
                vad_analyzer=SileroVADAnalyzer(),
            ),
        )

        stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

        # One TTS service per language, each tagged with its own destination.
        tts_services = {
            destination: CartesiaTTSService(
                api_key=os.getenv("CARTESIA_API_KEY"),
                voice_id=voice_id,
                transport_destination=destination,
            )
            for destination, _, voice_id in translations
        }

        # System prompt asking the LLM to translate into the branch's language.
        system_messages = {
            destination: [
                {
                    "role": "system",
                    "content": f"You will be provided with a sentence in English, and your task is to only translate it into {language}.",
                },
            ]
            for destination, language, _ in translations
        }

        llm_services = {
            destination: OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
            for destination in destinations
        }

        # Each language keeps its own context and context aggregator.
        aggregators = {}
        for destination in destinations:
            context = OpenAILLMContext(system_messages[destination])
            aggregators[destination] = llm_services[destination].create_context_aggregator(
                context
            )

        pipeline = Pipeline(
            [
                transport.input(),  # Transport user input
                stt,
                # One parallel branch per language.
                ParallelPipeline(
                    *(
                        [
                            aggregators[destination].user(),
                            llm_services[destination],
                            tts_services[destination],
                            aggregators[destination].assistant(),
                        ]
                        for destination in destinations
                    )
                ),
                transport.output(),  # Transport bot output
            ]
        )

        task_params = PipelineParams(
            audio_in_sample_rate=16000,
            audio_out_sample_rate=16000,
            allow_interruptions=True,
            enable_metrics=True,
            enable_usage_metrics=True,
            report_only_initial_ttfb=True,
        )
        task = PipelineTask(
            pipeline,
            params=task_params,
            observers=[TranscriptionLogObserver()],
        )

        await PipelineRunner().run(task)


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -0,0 +1,5 @@
DAILY_SAMPLE_ROOM_URL=https://yourdomain.daily.co/yourroom # (for joining the bot to the same room repeatedly for local dev)
DAILY_API_KEY=7df...
OPENAI_API_KEY=sk-PL...
DEEPGRAM_API_KEY=efb...
CARTESIA_API_KEY=aeb...

View File

@@ -0,0 +1,202 @@
<html>
<head>
<title>daily multi translation</title>
</head>
<script crossorigin src="https://unpkg.com/@daily-co/daily-js"></script>
<script
src="https://code.jquery.com/jquery-3.1.1.min.js"
integrity="sha256-hVVnYaiADRTO2PzUGmuLJr8BLUSjGIZsDYGmIJLv2b8="
crossorigin="anonymous"
></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/fomantic-ui/2.8.6/semantic.min.js"></script>
<link
rel="stylesheet"
type="text/css"
href="https://cdnjs.cloudflare.com/ajax/libs/fomantic-ui/2.8.6/semantic.min.css"
/>
<script>
// Enable (enable === true) or disable the button with the given element id.
function enableButton(buttonId, enable) {
  document.getElementById(buttonId).disabled = !enable;
}

// Enable or disable the "Join" button.
function enableJoinButton(enable) {
  enableButton("join-button", enable);
}

// Enable or disable the "Leave" button.
function enableLeaveButton(enable) {
  enableButton("leave-button", enable);
}
// Remove every element matching the given CSS selector from the document.
function destroyPlayers(query) {
  document.querySelectorAll(query).forEach((element) => element.remove());
}

// Remove the video/audio players and control buttons created for a participant.
function destroyParticipantPlayers(participantId) {
  for (const tag of ["video", "audio", "button"]) {
    destroyPlayers(`${tag}[data-participant-id="${participantId}"]`);
  }
}
// Unmute the player, turn on autoplay and, when a track is given, attach it
// to the player as a single-track MediaStream.
async function startPlayer(player, track) {
  player.muted = false;
  player.autoplay = true;
  if (track == null) {
    // No track (null or undefined): leave srcObject untouched.
    return;
  }
  player.srcObject = new MediaStream([track]);
}
/**
 * Create a <video> element for a track, add it to "video-container"
 * and start playback.
 *
 * @param {MediaStreamTrack} track - video track to play.
 * @param {string} participantId - stored in the element's data-participant-id.
 * @returns {Promise<HTMLVideoElement>} the created player.
 */
async function buildVideoPlayer(track, participantId) {
  const videoElement = document.createElement("video");
  videoElement.dataset.participantId = participantId;
  document.getElementById("video-container").appendChild(videoElement);

  await startPlayer(videoElement, track);
  await videoElement.play();
  return videoElement;
}
/**
 * Create an <audio> element for a track plus a button that toggles its
 * playback, and add both to "audio-container". The player is left paused.
 *
 * @param {MediaStreamTrack} track - audio track to play.
 * @param {string} participantId - stored in data-participant-id on both elements.
 * @returns {Promise<HTMLAudioElement>} the created (paused) player.
 */
async function buildAudioPlayer(track, participantId) {
  const container = document.getElementById("audio-container");

  const player = document.createElement("audio");
  player.dataset.participantId = participantId;

  // Button that toggles playback of this track.
  const toggleButton = document.createElement("button");
  toggleButton.className = "ui primary green button";
  // The "cam-audio" tag is labelled "english"; other tags are shown as-is.
  toggleButton.innerText = track._mediaTag == "cam-audio" ? "english" : track._mediaTag;
  toggleButton.dataset.participantId = participantId;
  toggleButton.onclick = () => {
    const wasPaused = player.paused;
    if (wasPaused) {
      player.play();
    } else {
      player.pause();
    }
    // Red while playing, green while paused.
    toggleButton.className = wasPaused ? "ui primary red button" : "ui primary green button";
  };

  container.append(player, toggleButton);
  await startPlayer(player, track);
  player.pause();
  return player;
}
/**
 * Subscribe to the audio, video and custom tracks of a remote participant.
 * Does nothing for the id "local".
 *
 * @param {string} participantId - session id of the participant.
 */
function subscribeToTracks(participantId) {
  console.log("subscribing to track");
  if (participantId !== "local") {
    const subscription = { audio: true, video: true, custom: true };
    callObject.updateParticipant(participantId, { setSubscribedTracks: subscription });
  }
}
/**
 * Page entry point: set the initial button state, create the Daily call
 * object (stored on window.callObject) and register its event handlers.
 */
function startDaily() {
  enableJoinButton(true);
  enableLeaveButton(false);

  const call = window.DailyIframe.createCallObject({});
  window.callObject = call;

  // Subscribe to the tracks of every remote participant that joins.
  call.on("participant-joined", (event) => {
    const joined = event.participant;
    if (joined.local) {
      return;
    }
    console.log("participant-joined", joined);
    subscribeToTracks(joined.session_id);
  });

  // Remove the players of a participant that left.
  call.on("participant-left", (event) => {
    const sessionId = event.participant.session_id;
    console.log("participant-left", sessionId);
    destroyParticipantPlayers(sessionId);
  });

  // Build a player matching the kind of each started track.
  call.on("track-started", async (event) => {
    console.log("track-started", event.track);
    switch (event.track.kind) {
      case "video":
        await buildVideoPlayer(event.track, event.participant.session_id);
        break;
      case "audio":
        await buildAudioPlayer(event.track, event.participant.session_id);
        break;
    }
  });
}
/**
 * Join the Daily room whose URL is typed in the "meeting-url" input.
 *
 * The room is joined with local audio and video off and without automatic
 * track subscription. The Join/Leave buttons are switched to the "joined"
 * state up front and reverted if joining fails.
 */
async function joinRoom() {
  enableJoinButton(false);
  enableLeaveButton(true);
  const meetingUrl = document.getElementById("meeting-url").value;
  try {
    // Await the join so a failure is handled here instead of surfacing as
    // an unhandled promise rejection.
    await callObject.join({
      url: meetingUrl,
      startVideoOff: true,
      startAudioOff: true,
      subscribeToTracksAutomatically: false,
      receiveSettings: {
        base: { video: { layer: 0 } },
      },
    });
  } catch (error) {
    console.error("failed to join room", error);
    // Restore the buttons so the user can try again.
    enableJoinButton(true);
    enableLeaveButton(false);
  }
}
/**
 * Leave the current call, reset the Join/Leave buttons and remove every
 * player from the video and audio containers.
 */
async function leaveRoom() {
  enableJoinButton(true);
  enableLeaveButton(false);
  callObject.leave();
  // Empty the video container first, then the audio container.
  for (const containerId of ["video-container", "audio-container"]) {
    document.getElementById(containerId).replaceChildren();
  }
}
</script>
<body onload="startDaily()">
<div class="ui centered page grid" style="margin-top: 30px">
<div class="ten wide column">
<div class="ui form" style="margin-top: 30px">
<div class="field">
<label>Meeting URL</label>
<input id="meeting-url" value="" />
</div>
</div>
</div>
</div>
<div class="ui centered aligned header" style="margin-top: 30px">
<button id="join-button" class="ui primary button" onclick="joinRoom()">
Join
</button>
<button id="leave-button" class="ui button" onclick="leaveRoom()">
Leave
</button>
</div>
<div id="tile" class="ui container" style="margin-top: 30px">
<div id="tile" class="ui center aligned grid">
<div id="audio-container"></div><br/>
</div>
</div>
<div id="tile" class="ui container" style="margin-top: 30px">
<div id="tile" class="ui center aligned grid">
<div id="video-container" class="ui segment"></div>
</div>
</div>
</body>
</html>

View File

@@ -0,0 +1,5 @@
aiofiles
python-dotenv
fastapi[all]
uvicorn
pipecat-ai[daily,deepgram,openai,silero,cartesia]

View File

@@ -0,0 +1,55 @@
#
# Copyright (c) 2024-2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import os
import aiohttp
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper
async def configure(aiohttp_session: aiohttp.ClientSession):
    """Resolve the Daily room URL and create a meeting token for it.

    The room URL and API key come from the command line (``-u/--url`` and
    ``-k/--apikey``) and fall back to the ``DAILY_SAMPLE_ROOM_URL`` and
    ``DAILY_API_KEY`` environment variables. Unknown command-line arguments
    are ignored.

    Args:
        aiohttp_session: HTTP session handed to ``DailyRESTHelper``.

    Returns:
        A ``(url, token)`` tuple.

    Raises:
        Exception: If no room URL or no API key could be determined.
    """
    cli = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
    cli.add_argument(
        "-u", "--url", type=str, required=False, help="URL of the Daily room to join"
    )
    cli.add_argument(
        "-k",
        "--apikey",
        type=str,
        required=False,
        help="Daily API Key (needed to create an owner token for the room)",
    )
    args, _ = cli.parse_known_args()

    url = args.url or os.getenv("DAILY_SAMPLE_ROOM_URL")
    if not url:
        raise Exception(
            "No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL."
        )

    key = args.apikey or os.getenv("DAILY_API_KEY")
    if not key:
        raise Exception(
            "No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
        )

    rest_helper = DailyRESTHelper(
        daily_api_key=key,
        daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
        aiohttp_session=aiohttp_session,
    )

    # Token for the given room, expiring one hour from now.
    expiry_time: float = 60 * 60
    token = await rest_helper.get_token(url, expiry_time)

    return (url, token)

View File

@@ -0,0 +1,139 @@
#
# Copyright (c) 2024-2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import os
import subprocess
from contextlib import asynccontextmanager
import aiohttp
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, RedirectResponse
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomParams
MAX_BOTS_PER_ROOM = 1
# Bot sub-process dict for status reporting and concurrency control
bot_procs = {}
daily_helpers = {}
load_dotenv(override=True)
def cleanup():
    """Terminate every tracked bot subprocess and wait for it to exit."""
    # Each bot_procs value is a tuple whose first item is the Popen object.
    for proc, *_ in bot_procs.values():
        proc.terminate()
        proc.wait()
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage resources shared for the lifetime of the FastAPI app.

    On startup an aiohttp session is created and a ``DailyRESTHelper`` using
    it is stored under ``daily_helpers["rest"]``. On shutdown the session is
    closed and ``cleanup()`` is called.

    Args:
        app: The FastAPI application (unused).
    """
    aiohttp_session = aiohttp.ClientSession()
    try:
        daily_helpers["rest"] = DailyRESTHelper(
            daily_api_key=os.getenv("DAILY_API_KEY", ""),
            daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
            aiohttp_session=aiohttp_session,
        )
        yield
    finally:
        # Release resources even if an exception or cancellation is raised
        # at the yield point or while building the helper.
        try:
            await aiohttp_session.close()
        finally:
            cleanup()
app = FastAPI(lifespan=lifespan)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/")
async def start_agent(request: Request):
    """Create a Daily room, start a bot subprocess for it and redirect there.

    Args:
        request: The incoming request (unused).

    Returns:
        A ``RedirectResponse`` to the URL of the newly created room.

    Raises:
        HTTPException: With status 500 if the room has no URL, the per-room
            bot limit is reached, no token could be obtained, or the bot
            subprocess could not be started.
    """
    print("!!! Creating room")
    room = await daily_helpers["rest"].create_room(DailyRoomParams())
    print(f"!!! Room URL: {room.url}")

    # Ensure the room property is present
    if not room.url:
        raise HTTPException(
            status_code=500,
            detail="Missing 'room' property in request data. Cannot start agent without a target room!",
        )

    # Check if there is already an existing process running in this room
    num_bots_in_room = sum(
        1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None
    )
    if num_bots_in_room >= MAX_BOTS_PER_ROOM:
        raise HTTPException(status_code=500, detail=f"Max bot limited reach for room: {room.url}")

    # Get the token for the room
    token = await daily_helpers["rest"].get_token(room.url)
    if not token:
        raise HTTPException(status_code=500, detail=f"Failed to get token for room: {room.url}")

    # Spawn a new agent, and join the user session
    # Note: this is mostly for demonstration purposes (refer to 'deployment' in README)
    try:
        # Pass an argument list without a shell so the room URL and token are
        # never interpreted by a shell.
        proc = subprocess.Popen(
            ["python3", "-m", "bot", "-u", room.url, "-t", token],
            bufsize=1,
            cwd=os.path.dirname(os.path.abspath(__file__)),
        )
        bot_procs[proc.pid] = (proc, room.url)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}") from e

    return RedirectResponse(room.url)
@app.get("/status/{pid}")
def get_status(pid: int):
    """Report whether the bot subprocess with the given pid is still running.

    Args:
        pid: Process id used as the key in ``bot_procs``.

    Returns:
        A ``JSONResponse`` with ``bot_id`` and a ``status`` of ``"running"``
        or ``"finished"``.

    Raises:
        HTTPException: With status 404 if no subprocess is tracked under ``pid``.
    """
    entry = bot_procs.get(pid)
    if not entry:
        raise HTTPException(status_code=404, detail=f"Bot with process id: {pid} not found")

    # poll() returns None while the process has not exited.
    status = "running" if entry[0].poll() is None else "finished"
    return JSONResponse({"bot_id": pid, "status": status})
if __name__ == "__main__":
    import uvicorn

    # The environment provides the defaults; command-line flags override them.
    default_host = os.getenv("HOST", "0.0.0.0")
    default_port = int(os.getenv("FAST_API_PORT", "7860"))

    parser = argparse.ArgumentParser(description="Daily Storyteller FastAPI server")
    parser.add_argument("--host", type=str, default=default_host, help="Host address")
    parser.add_argument("--port", type=int, default=default_port, help="Port number")
    parser.add_argument("--reload", action="store_true", help="Reload code on change")
    cli_args = parser.parse_args()

    uvicorn.run("server:app", host=cli_args.host, port=cli_args.port, reload=cli_args.reload)

View File

@@ -14,6 +14,7 @@ from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import LLMUserAggregatorParams
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.groq.llm import GroqLLMService
from pipecat.services.groq.stt import GroqSTTService
@@ -39,7 +40,9 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespac
stt = GroqSTTService(api_key=os.getenv("GROQ_API_KEY"))
llm = GroqLLMService(api_key=os.getenv("GROQ_API_KEY"), model="llama-3.3-70b-versatile")
llm = GroqLLMService(
api_key=os.getenv("GROQ_API_KEY"), model="meta-llama/llama-4-maverick-17b-128e-instruct"
)
tts = GroqTTSService(api_key=os.getenv("GROQ_API_KEY"))
@@ -51,7 +54,9 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespac
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
context_aggregator = llm.create_context_aggregator(
context, user_params=LLMUserAggregatorParams(aggregation_timeout=0.05)
)
pipeline = Pipeline(
[

View File

@@ -16,8 +16,12 @@ from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.nim.llm import NimLLMService
from pipecat.services.riva.stt import ParakeetSTTService
from pipecat.services.riva.tts import FastPitchTTSService
from pipecat.services.riva.stt import (
ParakeetSTTService,
RivaSegmentedSTTService,
RivaSTTService,
)
from pipecat.services.riva.tts import FastPitchTTSService, RivaTTSService
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
@@ -37,11 +41,11 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespac
),
)
stt = ParakeetSTTService(api_key=os.getenv("NVIDIA_API_KEY"))
stt = RivaSTTService(api_key=os.getenv("NVIDIA_API_KEY"))
llm = NimLLMService(api_key=os.getenv("NVIDIA_API_KEY"), model="meta/llama-3.1-405b-instruct")
tts = FastPitchTTSService(api_key=os.getenv("NVIDIA_API_KEY"))
tts = RivaTTSService(api_key=os.getenv("NVIDIA_API_KEY"))
messages = [
{

View File

@@ -17,6 +17,7 @@ from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import LLMUserAggregatorParams
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.groq.llm import GroqLLMService
@@ -53,7 +54,9 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespac
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = GroqLLMService(api_key=os.getenv("GROQ_API_KEY"), model="llama-3.3-70b-versatile")
llm = GroqLLMService(
api_key=os.getenv("GROQ_API_KEY"), model="meta-llama/llama-4-maverick-17b-128e-instruct"
)
# You can also register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.
llm.register_function("get_current_weather", fetch_weather_from_api)
@@ -83,7 +86,9 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespac
]
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
context_aggregator = llm.create_context_aggregator(
context, user_params=LLMUserAggregatorParams(aggregation_timeout=0.05)
)
pipeline = Pipeline(
[

View File

@@ -12,10 +12,12 @@ from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import TranscriptionMessage
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.transcript_processor import TranscriptProcessor
from pipecat.services.gemini_multimodal_live.gemini import GeminiMultimodalLiveLLMService
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
@@ -69,12 +71,16 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespac
)
context_aggregator = llm.create_context_aggregator(context)
transcript = TranscriptProcessor()
pipeline = Pipeline(
[
transport.input(),
context_aggregator.user(),
transcript.user(),
llm,
transport.output(),
transcript.assistant(),
context_aggregator.assistant(),
]
)
@@ -103,6 +109,15 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespac
logger.info(f"Client closed connection")
await task.cancel()
# Register event handler for transcript updates
@transcript.event_handler("on_transcript_update")
async def on_transcript_update(processor, frame):
for msg in frame.messages:
if isinstance(msg, TranscriptionMessage):
timestamp = f"[{msg.timestamp}] " if msg.timestamp else ""
line = f"{timestamp}{msg.role}: {msg.content}"
logger.info(f"Transcript: {line}")
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)

View File

@@ -36,6 +36,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespac
audio_in_enabled=True,
audio_out_enabled=True,
video_out_enabled=True,
video_out_is_live=True,
video_out_width=512,
video_out_height=512,
vad_analyzer=SileroVADAnalyzer(),

View File

@@ -0,0 +1,128 @@
#
# Copyright (c) 2024-2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn import LocalSmartTurnAnalyzer
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
load_dotenv(override=True)
async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespace):
    """Run a voice bot over a SmallWebRTC connection using a local smart-turn model.

    Builds a transport -> STT -> LLM -> TTS -> transport pipeline, registers
    the client connection event handlers and runs the pipeline task.

    Args:
        webrtc_connection: The WebRTC connection the transport is built on.
        _: Parsed command-line arguments (unused).
    """
    logger.info("Starting bot")

    # To use this locally, set the environment variable
    # LOCAL_SMART_TURN_MODEL_PATH to the path where the smart-turn repo is
    # cloned.
    #
    # Example setup:
    #
    #   # Git LFS (Large File Storage)
    #   brew install git-lfs
    #   # Hugging Face uses LFS to store large model files, including .mlpackage
    #   git lfs install
    #   # Clone the repo with the smart_turn_classifier.mlpackage
    #   git clone https://huggingface.co/pipecat-ai/smart-turn
    #
    # Then set the env variable:
    #   export LOCAL_SMART_TURN_MODEL_PATH=./smart-turn
    # or add it to your .env file
    smart_turn_model_path = os.getenv("LOCAL_SMART_TURN_MODEL_PATH")

    vad_analyzer = SileroVADAnalyzer(params=VADParams(stop_secs=0.2))
    turn_analyzer = LocalSmartTurnAnalyzer(
        smart_turn_model_path=smart_turn_model_path, params=SmartTurnParams()
    )
    transport_params = TransportParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=vad_analyzer,
        turn_analyzer=turn_analyzer,
    )
    transport = SmallWebRTCTransport(webrtc_connection=webrtc_connection, params=transport_params)

    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
    tts = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY"),
        voice_id="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
    )
    llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))

    system_prompt = {
        "role": "system",
        "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
    }
    messages = [system_prompt]

    context = OpenAILLMContext(messages)
    context_aggregator = llm.create_context_aggregator(context)

    processors = [
        transport.input(),  # Transport user input
        stt,
        context_aggregator.user(),  # User responses
        llm,  # LLM
        tts,  # TTS
        transport.output(),  # Transport bot output
        context_aggregator.assistant(),  # Assistant spoken responses
    ]
    pipeline = Pipeline(processors)

    task_params = PipelineParams(
        allow_interruptions=True,
        enable_metrics=True,
        enable_usage_metrics=True,
        report_only_initial_ttfb=True,
    )
    task = PipelineTask(pipeline, params=task_params)

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info("Client connected")
        # Kick off the conversation.
        messages.append({"role": "system", "content": "Please introduce yourself to the user."})
        await task.queue_frames([context_aggregator.user().get_context_frame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info("Client disconnected")

    @transport.event_handler("on_client_closed")
    async def on_client_closed(transport, client):
        logger.info("Client closed connection")
        await task.cancel()

    runner = PipelineRunner(handle_sigint=False)
    await runner.run(task)
if __name__ == "__main__":
from run import main
main()

View File

@@ -53,4 +53,3 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
token = await daily_rest_helper.get_token(url, expiry_time)
return (url, token)
return (url, token)

View File

@@ -1,2 +0,0 @@
frontend/node_modules
frontend/out

View File

@@ -1,4 +1,4 @@
[![Try](https://img.shields.io/badge/try_it-here-blue)](https://storytelling-chatbot.fly.dev)
[![Try](https://img.shields.io/badge/try_it-here-blue)](https://gemini-storybot.vercel.app/)
# Storytelling Chatbot
@@ -9,7 +9,6 @@ It periodically prompts the user for input for a 'choose your own adventure' sty
We use Gemini 2.0 for creating the story and image prompts, and we add visual elements to the story by generating images using Google's Imagen.
---
### It uses the following AI services:
@@ -20,7 +19,7 @@ Transcribes inbound participant voice media to text.
**Google Gemini 2.0 - LLM**
Our creative writer LLM. You can see the context used to prompt it [here](src/prompts.py)
Our creative writer LLM. You can see the context used to prompt it [here](server/prompts.py)
**ElevenLabs - Text-to-Speech**
@@ -34,47 +33,76 @@ Adds pictures to our story. Prompting is quite key for style consistency, so we
## Setup
**Install requirements**
### Client
```shell
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
```
1. Navigate to the client directory:
**Create environment file and set variables:**
```shell
cd client
```
```shell
mv env.example .env
```
2. Install dependencies:
When deploying to production, to ensure only this app can spawn a new bot, set your `ENV` to `production`
```shell
npm install
```
**Build the frontend:**
3. Build the client:
This project uses a custom frontend, which needs to be built. Note: this is done automatically as part of the Docker deployment.
```shell
npm run build
```
```shell
cd frontend/
npm install
npm run build
```
### Server
The built UI files can be found in `frontend/out`
1. Navigate to the server directory
## Running it locally
```shell
cd ../server
```
Start the API / bot manager:
2. Set up your virtual environment and install requirements
`python src/bot_runner.py --host localhost`
```shell
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
```
If you'd like to run a custom domain or port:
3. Create environment file and set variables
`python src/bot_runner.py --host somehost --port someport`
```shell
mv env.example .env
```
➡️ Open the host URL in your browser `http://localhost:7860`
You'll need API keys for:
If you've run previous versions of the demo, make sure to set `ENV=dev`, and remove the `RUN_AS_VM` line from the .env file.
- DAILY_API_KEY
- ELEVENLABS_API_KEY
- ELEVENLABS_VOICE_ID
- GOOGLE_API_KEY
4. (Optional) Deployment:
When deploying to production, to ensure only this app can spawn new bot processes, set your `ENV` to `production`
## Run it locally
1. Navigate back to the demo's root directory:
```shell
cd ..
```
2. Run the application:
```shell
python server/bot_runner.py --host localhost
```
You can run with a custom domain or port using: `python server/bot_runner.py --host somehost --port someport`
3. ➡️ Open the host URL in your browser: http://localhost:7860
---

View File

Before

Width:  |  Height:  |  Size: 1.1 KiB

After

Width:  |  Height:  |  Size: 1.1 KiB

View File

Before

Width:  |  Height:  |  Size: 1.3 MiB

After

Width:  |  Height:  |  Size: 1.3 MiB

View File

Before

Width:  |  Height:  |  Size: 2.4 MiB

After

Width:  |  Height:  |  Size: 2.4 MiB

View File

@@ -1,11 +1,11 @@
{
"name": "frontend",
"name": "client",
"version": "0.1.0",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "frontend",
"name": "client",
"version": "0.1.0",
"dependencies": {
"@daily-co/daily-js": "^0.62.0",

View File

@@ -1,5 +1,5 @@
{
"name": "frontend",
"name": "client",
"version": "0.1.0",
"private": true,
"scripts": {

View File

Before

Width:  |  Height:  |  Size: 3.7 KiB

After

Width:  |  Height:  |  Size: 3.7 KiB

View File

Before

Width:  |  Height:  |  Size: 788 KiB

After

Width:  |  Height:  |  Size: 788 KiB

View File

@@ -0,0 +1,2 @@
client/node_modules
client/out

View File

@@ -44,11 +44,11 @@ COPY ./requirements.txt requirements.txt
RUN pip3 install --no-cache-dir --upgrade -r requirements.txt
# Copy everything else
COPY --chown=user ./src/ src/
COPY --chown=user ./server/ server/
# Copy frontend app and build
COPY --chown=user ./frontend/ frontend/
RUN cd frontend && npm install && npm run build
# Copy client app and build
COPY --chown=user ./client/ client/
RUN cd client && npm install && npm run build
# Start the FastAPI server
CMD python3 src/bot_runner.py --port ${FAST_API_PORT}
CMD python3 server/bot_runner.py --port ${FAST_API_PORT}

View File

Before

Width:  |  Height:  |  Size: 1.4 MiB

After

Width:  |  Height:  |  Size: 1.4 MiB

View File

Before

Width:  |  Height:  |  Size: 1.5 MiB

After

Width:  |  Height:  |  Size: 1.5 MiB

View File

@@ -57,7 +57,7 @@ app.add_middleware(
)
# Mount the static directory
STATIC_DIR = "frontend/out"
STATIC_DIR = "client/out"
# ------------ Fast API Routes ------------ #
@@ -175,7 +175,7 @@ async def virtualize_bot(room_url: str, token: str):
image = data[0]["config"]["image"]
# Machine configuration
cmd = f"python src/bot.py -u {room_url} -t {token}"
cmd = f"python server/bot.py -u {room_url} -t {token}"
cmd = cmd.split()
worker_props = {
"config": {

View File

@@ -47,7 +47,7 @@ canonical = [ "aiofiles~=24.1.0" ]
cartesia = [ "cartesia~=1.4.0", "websockets~=13.1" ]
cerebras = []
deepseek = []
daily = [ "daily-python~=0.17.0" ]
daily = [ "daily-python~=0.18.1" ]
deepgram = [ "deepgram-sdk~=3.8.0" ]
elevenlabs = [ "websockets~=13.1" ]
fal = [ "fal-client~=0.5.9" ]
@@ -56,7 +56,7 @@ fish = [ "ormsgpack~=1.7.0", "websockets~=13.1" ]
gladia = [ "websockets~=13.1" ]
google = [ "google-cloud-speech~=2.31.1", "google-cloud-texttospeech~=2.25.1", "google-genai~=1.7.0", "google-generativeai~=0.8.4", "websockets~=13.1" ]
grok = []
groq = [ "groq~=0.20.0" ]
groq = [ "groq~=0.23.0" ]
gstreamer = [ "pygobject~=3.50.0" ]
krisp = [ "pipecat-ai-krisp~=0.3.0" ]
koala = [ "pvkoala~=2.0.3" ]
@@ -78,7 +78,7 @@ perplexity = []
playht = [ "pyht~=0.1.12", "websockets~=13.1" ]
qwen = []
rime = [ "websockets~=13.1" ]
riva = [ "nvidia-riva-client~=2.19.0" ]
riva = [ "nvidia-riva-client~=2.19.1" ]
sentry = [ "sentry-sdk~=2.23.1" ]
local-smart-turn = [ "coremltools>=8.0", "transformers", "torch==2.5.0", "torchaudio==2.5.0" ]
remote-smart-turn = []

View File

@@ -0,0 +1,73 @@
#
# Copyright (c) 2024-2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from typing import Any, Dict
import numpy as np
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import BaseSmartTurn
try:
import torch
from transformers import AutoFeatureExtractor, Wav2Vec2BertForSequenceClassification
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use the LocalSmartTurnAnalyzer, you need to `pip install pipecat-ai[local-smart-turn]`."
)
raise Exception(f"Missing module: {e}")
class LocalSmartTurnAnalyzer(BaseSmartTurn):
    """Smart-turn analyzer that runs the classification model in-process.

    The model and its feature extractor are loaded with ``from_pretrained``
    from ``smart_turn_model_path``, or from ``"pipecat-ai/smart-turn"`` when
    that path is empty or ``None``.
    """

    def __init__(self, *, smart_turn_model_path: str, **kwargs):
        """Load the model and feature extractor and move the model to the device.

        Args:
            smart_turn_model_path: Location passed to ``from_pretrained``; a
                falsy value selects ``"pipecat-ai/smart-turn"``.
            **kwargs: Forwarded to ``BaseSmartTurn.__init__``.
        """
        super().__init__(**kwargs)

        # Fall back to the pretrained model published on Hugging Face.
        model_source = smart_turn_model_path or "pipecat-ai/smart-turn"

        logger.debug("Loading Local Smart Turn model...")

        # Sequence-classification model, then its matching feature extractor.
        classifier = Wav2Vec2BertForSequenceClassification.from_pretrained(model_source)
        self._turn_processor = AutoFeatureExtractor.from_pretrained(model_source)

        # Use the GPU when CUDA is available, otherwise the CPU.
        self._device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Move the model to the device and switch it to evaluation mode.
        self._turn_model = classifier.to(self._device)
        self._turn_model.eval()

        logger.debug("Loaded Local Smart Turn")

    async def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
        """Classify an audio segment as a complete or incomplete turn.

        Args:
            audio_array: Audio samples handed to the feature extractor, which
                is called with ``sampling_rate=16000``.

        Returns:
            A dict with ``"prediction"`` (1 when the class-1 probability is
            above 0.5, else 0) and ``"probability"`` (that class-1 probability).
        """
        features = self._turn_processor(
            audio_array,
            sampling_rate=16000,
            padding="max_length",
            truncation=True,
            max_length=800,  # Maximum length as specified in training
            return_attention_mask=True,
            return_tensors="pt",
        )

        # The input tensors must live on the same device as the model.
        model_inputs = {}
        for name, tensor in features.items():
            model_inputs[name] = tensor.to(self._device)

        # Inference only: no gradients needed.
        with torch.no_grad():
            logits = self._turn_model(**model_inputs).logits
            class_probs = torch.nn.functional.softmax(logits, dim=1)
            # Probability of class 1 (Complete)
            completion_prob = class_probs[0, 1].item()

        return {
            "prediction": int(completion_prob > 0.5),
            "probability": completion_prob,
        }

View File

@@ -60,12 +60,16 @@ class Frame:
name: str = field(init=False)
pts: Optional[int] = field(init=False)
metadata: Dict[str, Any] = field(init=False)
transport_source: Optional[str] = field(init=False)
transport_destination: Optional[str] = field(init=False)
def __post_init__(self):
self.id: int = obj_id()
self.name: str = f"{self.__class__.__name__}#{obj_count(self)}"
self.pts: Optional[int] = None
self.metadata: Dict[str, Any] = {}
self.transport_source: Optional[str] = None
self.transport_destination: Optional[str] = None
def __str__(self):
return self.name
@@ -136,8 +140,9 @@ class ImageRawFrame:
@dataclass
class OutputAudioRawFrame(DataFrame, AudioRawFrame):
"""A chunk of audio. Will be played by the output transport if the
transport's microphone has been enabled.
"""A chunk of audio. Will be played by the output transport. If the
transport supports multiple audio destinations (e.g. multiple audio tracks) the
destination name can be specified.
"""
@@ -147,13 +152,14 @@ class OutputAudioRawFrame(DataFrame, AudioRawFrame):
def __str__(self):
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
return f"{self.name}(pts: {pts}, destination: {self.transport_destination}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
@dataclass
class OutputImageRawFrame(DataFrame, ImageRawFrame):
"""An image that will be shown by the transport if the transport's camera is
enabled.
"""An image that will be shown by the transport. If the transport supports
multiple video destinations (e.g. multiple video tracks) the destination
name can be specified.
"""
@@ -176,7 +182,7 @@ class URLImageRawFrame(OutputImageRawFrame):
"""
url: Optional[str]
url: Optional[str] = None
def __str__(self):
pts = format_pts(self.pts)
@@ -716,7 +722,11 @@ class UserImageRequestFrame(SystemFrame):
@dataclass
class InputAudioRawFrame(SystemFrame, AudioRawFrame):
"""A chunk of audio usually coming from an input transport."""
"""A chunk of audio usually coming from an input transport. If the transport
supports multiple audio sources (e.g. multiple audio tracks) the source name
will be specified.
"""
def __post_init__(self):
super().__post_init__()
@@ -724,35 +734,50 @@ class InputAudioRawFrame(SystemFrame, AudioRawFrame):
def __str__(self):
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
return f"{self.name}(pts: {pts}, source: {self.transport_source}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
@dataclass
class InputImageRawFrame(SystemFrame, ImageRawFrame):
"""An image usually coming from an input transport."""
"""An image usually coming from an input transport. If the transport
supports multiple video sources (e.g. multiple video tracks) the source name
will be specified.
"""
def __str__(self):
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, size: {self.size}, format: {self.format})"
return f"{self.name}(pts: {pts}, source: {self.transport_source}, size: {self.size}, format: {self.format})"
@dataclass
class UserAudioRawFrame(InputAudioRawFrame):
"""A chunk of audio, usually coming from an input transport, associated to a user."""
user_id: str = ""
def __str__(self):
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, user: {self.user_id}, source: {self.transport_source}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
@dataclass
class UserImageRawFrame(InputImageRawFrame):
"""An image associated to a user."""
user_id: str
user_id: str = ""
request: Optional[UserImageRequestFrame] = None
def __str__(self):
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, user: {self.user_id}, size: {self.size}, format: {self.format}, request: {self.request})"
return f"{self.name}(pts: {pts}, user: {self.user_id}, source: {self.transport_source}, size: {self.size}, format: {self.format}, request: {self.request})"
@dataclass
class VisionImageRawFrame(InputImageRawFrame):
"""An image with an associated text to ask for a description of it."""
text: Optional[str]
text: Optional[str] = None
def __str__(self):
pts = format_pts(self.pts)

View File

@@ -227,10 +227,8 @@ class GeminiMultimodalLiveAssistantContextAggregator(OpenAIAssistantContextAggre
# but the GeminiMultimodalLiveAssistantContextAggregator pushes LLMTextFrames and TTSTextFrames. We
# need to override this process_frame for LLMTextFrame, so that only the TTSTextFrames
# are processed. This ensures that the context gets only one set of messages.
# GeminiMultimodalLiveLLMService also pushes TranscriptionFrames, so we need to
# ignore pushing those as well, as they're also TextFrames.
async def process_frame(self, frame: Frame, direction: FrameDirection):
if not isinstance(frame, (LLMTextFrame, TranscriptionFrame)):
if not isinstance(frame, LLMTextFrame):
await super().process_frame(frame, direction)
async def handle_user_image_frame(self, frame: UserImageRawFrame):
@@ -354,6 +352,7 @@ class GeminiMultimodalLiveLLMService(LLMService):
self._bot_is_speaking = False
self._user_audio_buffer = bytearray()
self._bot_audio_buffer = bytearray()
self._bot_text_buffer = ""
self._sample_rate = 24000
@@ -464,9 +463,9 @@ class GeminiMultimodalLiveLLMService(LLMService):
# Sometimes the transcription contains newlines; we want to remove them.
cleaned_text = text.rstrip("\n")
logger.debug(f"[Transcription:user] {cleaned_text}")
context.add_message({"role": "user", "content": [{"type": "text", "text": cleaned_text}]})
await self.push_frame(
TranscriptionFrame(text=cleaned_text, user_id="user", timestamp=time_now_iso8601())
TranscriptionFrame(text=cleaned_text, user_id="user", timestamp=time_now_iso8601()),
FrameDirection.UPSTREAM,
)
async def _transcribe_audio(self, audio, context):
@@ -852,6 +851,15 @@ class GeminiMultimodalLiveLLMService(LLMService):
if not part:
return
# part.text is added when `modalities` is set to TEXT; otherwise, it's None
text = part.text
if text:
if not self._bot_text_buffer:
await self.push_frame(LLMFullResponseStartFrame())
self._bot_text_buffer += text
await self.push_frame(LLMTextFrame(text=text))
inline_data = part.inlineData
if not inline_data:
return
@@ -892,13 +900,24 @@ class GeminiMultimodalLiveLLMService(LLMService):
async def _handle_evt_turn_complete(self, evt):
self._bot_is_speaking = False
await self.push_frame(TTSStoppedFrame())
text = self._bot_text_buffer
self._bot_text_buffer = ""
# Only push the TTSStoppedFrame if the bot is outputting audio.
# When text is found, modalities is set to TEXT and no audio
# is produced.
if not text:
await self.push_frame(TTSStoppedFrame())
await self.push_frame(LLMFullResponseEndFrame())
async def _handle_evt_output_transcription(self, evt):
if not evt.serverContent.outputTranscription:
return
# This is the output transcription text when modalities is set to AUDIO.
# In this case, we push LLMTextFrame and TTSTextFrame to be handled by the
# downstream assistant context aggregator.
text = evt.serverContent.outputTranscription.text
if not text:

View File

@@ -12,9 +12,11 @@ from loguru import logger
from pipecat.frames.frames import (
Frame,
FunctionCallResultFrame,
InterimTranscriptionFrame,
LLMMessagesUpdateFrame,
LLMSetToolsFrame,
LLMTextFrame,
TranscriptionFrame,
)
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection
@@ -137,15 +139,6 @@ class OpenAIRealtimeLLMContext(OpenAILLMContext):
}
self.add_message(message)
def add_assistant_content_item_as_message(self, item):
message = {"role": "assistant", "content": []}
for content in item.content:
if content.type == "audio":
message["content"].append({"type": "text", "text": content.transcript})
else:
logger.error(f"Unhandled content type in assistant item: {content.type} - {item}")
self.add_message(message)
class OpenAIRealtimeUserContextAggregator(OpenAIUserContextAggregator):
async def process_frame(
@@ -175,8 +168,10 @@ class OpenAIRealtimeAssistantContextAggregator(OpenAIAssistantContextAggregator)
# but the OpenAIRealtimeLLMService pushes LLMTextFrames and TTSTextFrames. We
# need to override this process_frame for LLMTextFrame, so that only the TTSTextFrames
# are processed. This ensures that the context gets only one set of messages.
# OpenAIRealtimeLLMService also pushes TranscriptionFrames and InterimTranscriptionFrames,
# so we need to ignore pushing those as well, as they're also TextFrames.
async def process_frame(self, frame: Frame, direction: FrameDirection):
if not isinstance(frame, LLMTextFrame):
if not isinstance(frame, (LLMTextFrame, TranscriptionFrame, InterimTranscriptionFrame)):
await super().process_frame(frame, direction)
async def handle_function_call_result(self, frame: FunctionCallResultFrame):

View File

@@ -562,13 +562,11 @@ class OpenAIRealtimeBetaLLMService(LLMService):
await self.push_error(ErrorFrame(error=f"Error: {evt}", fatal=True))
async def _handle_assistant_output(self, output):
# logger.debug(f"!!! HANDLE Assistant output: {output}")
# We haven't seen intermixed audio and function_call items in the same response. But let's
# try to write logic that handles that, if it does happen.
messages = [item for item in output if item.type == "message"]
# Also, the assistant output is pushed as LLMTextFrame and TTSTextFrame to be handled by
# the assistant context aggregator.
function_calls = [item for item in output if item.type == "function_call"]
for item in messages:
self._context.add_assistant_content_item_as_message(item)
await self._handle_function_call_items(function_calls)
async def _handle_function_call_items(self, items):

View File

@@ -68,6 +68,8 @@ class RimeTTSService(AudioContextWordTTSService):
language: Optional[Language] = Language.EN
speed_alpha: Optional[float] = 1.0
reduce_latency: Optional[bool] = False
pause_between_brackets: Optional[bool] = False
phonemize_between_brackets: Optional[bool] = False
def __init__(
self,
@@ -117,6 +119,8 @@ class RimeTTSService(AudioContextWordTTSService):
else "eng",
"speedAlpha": params.speed_alpha,
"reduceLatency": params.reduce_latency,
"pauseBetweenBrackets": json.dumps(params.pause_between_brackets),
"phonemizeBetweenBrackets": json.dumps(params.phonemize_between_brackets),
}
# State tracking

View File

@@ -5,7 +5,7 @@
#
import asyncio
from typing import AsyncGenerator, Optional
from typing import AsyncGenerator, List, Mapping, Optional
from loguru import logger
from pydantic import BaseModel
@@ -13,12 +13,13 @@ from pydantic import BaseModel
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
InterimTranscriptionFrame,
StartFrame,
TranscriptionFrame,
)
from pipecat.services.stt_service import STTService
from pipecat.services.stt_service import SegmentedSTTService, STTService
from pipecat.transcriptions.language import Language
from pipecat.utils.time import time_now_iso8601
@@ -31,7 +32,59 @@ except ModuleNotFoundError as e:
raise Exception(f"Missing module: {e}")
class ParakeetSTTService(STTService):
def language_to_riva_language(language: Language) -> Optional[str]:
    """Translate a pipecat ``Language`` value into a Riva ASR language code.

    Reference table:
    https://docs.nvidia.com/deeplearning/riva/user-guide/docs/asr/asr-riva-build-table.html?highlight=fr%20fr

    Args:
        language: Language enum value to translate.

    Returns:
        Optional[str]: The Riva language code, or None when the language has
        no entry in the table below.
    """
    # Generic (region-less) languages resolve to one default regional code.
    riva_codes = {
        Language.AR: "ar-AR",
        Language.DE: "de-DE",
        Language.DE_DE: "de-DE",
        Language.EN: "en-US",  # generic English resolves to US
        Language.EN_GB: "en-GB",
        Language.EN_US: "en-US",
        Language.ES: "es-ES",  # generic Spanish resolves to Spain
        Language.ES_ES: "es-ES",
        Language.ES_US: "es-US",
        Language.FR: "fr-FR",
        Language.FR_FR: "fr-FR",
        Language.HI: "hi-IN",
        Language.HI_IN: "hi-IN",
        Language.IT: "it-IT",
        Language.IT_IT: "it-IT",
        Language.JA: "ja-JP",
        Language.JA_JP: "ja-JP",
        Language.KO: "ko-KR",
        Language.KO_KR: "ko-KR",
        Language.PT: "pt-BR",  # generic Portuguese resolves to Brazilian
        Language.PT_BR: "pt-BR",
        Language.RU: "ru-RU",
        Language.RU_RU: "ru-RU",
    }

    if language in riva_codes:
        return riva_codes[language]
    return None
class RivaSTTService(STTService):
class InputParams(BaseModel):
language: Optional[Language] = Language.EN_US
@@ -40,7 +93,10 @@ class ParakeetSTTService(STTService):
*,
api_key: str,
server: str = "grpc.nvcf.nvidia.com:443",
function_id: str = "1598d209-5e27-4d3c-8079-4751568b1081",
model_function_map: Mapping[str, str] = {
"function_id": "1598d209-5e27-4d3c-8079-4751568b1081",
"model_name": "parakeet-ctc-1.1b-asr",
},
sample_rate: Optional[int] = None,
params: InputParams = InputParams(),
**kwargs,
@@ -48,7 +104,7 @@ class ParakeetSTTService(STTService):
super().__init__(sample_rate=sample_rate, **kwargs)
self._api_key = api_key
self._profanity_filter = False
self._automatic_punctuation = False
self._automatic_punctuation = True
self._no_verbatim_transcripts = False
self._language_code = params.language
self._boosted_lm_words = None
@@ -60,11 +116,12 @@ class ParakeetSTTService(STTService):
self._stop_history_eou = -1
self._stop_threshold_eou = -1.0
self._custom_configuration = ""
self._function_id = model_function_map.get("function_id")
self.set_model_name("parakeet-ctc-1.1b-asr")
self.set_model_name(model_function_map.get("model_name"))
metadata = [
["function-id", function_id],
["function-id", self._function_id],
["authorization", f"Bearer {api_key}"],
]
auth = riva.client.Auth(None, True, server, metadata)
@@ -79,6 +136,13 @@ class ParakeetSTTService(STTService):
def can_generate_metrics(self) -> bool:
return False
async def set_model(self, model: str):
logger.warning(f"Cannot set model after initialization. Set model and function id like so:")
example = {"function_id": "<UUID>", "model_name": "<model_name>"}
logger.warning(
f"{self.__class__.__name__}(api_key=<api_key>, model_function_map={example})"
)
async def start(self, frame: StartFrame):
await super().start(frame)
@@ -196,3 +260,262 @@ class ParakeetSTTService(STTService):
def __iter__(self):
return self
class RivaSegmentedSTTService(SegmentedSTTService):
    """Speech-to-text service using NVIDIA Riva's offline/batch models.

    By default, this service uses NVIDIA's Riva Canary ASR API to perform speech-to-text
    transcription on audio segments. It inherits from SegmentedSTTService to handle
    audio buffering and speech detection.

    Args:
        api_key: NVIDIA API key for authentication
        server: Riva server address (defaults to NVIDIA Cloud Function endpoint)
        model_function_map: Mapping of model name and its corresponding NVIDIA Cloud Function ID
        sample_rate: Audio sample rate in Hz. If not provided, uses the pipeline's rate
        params: Additional configuration parameters for Riva
        **kwargs: Additional arguments passed to SegmentedSTTService
    """

    class InputParams(BaseModel):
        language: Optional[Language] = Language.EN_US
        profanity_filter: bool = False
        automatic_punctuation: bool = True
        verbatim_transcripts: bool = False
        boosted_lm_words: Optional[List[str]] = None
        boosted_lm_score: float = 4.0

    def __init__(
        self,
        *,
        api_key: str,
        server: str = "grpc.nvcf.nvidia.com:443",
        model_function_map: Mapping[str, str] = {
            "function_id": "ee8dc628-76de-4acc-8595-1836e7e857bd",
            "model_name": "canary-1b-asr",
        },
        sample_rate: Optional[int] = None,
        params: InputParams = InputParams(),
        **kwargs,
    ):
        super().__init__(sample_rate=sample_rate, **kwargs)

        # Set model name
        self.set_model_name(model_function_map.get("model_name"))

        # Initialize Riva settings
        self._api_key = api_key
        self._server = server
        self._function_id = model_function_map.get("function_id")
        self._model_name = model_function_map.get("model_name")

        # Store the language as a Language enum and as a string
        self._language_enum = params.language or Language.EN_US
        self._language = self.language_to_service_language(self._language_enum) or "en-US"

        # Configure transcription parameters
        self._profanity_filter = params.profanity_filter
        self._automatic_punctuation = params.automatic_punctuation
        self._verbatim_transcripts = params.verbatim_transcripts
        self._boosted_lm_words = params.boosted_lm_words
        self._boosted_lm_score = params.boosted_lm_score

        # Voice activity detection thresholds (use Riva defaults)
        self._start_history = -1
        self._start_threshold = -1.0
        self._stop_history = -1
        self._stop_threshold = -1.0
        self._stop_history_eou = -1
        self._stop_threshold_eou = -1.0
        self._custom_configuration = ""

        # The client and the recognition config are created lazily, either in
        # start() or on the first run_stt() call.
        self._config = None
        self._asr_service = None

        self._settings = {"language": self._language_enum}

    def language_to_service_language(self, language: Language) -> Optional[str]:
        """Convert pipecat Language enum to Riva's language code."""
        return language_to_riva_language(language)

    def _initialize_client(self):
        """Initialize the Riva ASR client with authentication metadata.

        Does nothing if the client has already been created.
        """
        if self._asr_service is not None:
            return

        # Set up authentication metadata for NVIDIA Cloud Functions
        metadata = [
            ["function-id", self._function_id],
            ["authorization", f"Bearer {self._api_key}"],
        ]

        # Create authenticated client
        auth = riva.client.Auth(None, True, self._server, metadata)
        self._asr_service = riva.client.ASRService(auth)

        logger.info(f"Initialized RivaSegmentedSTTService with model: {self.model_name}")

    def _create_recognition_config(self):
        """Create the Riva ASR recognition configuration from the stored settings."""
        # Create base configuration
        config = riva.client.RecognitionConfig(
            language_code=self._language,  # Riva language code string
            max_alternatives=1,
            profanity_filter=self._profanity_filter,
            enable_automatic_punctuation=self._automatic_punctuation,
            verbatim_transcripts=self._verbatim_transcripts,
        )

        # Add word boosting if specified
        if self._boosted_lm_words:
            riva.client.add_word_boosting_to_config(
                config, self._boosted_lm_words, self._boosted_lm_score
            )

        # Add voice activity detection parameters
        riva.client.add_endpoint_parameters_to_config(
            config,
            self._start_history,
            self._start_threshold,
            self._stop_history,
            self._stop_history_eou,
            self._stop_threshold,
            self._stop_threshold_eou,
        )

        # Add any custom configuration
        if self._custom_configuration:
            riva.client.add_custom_configuration_to_config(config, self._custom_configuration)

        return config

    def can_generate_metrics(self) -> bool:
        """Indicates whether this service can generate processing metrics."""
        return True

    async def set_model(self, model: str):
        """Warn that the model cannot be changed after initialization.

        The model and its function id are fixed by `model_function_map` at
        construction time, so this only logs how to configure them.
        """
        logger.warning("Cannot set model after initialization. Set model and function id like so:")
        example = {"function_id": "<UUID>", "model_name": "<model_name>"}
        logger.warning(
            f"{self.__class__.__name__}(api_key=<api_key>, model_function_map={example})"
        )

    async def start(self, frame: StartFrame):
        """Initialize the service when the pipeline starts."""
        await super().start(frame)
        self._initialize_client()
        self._config = self._create_recognition_config()

    async def set_language(self, language: Language):
        """Set the language for the STT service.

        Falls back to "en-US" when the language has no Riva code.
        """
        logger.info(f"Switching STT language to: [{language}]")
        self._language_enum = language
        self._language = self.language_to_service_language(language) or "en-US"
        self._settings["language"] = language

        # Update configuration with new language
        if self._config:
            self._config.language_code = self._language

    async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
        """Transcribe an audio segment.

        Args:
            audio: Raw audio bytes in WAV format (already converted by base class).

        Yields:
            Frame: A TranscriptionFrame for every non-empty transcript in the
            response, or an ErrorFrame if recognition fails or the response
            has an unexpected structure.
        """
        try:
            await self.start_processing_metrics()
            await self.start_ttfb_metrics()

            # Make sure the client is initialized
            if self._asr_service is None:
                self._initialize_client()

            # Make sure the config is created
            if self._config is None:
                self._config = self._create_recognition_config()

            # Type assertion to satisfy the IDE
            assert self._asr_service is not None, "ASR service not initialized"
            assert self._config is not None, "Recognition config not created"

            # offline_recognize() is a synchronous call; run it in a worker
            # thread so the event loop is not blocked while Riva processes
            # the segment. We explicitly request a non-future response.
            raw_response = await asyncio.to_thread(
                self._asr_service.offline_recognize, audio, self._config, future=False
            )

            await self.stop_ttfb_metrics()
            await self.stop_processing_metrics()

            # Process the response - handle different possible return types
            try:
                # If it's a future-like object, get the result
                if hasattr(raw_response, "result"):
                    response = raw_response.result()
                else:
                    response = raw_response

                # Process transcription results
                transcription_found = False

                # getattr() with a default keeps this tolerant of responses
                # without `results` / `alternatives`.
                results = getattr(response, "results", [])
                for result in results:
                    alternatives = getattr(result, "alternatives", [])
                    if alternatives:
                        text = alternatives[0].transcript.strip()
                        if text:
                            logger.debug(f"Transcription: [{text}]")
                            yield TranscriptionFrame(
                                text, "", time_now_iso8601(), self._language_enum
                            )
                            transcription_found = True

                if not transcription_found:
                    logger.debug("No transcription results found in Riva response")

            except AttributeError as ae:
                logger.error(f"Unexpected response structure from Riva: {ae}")
                yield ErrorFrame(f"Unexpected Riva response format: {str(ae)}")

        except Exception as e:
            logger.exception(f"Riva Canary ASR error: {e}")
            yield ErrorFrame(f"Riva Canary ASR error: {str(e)}")
class ParakeetSTTService(RivaSTTService):
    """Deprecated: Use RivaSTTService instead.

    Thin subclass that forwards all arguments to `RivaSTTService` unchanged
    and emits a `DeprecationWarning` on construction.
    """

    def __init__(
        self,
        *,
        api_key: str,
        server: str = "grpc.nvcf.nvidia.com:443",
        model_function_map: Mapping[str, str] = {
            "function_id": "1598d209-5e27-4d3c-8079-4751568b1081",
            "model_name": "parakeet-ctc-1.1b-asr",
        },
        sample_rate: Optional[int] = None,
        params: RivaSTTService.InputParams = RivaSTTService.InputParams(),  # Use parent class's type
        **kwargs,
    ):
        super().__init__(
            api_key=api_key,
            server=server,
            model_function_map=model_function_map,
            sample_rate=sample_rate,
            params=params,
            **kwargs,
        )
        import warnings

        with warnings.catch_warnings():
            warnings.simplefilter("always")
            # stacklevel=2 attributes the warning to the code instantiating
            # this class instead of to this line.
            warnings.warn(
                "`ParakeetSTTService` is deprecated, use `RivaSTTService` instead.",
                DeprecationWarning,
                stacklevel=2,
            )

View File

@@ -5,7 +5,11 @@
#
import asyncio
from typing import AsyncGenerator, Optional
import os
from typing import AsyncGenerator, Mapping, Optional
# Suppress gRPC fork warnings
os.environ["GRPC_ENABLE_FORK_SUPPORT"] = "false"
from loguru import logger
from pydantic import BaseModel
@@ -27,10 +31,10 @@ except ModuleNotFoundError as e:
logger.error("In order to use NVIDIA Riva TTS, you need to `pip install pipecat-ai[riva]`.")
raise Exception(f"Missing module: {e}")
FASTPITCH_TIMEOUT_SECS = 5
RIVA_TTS_TIMEOUT_SECS = 5
class FastPitchTTSService(TTSService):
class RivaTTSService(TTSService):
class InputParams(BaseModel):
language: Optional[Language] = Language.EN_US
quality: Optional[int] = 20
@@ -38,11 +42,14 @@ class FastPitchTTSService(TTSService):
def __init__(
self,
*,
api_key: str,
api_key: str = None,
server: str = "grpc.nvcf.nvidia.com:443",
voice_id: str = "English-US.Female-1",
voice_id: str = "Magpie-Multilingual.EN-US.Ray",
sample_rate: Optional[int] = None,
function_id: str = "0149dedb-2be8-4195-b9a0-e57e0e14f972",
model_function_map: Mapping[str, str] = {
"function_id": "877104f7-e885-42b9-8de8-f6e4c6303969",
"model_name": "magpie-tts-multilingual",
},
params: InputParams = InputParams(),
**kwargs,
):
@@ -51,12 +58,13 @@ class FastPitchTTSService(TTSService):
self._voice_id = voice_id
self._language_code = params.language
self._quality = params.quality
self._function_id = model_function_map.get("function_id")
self.set_model_name("fastpitch-hifigan-tts")
self.set_model_name(model_function_map.get("model_name"))
self.set_voice(voice_id)
metadata = [
["function-id", function_id],
["function-id", self._function_id],
["authorization", f"Bearer {api_key}"],
]
auth = riva.client.Auth(None, True, server, metadata)
@@ -68,6 +76,13 @@ class FastPitchTTSService(TTSService):
riva.client.proto.riva_tts_pb2.RivaSynthesisConfigRequest()
)
async def set_model(self, model: str):
logger.warning(f"Cannot set model after initialization. Set model and function id like so:")
example = {"function_id": "<UUID>", "model_name": "<model_name>"}
logger.warning(
f"{self.__class__.__name__}(api_key=<api_key>, model_function_map={example})"
)
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
def read_audio_responses(queue: asyncio.Queue):
def add_response(r):
@@ -79,8 +94,8 @@ class FastPitchTTSService(TTSService):
self._voice_id,
self._language_code,
sample_rate_hz=self.sample_rate,
audio_prompt_file=None,
quality=self._quality,
zero_shot_audio_prompt_file=None,
zero_shot_quality=self._quality,
custom_dictionary={},
)
for r in responses:
@@ -100,7 +115,7 @@ class FastPitchTTSService(TTSService):
await asyncio.to_thread(read_audio_responses, queue)
# Wait for the thread to start.
resp = await asyncio.wait_for(queue.get(), FASTPITCH_TIMEOUT_SECS)
resp = await asyncio.wait_for(queue.get(), RIVA_TTS_TIMEOUT_SECS)
while resp:
await self.stop_ttfb_metrics()
frame = TTSAudioRawFrame(
@@ -109,9 +124,46 @@ class FastPitchTTSService(TTSService):
num_channels=1,
)
yield frame
resp = await asyncio.wait_for(queue.get(), FASTPITCH_TIMEOUT_SECS)
resp = await asyncio.wait_for(queue.get(), RIVA_TTS_TIMEOUT_SECS)
except asyncio.TimeoutError:
logger.error(f"{self} timeout waiting for audio response")
await self.start_tts_usage_metrics(text)
yield TTSStoppedFrame()
class FastPitchTTSService(RivaTTSService):
    """Deprecated: Use RivaTTSService instead.

    Thin subclass that keeps the previous FastPitch defaults (voice, function
    id and model name), forwards its arguments to `RivaTTSService` and emits
    a `DeprecationWarning` on construction.
    """

    class InputParams(BaseModel):
        language: Optional[Language] = Language.EN_US
        quality: Optional[int] = 20

    def __init__(
        self,
        *,
        api_key: Optional[str] = None,
        server: str = "grpc.nvcf.nvidia.com:443",
        voice_id: str = "English-US.Female-1",
        sample_rate: Optional[int] = None,
        model_function_map: Mapping[str, str] = {
            "function_id": "0149dedb-2be8-4195-b9a0-e57e0e14f972",
            "model_name": "fastpitch-hifigan-tts",
        },
        params: InputParams = InputParams(),
        **kwargs,
    ):
        super().__init__(
            api_key=api_key,
            # Forward the server too; otherwise a caller-supplied address
            # would be silently replaced by the parent's default.
            server=server,
            voice_id=voice_id,
            sample_rate=sample_rate,
            model_function_map=model_function_map,
            params=params,
            **kwargs,
        )
        import warnings

        with warnings.catch_warnings():
            warnings.simplefilter("always")
            # stacklevel=2 attributes the warning to the code instantiating
            # this class instead of to this line.
            warnings.warn(
                "`FastPitchTTSService` is deprecated, use `RivaTTSService` instead.",
                DeprecationWarning,
                stacklevel=2,
            )

View File

@@ -64,13 +64,16 @@ class SimliVideoService(FrameProcessor):
async for audio_frame in self._simli_client.getAudioStreamIterator():
resampled_frames = self._pipecat_resampler.resample(audio_frame)
for resampled_frame in resampled_frames:
await self.push_frame(
TTSAudioRawFrame(
audio=resampled_frame.to_ndarray().tobytes(),
sample_rate=self._pipecat_resampler.rate,
num_channels=1,
),
)
audio_array = resampled_frame.to_ndarray()
# Only push frame if there is audio (e.g. not silence)
if audio_array.any():
await self.push_frame(
TTSAudioRawFrame(
audio=audio_array.tobytes(),
sample_rate=self._pipecat_resampler.rate,
num_channels=1,
),
)
async def _consume_and_process_video(self):
await self._pipecat_resampler_event.wait()

View File

@@ -66,6 +66,8 @@ class TTSService(AIService):
# Text filter executed after text has been aggregated.
text_filters: Sequence[BaseTextFilter] = [],
text_filter: Optional[BaseTextFilter] = None,
# Audio transport destination of the generated frames.
transport_destination: Optional[str] = None,
**kwargs,
):
super().__init__(**kwargs)
@@ -82,6 +84,8 @@ class TTSService(AIService):
self._settings: Dict[str, Any] = {}
self._text_aggregator: BaseTextAggregator = text_aggregator or SimpleTextAggregator()
self._text_filters: Sequence[BaseTextFilter] = text_filters
self._transport_destination: Optional[str] = transport_destination
if text_filter:
import warnings
@@ -207,13 +211,16 @@ class TTSService(AIService):
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
if self._push_silence_after_stop and isinstance(frame, TTSStoppedFrame):
silence_num_bytes = int(self._silence_time_s * self.sample_rate * 2) # 16-bit
await self.push_frame(
TTSAudioRawFrame(
audio=b"\x00" * silence_num_bytes,
sample_rate=self.sample_rate,
num_channels=1,
)
silence_frame = TTSAudioRawFrame(
audio=b"\x00" * silence_num_bytes,
sample_rate=self.sample_rate,
num_channels=1,
)
silence_frame.transport_destination = self._transport_destination
await self.push_frame(silence_frame)
if isinstance(frame, (TTSStartedFrame, TTSStoppedFrame, TTSAudioRawFrame, TTSTextFrame)):
frame.transport_destination = self._transport_destination
await super().push_frame(frame, direction)

View File

@@ -79,7 +79,7 @@ class BaseInputTransport(FrameProcessor):
)
self._params.audio_in_passthrough = True
if self._params.camera_in_enabled or self._params.camera_out_enabled:
if self._params.camera_in_enabled:
import warnings
with warnings.catch_warnings():

View File

@@ -8,11 +8,12 @@ import asyncio
import itertools
import sys
import time
from typing import AsyncGenerator, List
from typing import Any, AsyncGenerator, Dict, List, Mapping, Optional
from loguru import logger
from PIL import Image
from pipecat.audio.mixers.base_audio_mixer import BaseAudioMixer
from pipecat.audio.utils import create_default_resampler
from pipecat.frames.frames import (
BotSpeakingFrame,
@@ -46,35 +47,28 @@ class BaseOutputTransport(FrameProcessor):
self._params = params
# Task to process incoming frames so we don't block upstream elements.
self._sink_task = None
# Task to process incoming frames using a clock.
self._sink_clock_task = None
# Task to write/send audio and image frames.
self._video_out_task = None
# These are the images that we should send at our desired framerate.
self._video_images = None
# Output sample rate. It will be initialized on StartFrame.
self._sample_rate = 0
self._resampler = create_default_resampler()
# Chunk size that will be written. It will be computed on StartFrame
# We write 10ms*CHUNKS of audio at a time (where CHUNKS is the
# `audio_out_10ms_chunks` parameter). If we receive long audio frames we
# will chunk them. This helps with interruption handling. It will be
# initialized on StartFrame.
self._audio_chunk_size = 0
self._audio_buffer = bytearray()
self._stopped_event = asyncio.Event()
# Indicates if the bot is currently speaking.
self._bot_speaking = False
# We will have one media sender per output frame destination. This allow
# us to send multiple streams at the same time if the transport allows
# it.
self._media_senders: Dict[Any, "BaseOutputTransport.MediaSender"] = {}
@property
def sample_rate(self) -> int:
return self._sample_rate
@property
def audio_chunk_size(self) -> int:
return self._audio_chunk_size
async def start(self, frame: StartFrame):
self._sample_rate = self._params.audio_out_sample_rate or frame.audio_out_sample_rate
@@ -84,42 +78,63 @@ class BaseOutputTransport(FrameProcessor):
audio_bytes_10ms = int(self._sample_rate / 100) * self._params.audio_out_channels * 2
self._audio_chunk_size = audio_bytes_10ms * self._params.audio_out_10ms_chunks
# Start audio mixer.
if self._params.audio_out_mixer:
await self._params.audio_out_mixer.start(self._sample_rate)
self._create_video_task()
self._create_sink_tasks()
# Register destinations.
for destination in self._params.audio_out_destinations:
await self.register_audio_destination(destination)
for destination in self._params.video_out_destinations:
await self.register_video_destination(destination)
# Start default media sender.
self._media_senders[None] = BaseOutputTransport.MediaSender(
self,
destination=None,
sample_rate=self.sample_rate,
audio_chunk_size=self.audio_chunk_size,
params=self._params,
)
await self._media_senders[None].start(frame)
# Media senders already send both audio and video, so make sure we only
# have one media sender per shared name.
destinations = list(
set(self._params.audio_out_destinations + self._params.video_out_destinations)
)
# Start media senders.
for destination in destinations:
self._media_senders[destination] = BaseOutputTransport.MediaSender(
self,
destination=destination,
sample_rate=self.sample_rate,
audio_chunk_size=self.audio_chunk_size,
params=self._params,
)
await self._media_senders[destination].start(frame)
async def stop(self, frame: EndFrame):
# Let the sink tasks process the queue until they reach this EndFrame.
await self._sink_clock_queue.put((sys.maxsize, frame.id, frame))
await self._sink_queue.put(frame)
# At this point we have enqueued an EndFrame and we need to wait for
# that EndFrame to be processed by the sink tasks. We also need to wait
# for these tasks before cancelling the video and audio tasks below
# because they might be still rendering.
if self._sink_task:
await self.wait_for_task(self._sink_task)
if self._sink_clock_task:
await self.wait_for_task(self._sink_clock_task)
# We can now cancel the video task.
await self._cancel_video_task()
for _, sender in self._media_senders.items():
await sender.stop(frame)
async def cancel(self, frame: CancelFrame):
# Since we are cancelling everything it doesn't matter if we cancel sink
# tasks first or not.
await self._cancel_sink_tasks()
await self._cancel_video_task()
for _, sender in self._media_senders.items():
await sender.cancel(frame)
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
pass
async def write_raw_video_frame(self, frame: OutputImageRawFrame):
async def register_video_destination(self, destination: str):
pass
async def write_raw_audio_frames(self, frames: bytes):
async def register_audio_destination(self, destination: str):
pass
async def write_raw_video_frame(
self, frame: OutputImageRawFrame, destination: Optional[str] = None
):
pass
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
pass
async def send_audio(self, frame: OutputAudioRawFrame):
@@ -150,7 +165,7 @@ class BaseOutputTransport(FrameProcessor):
await self.push_frame(frame, direction)
elif isinstance(frame, (StartInterruptionFrame, StopInterruptionFrame)):
await self.push_frame(frame, direction)
await self._handle_interruptions(frame)
await self._handle_frame(frame)
elif isinstance(frame, TransportMessageUrgentFrame):
await self.send_message(frame)
elif isinstance(frame, SystemFrame):
@@ -160,117 +175,428 @@ class BaseOutputTransport(FrameProcessor):
await self.stop(frame)
# Keep pushing EndFrame down so all the pipeline stops nicely.
await self.push_frame(frame, direction)
elif isinstance(frame, MixerControlFrame) and self._params.audio_out_mixer:
await self._params.audio_out_mixer.process_frame(frame)
elif isinstance(frame, MixerControlFrame):
await self._handle_frame(frame)
# Other frames.
elif isinstance(frame, OutputAudioRawFrame):
await self._handle_audio(frame)
await self._handle_frame(frame)
elif isinstance(frame, (OutputImageRawFrame, SpriteFrame)):
await self._handle_image(frame)
await self._handle_frame(frame)
# TODO(aleix): Images and audio should support presentation timestamps.
elif frame.pts:
await self._sink_clock_queue.put((frame.pts, frame.id, frame))
await self._handle_frame(frame)
elif direction == FrameDirection.UPSTREAM:
await self.push_frame(frame, direction)
else:
await self._sink_queue.put(frame)
await self._handle_frame(frame)
async def _handle_interruptions(self, frame: Frame):
if not self.interruptions_allowed:
async def _handle_frame(self, frame: Frame):
if frame.transport_destination not in self._media_senders:
logger.warning(
f"{self} destination [{frame.transport_destination}] not registered for frame {frame}"
)
return
sender = self._media_senders[frame.transport_destination]
if isinstance(frame, StartInterruptionFrame):
# Cancel sink and video tasks.
await self._cancel_sink_tasks()
await self._cancel_video_task()
# Create sink and video tasks.
await sender.handle_interruptions(frame)
elif isinstance(frame, OutputAudioRawFrame):
await sender.handle_audio_frame(frame)
elif isinstance(frame, (OutputImageRawFrame, SpriteFrame)):
await sender.handle_image_frame(frame)
elif isinstance(frame, MixerControlFrame):
await sender.handle_mixer_control_frame(frame)
elif frame.pts:
await sender.handle_timed_frame(frame)
else:
await sender.handle_sync_frame(frame)
#
# Media Sender
#
class MediaSender:
def __init__(
    self,
    transport: "BaseOutputTransport",
    *,
    destination: Optional[str],
    sample_rate: int,
    audio_chunk_size: int,
    params: TransportParams,
):
    """Set up the per-destination state of a media sender.

    Args:
        transport: Output transport that owns this sender; the sender uses
            it to create/cancel tasks, push frames and write media.
        destination: Name of the destination served by this sender. A falsy
            value is treated as the default destination.
        sample_rate: Output sample rate that incoming audio is resampled to.
        audio_chunk_size: Size in bytes of the audio chunks that get queued.
        params: Transport parameters (audio/video output settings, mixer).
    """
    # Constructor arguments, stored as-is.
    self._transport = transport
    self._destination = destination
    self._sample_rate = sample_rate
    self._audio_chunk_size = audio_chunk_size
    self._params = params

    # Incoming audio accumulates here until a full chunk is available.
    self._audio_buffer = bytearray()

    # Converts incoming audio to the output sample rate.
    self._resampler = create_default_resampler()

    # Mixer for this destination, if any. The user may configure a single
    # mixer (default destination only) or a destination/mixer mapping.
    self._mixer: Optional[BaseAudioMixer] = None

    # Cycle of images to be sent at the desired framerate.
    self._video_images = None

    # Whether the bot is currently speaking.
    self._bot_speaking = False

    # Worker tasks; they are created later on demand.
    self._audio_task: Optional[asyncio.Task] = None
    self._video_task: Optional[asyncio.Task] = None
    self._clock_task: Optional[asyncio.Task] = None
@property
def sample_rate(self) -> int:
    """Output sample rate this sender was configured with."""
    rate = self._sample_rate
    return rate
@property
def audio_chunk_size(self) -> int:
    """Size in bytes of the audio chunks this sender queues."""
    chunk_size = self._audio_chunk_size
    return chunk_size
async def start(self, frame: StartFrame):
    """Reset the audio buffer, create the worker tasks and start the mixer.

    Args:
        frame: The start frame that triggered the start (not inspected here).
    """
    self._audio_buffer = bytearray()

    # Create all tasks. The former sink task no longer exists on the sender:
    # its work is done by the audio and clock tasks, so there is nothing
    # else to create here.
    self._create_video_task()
    self._create_clock_task()
    self._create_audio_task()

    # Check if we have an audio mixer for our destination.
    mixer_config = self._params.audio_out_mixer
    if mixer_config:
        if isinstance(mixer_config, Mapping):
            # Destination/mixer mapping: pick ours, if there is one.
            self._mixer = mixer_config.get(self._destination, None)
        elif not self._destination:
            # Only use the default mixer if we are the default destination.
            self._mixer = mixer_config

    # Start audio mixer.
    if self._mixer:
        await self._mixer.start(self._sample_rate)
async def stop(self, frame: EndFrame):
    """Drain the audio and clock queues up to `frame`, then shut down.

    Args:
        frame: The end frame; it is enqueued on both queues as a marker.
    """
    # Enqueue the EndFrame so the tasks keep processing what is already
    # queued until they reach it. On the clock queue it sorts last.
    end_entry = (sys.maxsize, frame.id, frame)
    await self._clock_queue.put(end_entry)
    await self._audio_queue.put(frame)

    # Wait for the audio and clock tasks to process that EndFrame. This has
    # to happen before cancelling the video task because it might still be
    # rendering.
    for task_attr in ("_audio_task", "_clock_task"):
        pending = getattr(self, task_attr)
        if pending:
            await self._transport.wait_for_task(pending)

    # Stop audio mixer.
    mixer = self._mixer
    if mixer:
        await mixer.stop()

    # We can now cancel the video task.
    await self._cancel_video_task()
async def cancel(self, frame: CancelFrame):
    """Cancel the audio, clock and video tasks.

    Args:
        frame: The cancel frame that triggered the cancellation (unused).
    """
    # Since we are cancelling everything the order does not matter.
    cancellers = (
        self._cancel_audio_task,
        self._cancel_clock_task,
        self._cancel_video_task,
    )
    for cancel_task in cancellers:
        await cancel_task()
async def handle_interruptions(self, _: StartInterruptionFrame):
    """Restart the worker tasks when an interruption arrives.

    Does nothing unless the transport allows interruptions.
    """
    if self._transport.interruptions_allowed:
        # Cancel tasks.
        for cancel_task in (
            self._cancel_audio_task,
            self._cancel_clock_task,
            self._cancel_video_task,
        ):
            await cancel_task()

        # Create tasks.
        for create_task in (
            self._create_video_task,
            self._create_clock_task,
            self._create_audio_task,
        ):
            create_task()

        # Let's send a bot stopped speaking if we have to.
        await self._bot_stopped_speaking()
async def _handle_audio(self, frame: OutputAudioRawFrame):
if not self._params.audio_out_enabled:
return
async def handle_audio_frame(self, frame: OutputAudioRawFrame):
if not self._params.audio_out_enabled:
return
# We might need to resample if incoming audio doesn't match the
# transport sample rate.
resampled = await self._resampler.resample(
frame.audio, frame.sample_rate, self._sample_rate
)
cls = type(frame)
self._audio_buffer.extend(resampled)
while len(self._audio_buffer) >= self._audio_chunk_size:
chunk = cls(
bytes(self._audio_buffer[: self._audio_chunk_size]),
sample_rate=self._sample_rate,
num_channels=frame.num_channels,
# We might need to resample if incoming audio doesn't match the
# transport sample rate.
resampled = await self._resampler.resample(
frame.audio, frame.sample_rate, self._sample_rate
)
await self._sink_queue.put(chunk)
self._audio_buffer = self._audio_buffer[self._audio_chunk_size :]
async def _handle_image(self, frame: OutputImageRawFrame | SpriteFrame):
if not self._params.video_out_enabled:
return
cls = type(frame)
self._audio_buffer.extend(resampled)
while len(self._audio_buffer) >= self._audio_chunk_size:
chunk = cls(
bytes(self._audio_buffer[: self._audio_chunk_size]),
sample_rate=self._sample_rate,
num_channels=frame.num_channels,
)
await self._audio_queue.put(chunk)
self._audio_buffer = self._audio_buffer[self._audio_chunk_size :]
if self._params.video_out_is_live:
await self._video_out_queue.put(frame)
else:
await self._sink_queue.put(frame)
async def handle_image_frame(self, frame: OutputImageRawFrame | SpriteFrame):
if not self._params.video_out_enabled:
return
async def _bot_started_speaking(self):
if not self._bot_speaking:
logger.debug("Bot started speaking")
await self.push_frame(BotStartedSpeakingFrame())
await self.push_frame(BotStartedSpeakingFrame(), FrameDirection.UPSTREAM)
self._bot_speaking = True
if self._params.video_out_is_live and isinstance(frame, OutputImageRawFrame):
await self._video_queue.put(frame)
elif isinstance(frame, OutputImageRawFrame):
await self._set_video_image(frame)
else:
await self._set_video_images(frame.images)
async def _bot_stopped_speaking(self):
if self._bot_speaking:
logger.debug("Bot stopped speaking")
await self.push_frame(BotStoppedSpeakingFrame())
await self.push_frame(BotStoppedSpeakingFrame(), FrameDirection.UPSTREAM)
self._bot_speaking = False
# Clean audio buffer (there could be tiny left overs if not multiple
# to our output chunk size).
self._audio_buffer = bytearray()
async def handle_timed_frame(self, frame: Frame):
    """Queue a frame on the clock queue, ordered by its `pts` and then its id."""
    entry = (frame.pts, frame.id, frame)
    await self._clock_queue.put(entry)
#
# Sink tasks
#
async def handle_sync_frame(self, frame: Frame):
    """Queue a frame on the audio queue so it is handled in order with audio."""
    audio_queue = self._audio_queue
    await audio_queue.put(frame)
def _create_sink_tasks(self):
if not self._sink_task:
self._sink_queue = asyncio.Queue()
self._sink_task = self.create_task(self._sink_task_handler())
if not self._sink_clock_task:
self._sink_clock_queue = asyncio.PriorityQueue()
self._sink_clock_task = self.create_task(self._sink_clock_task_handler())
async def handle_mixer_control_frame(self, frame: MixerControlFrame):
    """Forward a mixer control frame to this sender's mixer, if it has one."""
    mixer = self._mixer
    if not mixer:
        return
    await mixer.process_frame(frame)
async def _cancel_sink_tasks(self):
# Stop sink tasks.
if self._sink_task:
await self.cancel_task(self._sink_task)
self._sink_task = None
# Stop sink clock tasks.
if self._sink_clock_task:
await self.cancel_task(self._sink_clock_task)
self._sink_clock_task = None
#
# Audio handling
#
async def _sink_frame_handler(self, frame: Frame):
if isinstance(frame, OutputImageRawFrame):
await self._set_video_image(frame)
elif isinstance(frame, SpriteFrame):
await self._set_video_images(frame.images)
elif isinstance(frame, TransportMessageFrame):
await self.send_message(frame)
def _create_audio_task(self):
if not self._audio_task:
self._audio_queue = asyncio.Queue()
self._audio_task = self._transport.create_task(self._audio_task_handler())
async def _sink_clock_task_handler(self):
running = True
while running:
try:
timestamp, _, frame = await self._sink_clock_queue.get()
async def _cancel_audio_task(self):
if self._audio_task:
await self._transport.cancel_task(self._audio_task)
self._audio_task = None
async def _bot_started_speaking(self):
    """Announce, once, that the bot started speaking on this destination.

    Pushes one BotStartedSpeakingFrame downstream and one upstream, both
    tagged with this sender's destination, then marks the bot as speaking.
    Does nothing if the bot is already marked as speaking.
    """
    if self._bot_speaking:
        return

    destination_label = f" [{self._destination}]" if self._destination else ""
    logger.debug(f"Bot{destination_label} started speaking")

    # Each direction gets its own frame instance.
    downstream_frame, upstream_frame = BotStartedSpeakingFrame(), BotStartedSpeakingFrame()
    for speaking_frame in (downstream_frame, upstream_frame):
        speaking_frame.transport_destination = self._destination

    await self._transport.push_frame(downstream_frame)
    await self._transport.push_frame(upstream_frame, FrameDirection.UPSTREAM)

    self._bot_speaking = True
async def _bot_stopped_speaking(self):
    """Announce, once, that the bot stopped speaking on this destination.

    Pushes one BotStoppedSpeakingFrame downstream and one upstream, both
    tagged with this sender's destination, clears the speaking flag and
    drops any buffered audio. Does nothing if the bot is not speaking.
    """
    if not self._bot_speaking:
        return

    destination_label = f" [{self._destination}]" if self._destination else ""
    logger.debug(f"Bot{destination_label} stopped speaking")

    # Each direction gets its own frame instance.
    downstream_frame, upstream_frame = BotStoppedSpeakingFrame(), BotStoppedSpeakingFrame()
    for speaking_frame in (downstream_frame, upstream_frame):
        speaking_frame.transport_destination = self._destination

    await self._transport.push_frame(downstream_frame)
    await self._transport.push_frame(upstream_frame, FrameDirection.UPSTREAM)

    self._bot_speaking = False

    # Clean audio buffer (there could be tiny left overs if not multiple
    # to our output chunk size).
    self._audio_buffer = bytearray()
async def _handle_frame(self, frame: Frame):
    """Apply the side effect of a dequeued frame.

    A raw output image becomes the image to render, a sprite frame supplies
    the list of images to cycle through, and a transport message is sent
    through the transport. Any other frame type is ignored here.
    """
    if isinstance(frame, OutputImageRawFrame):
        await self._set_video_image(frame)
        return
    if isinstance(frame, SpriteFrame):
        await self._set_video_images(frame.images)
        return
    if isinstance(frame, TransportMessageFrame):
        await self._transport.send_message(frame)
def _next_frame(self) -> AsyncGenerator[Frame, None]:
# Return the async generator the audio task iterates over: `with_mixer`
# when this sender has a mixer, `without_mixer` otherwise. Both are called
# with BOT_VAD_STOP_SECS and both loop forever.
#
# without_mixer: wait for the next queued frame for at most `vad_stop_secs`
# seconds and yield it; on timeout, report that the bot stopped speaking
# and keep waiting.
async def without_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]:
while True:
try:
frame = await asyncio.wait_for(
self._audio_queue.get(), timeout=vad_stop_secs
)
yield frame
except asyncio.TimeoutError:
# Notify the bot stopped speaking upstream if necessary.
await self._bot_stopped_speaking()
# with_mixer: poll the queue without blocking. Queued audio frames have
# their audio replaced by the mixer output before being yielded; when the
# queue is empty a frame holding the mixer output for one chunk of silence
# is yielded instead.
async def with_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]:
# Time the last queued frame was yielded (0 means "none yet").
last_frame_time = 0
# One chunk worth of zero bytes, used as the mixer input when idle.
silence = b"\x00" * self._audio_chunk_size
# chunk duration in seconds
chunk_duration = (10 * self._params.audio_out_10ms_chunks) / 1000.0
# Deadline used by the throttle below; advanced by one chunk duration.
next_frame_time = time.time()
while True:
try:
frame = self._audio_queue.get_nowait()
if isinstance(frame, OutputAudioRawFrame):
frame.audio = await self._mixer.mix(frame.audio)
last_frame_time = time.time()
yield frame
except asyncio.QueueEmpty:
# Notify the bot stopped speaking upstream if necessary.
diff_time = time.time() - last_frame_time
if diff_time > vad_stop_secs:
await self._bot_stopped_speaking()
# Generate an audio frame with only the mixer's part.
frame = OutputAudioRawFrame(
audio=await self._mixer.mix(silence),
sample_rate=self._sample_rate,
num_channels=self._params.audio_out_channels,
)
yield frame
# Throttle frame generation to maintain real-time pacing and avoid flooding with silence
# NOTE(review): indentation is lost in this view, so it is unclear whether
# the throttle runs after every yielded frame or only after generated
# silence -- confirm against the real file before changing it.
wait = next_frame_time - time.time()
if wait > 0:
await asyncio.sleep(wait)
next_frame_time += chunk_duration
# Pick the generator matching this sender's configuration.
if self._mixer:
return with_mixer(BOT_VAD_STOP_SECS)
else:
return without_mixer(BOT_VAD_STOP_SECS)
async def _audio_task_handler(self):
# Body of the audio task: consume frames from `_next_frame()` until an
# EndFrame shows up, reporting bot-speaking state, applying each frame's
# side effect, pushing it downstream and writing audio to the transport.
#
# Push a BotSpeakingFrame every 200ms, we don't really need to push it
# at every audio chunk. If the audio chunk is bigger than 200ms, push at
# every audio chunk.
TOTAL_CHUNK_MS = self._params.audio_out_10ms_chunks * 10
# Number of chunks between two BotSpeakingFrame pushes (at least 1).
BOT_SPEAKING_CHUNK_PERIOD = max(int(200 / TOTAL_CHUNK_MS), 1)
bot_speaking_counter = 0
async for frame in self._next_frame():
# Notify the bot started speaking upstream if necessary and that
# it's actually speaking.
# NOTE(review): indentation is lost in this view; presumably the counter
# logic below only runs for TTS audio frames -- confirm.
if isinstance(frame, TTSAudioRawFrame):
await self._bot_started_speaking()
if bot_speaking_counter % BOT_SPEAKING_CHUNK_PERIOD == 0:
# BotSpeakingFrame goes both downstream and upstream.
await self._transport.push_frame(BotSpeakingFrame())
await self._transport.push_frame(
BotSpeakingFrame(), FrameDirection.UPSTREAM
)
bot_speaking_counter = 0
bot_speaking_counter += 1
# No need to push EndFrame, it's pushed from process_frame().
if isinstance(frame, EndFrame):
break
# Handle frame.
await self._handle_frame(frame)
# Also, push frame downstream in case anyone else needs it.
await self._transport.push_frame(frame)
# Send audio.
if isinstance(frame, OutputAudioRawFrame):
await self._transport.write_raw_audio_frames(frame.audio, self._destination)
#
# Video handling
#
def _create_video_task(self):
if not self._video_task and self._params.video_out_enabled:
self._video_queue = asyncio.Queue()
self._video_task = self._transport.create_task(self._video_task_handler())
async def _cancel_video_task(self):
# Stop video output task.
if self._video_task:
await self._transport.cancel_task(self._video_task)
self._video_task = None
async def _set_video_image(self, image: OutputImageRawFrame):
self._video_images = itertools.cycle([image])
async def _set_video_images(self, images: List[OutputImageRawFrame]):
self._video_images = itertools.cycle(images)
async def _video_task_handler(self):
    """Body of the video task: render images forever at the output framerate.

    In live mode every iteration is delegated to `_video_is_live_handler`.
    Otherwise the next image of the current cycle (if any) is drawn and the
    loop sleeps for one frame duration.
    """
    # Timing state, also used by the live handler.
    self._video_start_time = None
    self._video_frame_index = 0
    self._video_frame_duration = 1 / self._params.video_out_framerate
    self._video_frame_reset = self._video_frame_duration * 5

    while True:
        if self._params.video_out_is_live:
            await self._video_is_live_handler()
            continue

        if self._video_images:
            next_image = next(self._video_images)
            await self._draw_image(next_image)

        # With or without an image to draw, wait one frame.
        await asyncio.sleep(self._video_frame_duration)
async def _video_is_live_handler(self):
# Take the next live image from the video queue, pace it against the
# configured frame duration and draw it.
image = await self._video_queue.get()
# We get the start time as soon as we get the first image.
if not self._video_start_time:
self._video_start_time = time.time()
self._video_frame_index = 0
# Calculate how much time we need to wait before rendering next image.
# Time elapsed since the start time.
real_elapsed_time = time.time() - self._video_start_time
# Frame index times frame duration: where the frames counted so far put us.
real_render_time = self._video_frame_index * self._video_frame_duration
delay_time = self._video_frame_duration + real_render_time - real_elapsed_time
# If the delay is larger than the reset threshold in either direction
# (ahead or behind), restart the timing reference instead of sleeping.
if abs(delay_time) > self._video_frame_reset:
self._video_start_time = time.time()
self._video_frame_index = 0
elif delay_time > 0:
await asyncio.sleep(delay_time)
# NOTE(review): indentation is lost in this view, so it is unclear whether
# the increment belongs to the `elif` branch or runs for every image --
# confirm against the real file.
self._video_frame_index += 1
# Render image
await self._draw_image(image)
self._video_queue.task_done()
async def _draw_image(self, frame: OutputImageRawFrame):
    """Write an image to the transport, resizing it to the output size first.

    Args:
        frame: Image to draw. When its size differs from the configured
            `video_out_width` x `video_out_height`, a resized copy is
            written instead.
    """
    desired_size = (self._params.video_out_width, self._params.video_out_height)

    # TODO: we should refactor in the future to support dynamic resolutions
    # which is kind of what happens in P2P connections.
    # We need to add support for that inside the DailyTransport
    if frame.size != desired_size:
        image = Image.frombytes(frame.format, frame.size, frame.image)
        resized_image = image.resize(desired_size)
        # logger.warning(f"{frame} does not have the expected size {desired_size}, resizing")
        # Pillow sets `Image.format` to None on images it creates itself
        # (frombytes/resize), so reuse the incoming frame's format: it is the
        # mode the pixels were decoded with, and resize() keeps that mode.
        frame = OutputImageRawFrame(
            resized_image.tobytes(), resized_image.size, frame.format
        )

    await self._transport.write_raw_video_frame(frame, self._destination)
#
# Clock handling
#
def _create_clock_task(self):
if not self._clock_task:
self._clock_queue = asyncio.PriorityQueue()
self._clock_task = self._transport.create_task(self._clock_task_handler())
async def _cancel_clock_task(self):
if self._clock_task:
await self._transport.cancel_task(self._clock_task)
self._clock_task = None
async def _clock_task_handler(self):
running = True
while running:
timestamp, _, frame = await self._clock_queue.get()
# If we hit an EndFrame, we can finish right away.
running = not isinstance(frame, EndFrame)
@@ -279,167 +605,12 @@ class BaseOutputTransport(FrameProcessor):
# has already passed we process it, otherwise we wait until it's
# time to process it.
if running:
current_time = self.get_clock().get_time()
current_time = self._transport.get_clock().get_time()
if timestamp > current_time:
wait_time = nanoseconds_to_seconds(timestamp - current_time)
await asyncio.sleep(wait_time)
# Handle frame.
await self._sink_frame_handler(frame)
# Push frame downstream.
await self._transport.push_frame(frame)
# Also, push frame downstream in case anyone else needs it.
await self.push_frame(frame)
self._sink_clock_queue.task_done()
except asyncio.CancelledError:
raise
except Exception as e:
logger.exception(f"{self} error processing sink clock queue: {e}")
def _next_frame(self) -> AsyncGenerator[Frame, None]:
async def without_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]:
while True:
try:
frame = await asyncio.wait_for(self._sink_queue.get(), timeout=vad_stop_secs)
yield frame
except asyncio.TimeoutError:
# Notify the bot stopped speaking upstream if necessary.
await self._bot_stopped_speaking()
async def with_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]:
last_frame_time = 0
silence = b"\x00" * self._audio_chunk_size
while True:
try:
frame = self._sink_queue.get_nowait()
if isinstance(frame, OutputAudioRawFrame):
frame.audio = await self._params.audio_out_mixer.mix(frame.audio)
last_frame_time = time.time()
yield frame
except asyncio.QueueEmpty:
# Notify the bot stopped speaking upstream if necessary.
diff_time = time.time() - last_frame_time
if diff_time > vad_stop_secs:
await self._bot_stopped_speaking()
# Generate an audio frame with only the mixer's part.
frame = OutputAudioRawFrame(
audio=await self._params.audio_out_mixer.mix(silence),
sample_rate=self._sample_rate,
num_channels=self._params.audio_out_channels,
)
yield frame
if self._params.audio_out_mixer:
return with_mixer(BOT_VAD_STOP_SECS)
else:
return without_mixer(BOT_VAD_STOP_SECS)
async def _sink_task_handler(self):
# Push a BotSpeakingFrame every 200ms, we don't really need to push it
# at every audio chunk. If the audio chunk is bigger than 200ms, push at
# every audio chunk.
TOTAL_CHUNK_MS = self._params.audio_out_10ms_chunks * 10
BOT_SPEAKING_CHUNK_PERIOD = max(int(200 / TOTAL_CHUNK_MS), 1)
bot_speaking_counter = 0
async for frame in self._next_frame():
# Notify the bot started speaking upstream if necessary and that
# it's actually speaking.
if isinstance(frame, TTSAudioRawFrame):
await self._bot_started_speaking()
if bot_speaking_counter % BOT_SPEAKING_CHUNK_PERIOD == 0:
await self.push_frame(BotSpeakingFrame())
await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM)
bot_speaking_counter = 0
bot_speaking_counter += 1
# No need to push EndFrame, it's pushed from process_frame().
if isinstance(frame, EndFrame):
break
# Handle frame.
await self._sink_frame_handler(frame)
# Also, push frame downstream in case anyone else needs it.
await self.push_frame(frame)
# Send audio.
if isinstance(frame, OutputAudioRawFrame):
await self.write_raw_audio_frames(frame.audio)
#
# Video task
#
def _create_video_task(self):
# Create video output queue and task if needed.
if not self._video_out_task and self._params.video_out_enabled:
self._video_out_queue = asyncio.Queue()
self._video_out_task = self.create_task(self._video_out_task_handler())
async def _cancel_video_task(self):
# Stop video output task.
if self._video_out_task and self._params.video_out_enabled:
await self.cancel_task(self._video_out_task)
self._video_out_task = None
async def _draw_image(self, frame: OutputImageRawFrame):
desired_size = (self._params.video_out_width, self._params.video_out_height)
# TODO: we should refactor in the future to support dynamic resolutions
# which is kind of what happens in P2P connections.
# We need to add support for that inside the DailyTransport
if frame.size != desired_size:
image = Image.frombytes(frame.format, frame.size, frame.image)
resized_image = image.resize(desired_size)
# logger.warning(f"{frame} does not have the expected size {desired_size}, resizing")
frame = OutputImageRawFrame(
resized_image.tobytes(), resized_image.size, resized_image.format
)
await self.write_raw_video_frame(frame)
async def _set_video_image(self, image: OutputImageRawFrame):
self._video_images = itertools.cycle([image])
async def _set_video_images(self, images: List[OutputImageRawFrame]):
self._video_images = itertools.cycle(images)
async def _video_out_task_handler(self):
self._video_out_start_time = None
self._video_out_frame_index = 0
self._video_out_frame_duration = 1 / self._params.video_out_framerate
self._video_out_frame_reset = self._video_out_frame_duration * 5
while True:
if self._params.video_out_is_live:
await self._video_out_is_live_handler()
elif self._video_images:
image = next(self._video_images)
await self._draw_image(image)
await asyncio.sleep(self._video_out_frame_duration)
else:
await asyncio.sleep(self._video_out_frame_duration)
async def _video_out_is_live_handler(self):
image = await self._video_out_queue.get()
# We get the start time as soon as we get the first image.
if not self._video_out_start_time:
self._video_out_start_time = time.time()
self._video_out_frame_index = 0
# Calculate how much time we need to wait before rendering next image.
real_elapsed_time = time.time() - self._video_out_start_time
real_render_time = self._video_out_frame_index * self._video_out_frame_duration
delay_time = self._video_out_frame_duration + real_render_time - real_elapsed_time
if abs(delay_time) > self._video_out_frame_reset:
self._video_out_start_time = time.time()
self._video_out_frame_index = 0
elif delay_time > 0:
await asyncio.sleep(delay_time)
self._video_out_frame_index += 1
# Render image
await self._draw_image(image)
self._video_out_queue.task_done()
self._clock_queue.task_done()

View File

@@ -5,7 +5,7 @@
#
from abc import abstractmethod
from typing import Optional
from typing import List, Mapping, Optional
from pydantic import BaseModel, ConfigDict
@@ -33,7 +33,8 @@ class TransportParams(BaseModel):
audio_out_channels: int = 1
audio_out_bitrate: int = 96000
audio_out_10ms_chunks: int = 4
audio_out_mixer: Optional[BaseAudioMixer] = None
audio_out_mixer: Optional[BaseAudioMixer | Mapping[Optional[str], BaseAudioMixer]] = None
audio_out_destinations: List[str] = []
audio_in_enabled: bool = False
audio_in_sample_rate: Optional[int] = None
audio_in_channels: int = 1
@@ -48,6 +49,7 @@ class TransportParams(BaseModel):
video_out_bitrate: int = 800000
video_out_framerate: int = 30
video_out_color_format: str = "RGB"
video_out_destinations: List[str] = []
vad_enabled: bool = False
vad_audio_passthrough: bool = False
vad_analyzer: Optional[VADAnalyzer] = None

View File

@@ -118,7 +118,7 @@ class LocalAudioOutputTransport(BaseOutputTransport):
self._out_stream.close()
self._out_stream = None
async def write_raw_audio_frames(self, frames: bytes):
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
if self._out_stream:
await self.get_event_loop().run_in_executor(
self._executor, self._out_stream.write, frames

View File

@@ -131,13 +131,15 @@ class TkOutputTransport(BaseOutputTransport):
self._out_stream.close()
self._out_stream = None
async def write_raw_audio_frames(self, frames: bytes):
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
if self._out_stream:
await self.get_event_loop().run_in_executor(
self._executor, self._out_stream.write, frames
)
async def write_raw_video_frame(self, frame: OutputImageRawFrame):
async def write_raw_video_frame(
self, frame: OutputImageRawFrame, destination: Optional[str] = None
):
self.get_event_loop().call_soon(self._write_frame_to_tk, frame)
def _write_frame_to_tk(self, frame: OutputImageRawFrame):

View File

@@ -203,7 +203,7 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport):
await super().start(frame)
await self._client.setup(frame)
await self._params.serializer.setup(frame)
self._send_interval = (self._audio_chunk_size / self.sample_rate) / 2
self._send_interval = (self.audio_chunk_size / self.sample_rate) / 2
async def stop(self, frame: EndFrame):
await super().stop(frame)
@@ -229,7 +229,7 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport):
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
await self._write_frame(frame)
async def write_raw_audio_frames(self, frames: bytes):
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
if self._client.is_closing:
return

View File

@@ -284,11 +284,13 @@ class SmallWebRTCClient:
)
yield audio_frame
async def write_raw_audio_frames(self, data: bytes):
async def write_raw_audio_frames(self, data: bytes, destination: Optional[str] = None):
if self._can_send() and self._audio_output_track:
await self._audio_output_track.add_audio_bytes(data)
async def write_raw_video_frame(self, frame: OutputImageRawFrame):
async def write_raw_video_frame(
self, frame: OutputImageRawFrame, destination: Optional[str] = None
):
if self._can_send() and self._video_output_track:
self._video_output_track.add_video_frame(frame)
@@ -482,9 +484,10 @@ class SmallWebRTCOutputTransport(BaseOutputTransport):
self._params = params
async def start(self, frame: StartFrame):
await super().start(frame)
await self._client.setup(self._params, frame)
await self._client.connect()
# Parent start.
await super().start(frame)
async def stop(self, frame: EndFrame):
await super().stop(frame)
@@ -497,10 +500,12 @@ class SmallWebRTCOutputTransport(BaseOutputTransport):
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
await self._client.send_message(frame)
async def write_raw_audio_frames(self, frames: bytes):
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
await self._client.write_raw_audio_frames(frames)
async def write_raw_video_frame(self, frame: OutputImageRawFrame):
async def write_raw_video_frame(
self, frame: OutputImageRawFrame, destination: Optional[str] = None
):
await self._client.write_raw_video_frame(frame)

View File

@@ -182,7 +182,7 @@ class WebsocketClientOutputTransport(BaseOutputTransport):
async def start(self, frame: StartFrame):
await super().start(frame)
self._send_interval = (self._audio_chunk_size / self.sample_rate) / 2
self._send_interval = (self.audio_chunk_size / self.sample_rate) / 2
await self._params.serializer.setup(frame)
await self._session.setup(frame)
await self._session.connect()
@@ -202,7 +202,7 @@ class WebsocketClientOutputTransport(BaseOutputTransport):
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
await self._write_frame(frame)
async def write_raw_audio_frames(self, frames: bytes):
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
frame = OutputAudioRawFrame(
audio=frames,
sample_rate=self.sample_rate,

View File

@@ -194,7 +194,7 @@ class WebsocketServerOutputTransport(BaseOutputTransport):
async def start(self, frame: StartFrame):
await super().start(frame)
await self._params.serializer.setup(frame)
self._send_interval = (self._audio_chunk_size / self.sample_rate) / 2
self._send_interval = (self.audio_chunk_size / self.sample_rate) / 2
async def stop(self, frame: EndFrame):
await super().stop(frame)
@@ -218,7 +218,7 @@ class WebsocketServerOutputTransport(BaseOutputTransport):
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
await self._write_frame(frame)
async def write_raw_audio_frames(self, frames: bytes):
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
if not self._websocket:
# Simulate audio playback with a sleep.
await self._write_audio_sleep()

View File

@@ -8,17 +8,13 @@ import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Mapping, Optional
from typing import Any, Awaitable, Callable, Dict, Mapping, Optional
import aiohttp
from daily import (
VirtualCameraDevice,
VirtualMicrophoneDevice,
VirtualSpeakerDevice,
)
from loguru import logger
from pydantic import BaseModel
from pipecat.audio.utils import create_default_resampler
from pipecat.audio.vad.vad_analyzer import VADAnalyzer, VADParams
from pipecat.frames.frames import (
CancelFrame,
@@ -34,6 +30,7 @@ from pipecat.frames.frames import (
TranscriptionFrame,
TransportMessageFrame,
TransportMessageUrgentFrame,
UserAudioRawFrame,
UserImageRawFrame,
UserImageRequestFrame,
)
@@ -45,7 +42,17 @@ from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.utils.asyncio import BaseTaskManager
try:
from daily import CallClient, Daily, EventHandler
from daily import (
AudioData,
CallClient,
CustomAudioSource,
Daily,
EventHandler,
VideoFrame,
VirtualCameraDevice,
VirtualMicrophoneDevice,
VirtualSpeakerDevice,
)
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
@@ -149,6 +156,8 @@ class DailyParams(TransportParams):
api_url: Daily API base URL
api_key: Daily API authentication key
dialin_settings: Optional settings for dial-in functionality
camera_out_enabled: Whether to enable the main camera output track. If enabled, it still needs `video_out_enabled=True`
microphone_out_enabled: Whether to enable the main microphone track. If enabled, it still needs `audio_out_enabled=True`
transcription_enabled: Whether to enable speech transcription
transcription_settings: Configuration for transcription service
"""
@@ -156,6 +165,8 @@ class DailyParams(TransportParams):
api_url: str = "https://api.daily.co/v1"
api_key: str = ""
dialin_settings: Optional[DailyDialinSettings] = None
camera_out_enabled: bool = True
microphone_out_enabled: bool = True
transcription_enabled: bool = False
transcription_settings: DailyTranscriptionSettings = DailyTranscriptionSettings()
@@ -275,6 +286,7 @@ class DailyTransportClient(EventHandler):
self._transport_name = transport_name
self._participant_id: str = ""
self._audio_renderers = {}
self._video_renderers = {}
self._transcription_ids = []
self._transcription_status = None
@@ -310,6 +322,7 @@ class DailyTransportClient(EventHandler):
self._camera: Optional[VirtualCameraDevice] = None
self._mic: Optional[VirtualMicrophoneDevice] = None
self._speaker: Optional[VirtualSpeakerDevice] = None
self._audio_sources: Dict[str, CustomAudioSource] = {}
def _camera_name(self):
return f"camera-{self}"
@@ -328,6 +341,14 @@ class DailyTransportClient(EventHandler):
def participant_id(self) -> str:
return self._participant_id
@property
def in_sample_rate(self) -> int:
    """Input audio sample rate currently stored on the client."""
    rate = self._in_sample_rate
    return rate
@property
def out_sample_rate(self) -> int:
    """Output audio sample rate currently stored on the client."""
    rate = self._out_sample_rate
    return rate
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
if not self._joined:
return
@@ -365,19 +386,27 @@ class DailyTransportClient(EventHandler):
await asyncio.sleep(0.01)
return None
async def write_raw_audio_frames(self, frames: bytes):
if not self._mic:
return None
async def register_audio_destination(self, destination: str):
    """Create a custom audio track for `destination` and start publishing it.

    The track's audio source is remembered under the destination name.
    """
    source = await self.add_custom_audio_track(destination)
    self._audio_sources[destination] = source
    publishing = {"customAudio": {destination: True}}
    self._client.update_publishing(publishing)
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
future = self._get_event_loop().create_future()
self._mic.write_frames(frames, completion=completion_callback(future))
if not destination and self._mic:
self._mic.write_frames(frames, completion=completion_callback(future))
elif destination and destination in self._audio_sources:
source = self._audio_sources[destination]
source.write_frames(frames, completion=completion_callback(future))
else:
logger.warning(f"{self} unable to write audio frames to destination [{destination}]")
future.set_result(None)
await future
async def write_raw_video_frame(self, frame: OutputImageRawFrame):
if not self._camera:
return None
self._camera.write_frame(frame.image)
async def write_raw_video_frame(
self, frame: OutputImageRawFrame, destination: Optional[str] = None
):
if not destination and self._camera:
self._camera.write_frame(frame.image)
async def setup(self, frame: StartFrame):
self._in_sample_rate = self._params.audio_in_sample_rate or frame.audio_in_sample_rate
@@ -480,6 +509,9 @@ class DailyTransportClient(EventHandler):
async def _join(self):
future = self._get_event_loop().create_future()
camera_enabled = self._params.video_out_enabled and self._params.camera_out_enabled
microphone_enabled = self._params.audio_out_enabled and self._params.microphone_out_enabled
self._client.join(
self._room_url,
self._token,
@@ -487,13 +519,13 @@ class DailyTransportClient(EventHandler):
client_settings={
"inputs": {
"camera": {
"isEnabled": self._params.video_out_enabled,
"isEnabled": camera_enabled,
"settings": {
"deviceId": self._camera_name(),
},
},
"microphone": {
"isEnabled": self._params.audio_out_enabled,
"isEnabled": microphone_enabled,
"settings": {
"deviceId": self._mic_name(),
"customConstraints": {
@@ -648,6 +680,28 @@ class DailyTransportClient(EventHandler):
if self._joined and self._transcription_status:
await self.update_transcription(self._transcription_ids)
async def capture_participant_audio(
    self,
    participant_id: str,
    callback: Callable,
    audio_source: str = "microphone",
):
    """Subscribe to one of a participant's audio sources and route its data.

    Args:
        participant_id: Participant whose audio should be captured.
        callback: Callable stored for this participant/source pair; it is
            looked up again when audio data arrives.
        audio_source: ``"microphone"`` or ``"screenAudio"`` for the built-in
            sources; any other value is treated as a custom audio track name.
    """
    # Only enable the desired audio source subscription on this participant.
    if audio_source in ("microphone", "screenAudio"):
        media = {"media": {audio_source: "subscribed"}}
    else:
        media = {"media": {"customAudio": {audio_source: "subscribed"}}}

    await self.update_subscriptions(participant_settings={participant_id: media})

    # Add to the participant's existing map instead of replacing it, so that
    # capturing a second source does not discard the callback registered for
    # the first one.
    self._audio_renderers.setdefault(participant_id, {})[audio_source] = callback

    self._client.set_audio_renderer(
        participant_id,
        self._audio_data_received,
        audio_source=audio_source,
    )
async def capture_participant_video(
self,
participant_id: str,
@@ -656,12 +710,15 @@ class DailyTransportClient(EventHandler):
video_source: str = "camera",
color_format: str = "RGB",
):
# Only enable the desired video source subscription on this participant.
await self.update_subscriptions(
participant_settings={participant_id: {"media": {video_source: "subscribed"}}}
)
# Only enable the desired video source subscription on this participant.
if video_source in ("camera", "screenVideo"):
media = {"media": {video_source: "subscribed"}}
else:
media = {"media": {"customVideo": {video_source: "subscribed"}}}
self._video_renderers[participant_id] = callback
await self.update_subscriptions(participant_settings={participant_id: media})
self._video_renderers[participant_id] = {video_source: callback}
self._client.set_video_renderer(
participant_id,
@@ -670,6 +727,20 @@ class DailyTransportClient(EventHandler):
color_format=color_format,
)
async def add_custom_audio_track(self, track_name: str) -> CustomAudioSource:
    """Create a custom audio source and add it to the client as a named track.

    The source is constructed with ``self._out_sample_rate`` and ``1``
    (presumably the channel count -- confirm against the Daily API). The
    method waits on a future handed to the client as a completion callback
    before returning.

    Args:
        track_name: Name under which the track is added.

    Returns:
        The newly created ``CustomAudioSource``.
    """
    done = self._get_event_loop().create_future()
    source = CustomAudioSource(self._out_sample_rate, 1)

    self._client.add_custom_audio_track(
        track_name=track_name, audio_source=source, completion=completion_callback(done)
    )
    await done

    return source
async def update_transcription(self, participants=None, instance_id=None):
future = self._get_event_loop().create_future()
self._client.update_transcription(
@@ -686,7 +757,15 @@ class DailyTransportClient(EventHandler):
)
await future
async def update_remote_participants(self, remote_participants: Mapping[str, Any] = None):
async def update_publishing(self, publishing_settings: Mapping[str, Any]):
    """Send new publishing settings to the client and wait for completion.

    Args:
        publishing_settings: Settings mapping forwarded unchanged to the
            client's ``update_publishing``.
    """
    done = self._get_event_loop().create_future()
    on_complete = completion_callback(done)
    self._client.update_publishing(publishing_settings=publishing_settings, completion=on_complete)
    await done
async def update_remote_participants(self, remote_participants: Mapping[str, Any]):
future = self._get_event_loop().create_future()
self._client.update_remote_participants(
remote_participants=remote_participants, completion=completion_callback(future)
@@ -773,15 +852,15 @@ class DailyTransportClient(EventHandler):
# Daily (CallClient callbacks)
#
def _audio_data_received(self, participant_id: str, audio_data: AudioData, audio_source: str):
    """Dispatch received audio data to the callback registered for its source.

    Args:
        participant_id: Participant the audio came from.
        audio_data: Audio payload, forwarded unchanged to the callback.
        audio_source: Source name used as the second-level key in
            ``self._audio_renderers``.

    Raises:
        KeyError: If no callback is registered for this participant/source.
    """
    callback = self._audio_renderers[participant_id][audio_source]
    self._call_async_callback(callback, participant_id, audio_data, audio_source)

def _video_frame_received(
    self, participant_id: str, video_frame: VideoFrame, video_source: str
):
    """Dispatch a received video frame to the callback registered for its source.

    Renderer callbacks are stored per participant *and* per source, so the
    lookup needs both keys.

    Args:
        participant_id: Participant the frame came from.
        video_frame: Frame object, forwarded unchanged to the callback.
        video_source: Source name used as the second-level key in
            ``self._video_renderers``.

    Raises:
        KeyError: If no callback is registered for this participant/source.
    """
    callback = self._video_renderers[participant_id][video_source]
    self._call_async_callback(callback, participant_id, video_frame, video_source)
def _call_async_callback(self, callback, *args):
future = asyncio.run_coroutine_threadsafe(
@@ -837,6 +916,8 @@ class DailyInputTransport(BaseInputTransport):
# internally to be processed.
self._audio_in_task = None
self._resampler = create_default_resampler()
self._vad_analyzer: Optional[VADAnalyzer] = params.vad_analyzer
@property
@@ -851,6 +932,9 @@ class DailyInputTransport(BaseInputTransport):
self._audio_in_task = self.create_task(self._audio_in_task_handler())
async def start(self, frame: StartFrame):
# Setup client.
await self._client.setup(frame)
# Parent start.
await super().start(frame)
@@ -859,8 +943,6 @@ class DailyInputTransport(BaseInputTransport):
self._initialized = True
# Setup client.
await self._client.setup(frame)
# Join the room.
await self._client.join()
if self._params.audio_in_stream_on_start:
@@ -916,6 +998,31 @@ class DailyInputTransport(BaseInputTransport):
# Audio in
#
async def capture_participant_audio(
    self,
    participant_id: str,
    audio_source: str = "microphone",
):
    """Start capturing a participant's audio through the client.

    Received audio is delivered to ``self._on_participant_audio_data``.

    Args:
        participant_id: Participant whose audio should be captured.
        audio_source: Audio source to capture. Defaults to ``"microphone"``,
            matching the default used by the client and by ``DailyTransport``.
    """
    await self._client.capture_participant_audio(
        participant_id, self._on_participant_audio_data, audio_source
    )
async def _on_participant_audio_data(
    self, participant_id: str, audio: AudioData, audio_source: str
):
    """Resample a participant's audio, wrap it in a frame and push it.

    Args:
        participant_id: Participant the audio came from; used as ``user_id``.
        audio: Audio payload providing ``audio_frames``, ``sample_rate`` and
            ``num_channels``.
        audio_source: Source name, recorded on the frame as
            ``transport_source``.
    """
    # NOTE(review): incoming audio is resampled to the client's *output*
    # sample rate -- confirm this is intended rather than the input rate.
    converted = await self._resampler.resample(
        audio.audio_frames,
        audio.sample_rate,
        self._client.out_sample_rate,
    )

    user_frame = UserAudioRawFrame(
        user_id=participant_id,
        audio=converted,
        sample_rate=self._client.out_sample_rate,
        num_channels=audio.num_channels,
    )
    user_frame.transport_source = audio_source

    await self.push_frame(user_frame)
async def _audio_in_task_handler(self):
while True:
frame = await self._client.read_next_audio_frame()
@@ -934,9 +1041,11 @@ class DailyInputTransport(BaseInputTransport):
color_format: str = "RGB",
):
self._video_renderers[participant_id] = {
"framerate": framerate,
"timestamp": 0,
"render_next_frame": [],
video_source: {
"framerate": framerate,
"timestamp": 0,
"render_next_frame": [],
}
}
await self._client.capture_participant_video(
@@ -947,12 +1056,14 @@ class DailyInputTransport(BaseInputTransport):
if frame.user_id in self._video_renderers:
self._video_renderers[frame.user_id]["render_next_frame"].append(frame)
async def _on_participant_video_frame(self, participant_id: str, buffer, size, format):
async def _on_participant_video_frame(
self, participant_id: str, video_frame: VideoFrame, video_source: str
):
render_frame = False
curr_time = time.time()
prev_time = self._video_renderers[participant_id]["timestamp"]
framerate = self._video_renderers[participant_id]["framerate"]
prev_time = self._video_renderers[participant_id][video_source]["timestamp"]
framerate = self._video_renderers[participant_id][video_source]["framerate"]
# Some times we render frames because of a request.
request_frame = None
@@ -961,20 +1072,23 @@ class DailyInputTransport(BaseInputTransport):
next_time = prev_time + 1 / framerate
render_frame = (next_time - curr_time) < 0.1
elif self._video_renderers[participant_id]["render_next_frame"]:
request_frame = self._video_renderers[participant_id]["render_next_frame"].pop(0)
elif self._video_renderers[participant_id][video_source]["render_next_frame"]:
request_frame = self._video_renderers[participant_id][video_source][
"render_next_frame"
].pop(0)
render_frame = True
if render_frame:
frame = UserImageRawFrame(
user_id=participant_id,
request=request_frame,
image=buffer,
size=size,
format=format,
image=video_frame.buffer,
size=(video_frame.width, video_frame.height),
format=video_frame.color_format,
)
frame.transport_source = video_source
await self.push_frame(frame)
self._video_renderers[participant_id]["timestamp"] = curr_time
self._video_renderers[participant_id][video_source]["timestamp"] = curr_time
class DailyOutputTransport(BaseOutputTransport):
@@ -999,6 +1113,9 @@ class DailyOutputTransport(BaseOutputTransport):
self._initialized = False
async def start(self, frame: StartFrame):
# Setup client.
await self._client.setup(frame)
# Parent start.
await super().start(frame)
@@ -1007,8 +1124,6 @@ class DailyOutputTransport(BaseOutputTransport):
self._initialized = True
# Setup client.
await self._client.setup(frame)
# Join the room.
await self._client.join()
@@ -1032,11 +1147,19 @@ class DailyOutputTransport(BaseOutputTransport):
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
    """Hand a transport message frame to the client for sending.

    Args:
        frame: Message frame forwarded unchanged to the client's
            ``send_message``.
    """
    client = self._client
    await client.send_message(frame)
async def register_video_destination(self, destination: str):
    """Log that video destinations cannot be registered on this transport.

    Args:
        destination: Requested destination name; it is not used.
    """
    logger.warning(f"{self} registering video destinations is not supported yet")

async def register_audio_destination(self, destination: str):
    """Register a named audio destination with the client.

    Args:
        destination: Destination name forwarded to the client.
    """
    await self._client.register_audio_destination(destination)

async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
    """Forward raw audio frames to the client.

    Args:
        frames: Raw audio bytes.
        destination: Optional destination name, passed through to the client.
    """
    await self._client.write_raw_audio_frames(frames, destination)

async def write_raw_video_frame(
    self, frame: OutputImageRawFrame, destination: Optional[str] = None
):
    """Forward an image frame to the client.

    Args:
        frame: Image frame to write.
        destination: Optional destination name, passed through to the client.
    """
    await self._client.write_raw_video_frame(frame, destination)
class DailyTransport(BaseTransport):
@@ -1204,6 +1327,14 @@ class DailyTransport(BaseTransport):
async def capture_participant_transcription(self, participant_id: str):
    """Ask the client to capture transcription for a participant.

    Args:
        participant_id: Participant forwarded to the client call.
    """
    client = self._client
    await client.capture_participant_transcription(participant_id)
async def capture_participant_audio(
    self,
    participant_id: str,
    audio_source: str = "microphone",
):
    """Start capturing a participant's audio via the input transport.

    Does nothing when no input transport is set.

    Args:
        participant_id: Participant whose audio should be captured.
        audio_source: Audio source name passed to the input transport.
    """
    if not self._input:
        return
    await self._input.capture_participant_audio(participant_id, audio_source)
async def capture_participant_video(
self,
participant_id: str,
@@ -1216,12 +1347,15 @@ class DailyTransport(BaseTransport):
participant_id, framerate, video_source, color_format
)
async def update_publishing(self, publishing_settings: Mapping[str, Any]):
    """Forward publishing settings to the client.

    Args:
        publishing_settings: Settings mapping passed through unchanged.
    """
    client = self._client
    await client.update_publishing(publishing_settings=publishing_settings)
async def update_subscriptions(self, participant_settings=None, profile_settings=None):
    """Forward subscription settings to the client.

    Args:
        participant_settings: Per-participant settings, passed through
            unchanged (``None`` by default).
        profile_settings: Profile settings, passed through unchanged
            (``None`` by default).
    """
    forwarded = {
        "participant_settings": participant_settings,
        "profile_settings": profile_settings,
    }
    await self._client.update_subscriptions(**forwarded)
async def update_remote_participants(self, remote_participants: Mapping[str, Any] = None):
async def update_remote_participants(self, remote_participants: Mapping[str, Any]):
    """Forward remote-participant updates to the client.

    Args:
        remote_participants: Mapping passed through unchanged to the client.
    """
    client = self._client
    await client.update_remote_participants(remote_participants=remote_participants)
async def _on_joined(self, data):

View File

@@ -462,7 +462,7 @@ class LiveKitOutputTransport(BaseOutputTransport):
else:
await self._client.send_data(frame.message.encode())
async def write_raw_audio_frames(self, frames: bytes):
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
livekit_audio = self._convert_pipecat_audio_to_livekit(frames)
await self._client.publish_audio(livekit_audio)