Compare commits
67 Commits
hush/rtviS
...
fixing_sou
| Author | SHA1 | Date | |
|---|---|---|---|
| | 37d15588b8 | | |
| | d2bf8321a0 | | |
| | 781df7ac77 | | |
| | 1d69a92ea1 | | |
| | 75d261639f | | |
| | f720d795d0 | | |
| | f6fe83e358 | | |
| | 54971a0735 | | |
| | 4513e81e13 | | |
| | 872204b795 | | |
| | a94cbfe6f5 | | |
| | 7152faafb2 | | |
| | e6aadaccd8 | | |
| | 3a73aa71b8 | | |
| | 814e7509e1 | | |
| | e0cf5ec016 | | |
| | 667bd32e6a | | |
| | b2ecd83706 | | |
| | b2754117c8 | | |
| | 6c428c303b | | |
| | e7d889a143 | | |
| | da60e7069b | | |
| | c14406a3b9 | | |
| | 725ab5ec21 | | |
| | daf9d47e58 | | |
| | 63a65627a2 | | |
| | 02c07755b0 | | |
| | 15cbd18acc | | |
| | 93c40b87dc | | |
| | eeaa9f67a1 | | |
| | b60691c7b2 | | |
| | 2bb1b0b343 | | |
| | 047ef9f86c | | |
| | 9a2c603c91 | | |
| | 94c4169407 | | |
| | cb8a551db8 | | |
| | 779f09af70 | | |
| | 19dc0f2bfb | | |
| | f0709e22ba | | |
| | 8250736f5e | | |
| | 83348a9f93 | | |
| | 96d40903a9 | | |
| | 2560811805 | | |
| | 2b8c44c008 | | |
| | 38e2d37674 | | |
| | 6278561f88 | | |
| | 750e79c1ce | | |
| | 71eb2963c5 | | |
| | f44e2c86ea | | |
| | afe1f0df8c | | |
| | 458fddfb48 | | |
| | 8d915c5ccb | | |
| | 304153dd03 | | |
| | a6781b7352 | | |
| | 5ad0058303 | | |
| | 75c039de33 | | |
| | 74e3c3677e | | |
| | dc20327f10 | | |
| | e738affd29 | | |
| | ef3d732607 | | |
| | 6d63cff1bf | | |
| | 12f42605a1 | | |
| | fac3337927 | | |
| | 76d198151c | | |
| | 6a907058de | | |
| | 6e1f531f64 | | |
| | 4232cca5b6 | | |
79
CHANGELOG.md
@@ -7,8 +7,55 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
### Fixed
|
||||
|
||||
- Added real-time throttling to the `BaseOutputTransport` when using audio mixer
|
||||
to ensure audio frames (including silence) are emitted at the correct intervals.
|
||||
This prevents unnecessary CPU usage and avoids flooding the output with silent
|
||||
frames when no new audio is available.
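
One way to picture the throttling described in this entry: each audio chunk covers a fixed duration, so a sender can wait until that much wall-clock time has passed before emitting the next chunk. The sketch below is only an illustration and is not the `BaseOutputTransport` implementation; `chunk_duration` and `paced_send` are hypothetical names, and 16-bit PCM audio is assumed.

```python
import asyncio
import time


def chunk_duration(num_bytes: int, sample_rate: int, num_channels: int) -> float:
    """Seconds of audio in a chunk of 16-bit PCM (2 bytes per sample)."""
    return num_bytes / (sample_rate * num_channels * 2)


async def paced_send(chunks, sample_rate: int, num_channels: int, send) -> None:
    """Call send(chunk) for each chunk, no faster than real time."""
    next_time = time.monotonic()
    for chunk in chunks:
        # Wait until the previous chunk's playback time has elapsed.
        delay = next_time - time.monotonic()
        if delay > 0:
            await asyncio.sleep(delay)
        send(chunk)
        next_time += chunk_duration(len(chunk), sample_rate, num_channels)


if __name__ == "__main__":
    # Five 10 ms chunks of silence at 16 kHz mono (320 bytes each).
    silence = [bytes(320)] * 5
    sent = []
    asyncio.run(paced_send(silence, 16000, 1, sent.append))
    print(f"sent {len(sent)} chunks")
```

Scheduling against an absolute `next_time` rather than sleeping for a fixed interval keeps small timer errors from accumulating over a long stream.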
|
||||
|
||||
## [0.0.66] - 2025-05-02
|
||||
|
||||
### Added
|
||||
|
||||
- Added two new input parameters to `RimeTTSService`: `pause_between_brackets`
|
||||
and `phonemize_between_brackets`.
|
||||
|
||||
- Added support for cross-platform local smart turn detection. You can use
|
||||
`LocalSmartTurnAnalyzer` for on-device inference using Torch.
|
||||
|
||||
- `BaseOutputTransport` now allows multiple destinations if the transport
|
||||
implementation supports it (e.g. Daily's custom tracks). With multiple
|
||||
destinations it is possible to send different audio or video tracks with a
|
||||
single transport simultaneously. To do that, you need to set the new
|
||||
`Frame.transport_destination` field with your desired transport destination
|
||||
(e.g. custom track name), tell the transport you want a new destination with
|
||||
`TransportParams.audio_out_destinations` or
|
||||
`TransportParams.video_out_destinations` and the transport should take care of
|
||||
the rest.
|
||||
|
||||
- Similar to the new `Frame.transport_destination`, there's a new
|
||||
`Frame.transport_source` field which is set by the `BaseInputTransport` if the
|
||||
incoming data comes from a non-default source (e.g. custom tracks).
|
||||
|
||||
- `TTSService` has a new `transport_destination` constructor parameter. This
|
||||
parameter will be used to update the `Frame.transport_destination` field for
|
||||
each generated `TTSAudioRawFrame`. This allows sending multiple bots' audio to
|
||||
multiple destinations in the same pipeline.
|
||||
|
||||
- Added `DailyTransportParams.camera_out_enabled` and
|
||||
`DailyTransportParams.microphone_out_enabled` which allows you to
|
||||
enable/disable the main output camera or microphone tracks. This is useful if
|
||||
you only want to use custom tracks and not send the main tracks. Note that you
|
||||
still need `audio_out_enabled=True` or `video_out_enabled=True`.
|
||||
|
||||
- Added `DailyTransport.capture_participant_audio()` which allows you to capture
|
||||
an audio source (e.g. "microphone", "screenAudio" or a custom track name) from
|
||||
a remote participant.
|
||||
|
||||
- Added `DailyTransport.update_publishing()` which allows you to update the call
|
||||
video and audio publishing settings (e.g. audio and video quality).
|
||||
|
||||
- Added `RTVIObserverParams` which allows you to configure what RTVI messages
|
||||
are sent to the clients.
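
To illustrate the destination routing described in the entries above, here is a small stand-alone sketch. It does not use Pipecat classes: `AudioFrame` and `route_by_destination` are hypothetical stand-ins, and only the `transport_destination` field name comes from the changelog. Frames without a destination are assumed to belong to the default track.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class AudioFrame:
    """Hypothetical stand-in for an output audio frame."""

    audio: bytes
    transport_destination: Optional[str] = None


def route_by_destination(frames: List[AudioFrame]) -> Dict[Optional[str], List[bytes]]:
    """Group audio by destination; None stands for the default track."""
    tracks: Dict[Optional[str], List[bytes]] = defaultdict(list)
    for frame in frames:
        tracks[frame.transport_destination].append(frame.audio)
    return dict(tracks)


if __name__ == "__main__":
    frames = [
        AudioFrame(b"hola", "spanish"),
        AudioFrame(b"bonjour", "french"),
        AudioFrame(b"hello"),  # no destination: default track
    ]
    for destination, audio in route_by_destination(frames).items():
        print(destination, audio)
```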
|
||||
|
||||
@@ -37,6 +84,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Changed
|
||||
|
||||
- `TransportParams.audio_out_mixer` now supports a string and also a dictionary to
|
||||
provide a mixer per destination. For example:
|
||||
|
||||
```python
|
||||
audio_out_mixer={
|
||||
"track-1": SoundfileMixer(...),
|
||||
"track-2": SoundfileMixer(...),
|
||||
"track-N": SoundfileMixer(...),
|
||||
},
|
||||
```
|
||||
|
||||
- The `STTMuteFilter` now mutes `InterimTranscriptionFrame` and
|
||||
`TranscriptionFrame` which allows the `STTMuteFilter` to be used in
|
||||
conjunction with transports that generate transcripts, e.g. `DailyTransport`.
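
A transport that accepts either form of `audio_out_mixer` has to pick the right mixer for each destination. The sketch below shows one way such a lookup could work. `resolve_mixer` and `FakeMixer` are hypothetical and not part of Pipecat, and the sketch assumes that a single (non-dictionary) mixer is shared by every destination, which may not match the transport's actual behavior.

```python
from typing import Any, Mapping, Optional, Union


class FakeMixer:
    """Hypothetical placeholder for a mixer object such as SoundfileMixer."""

    def __init__(self, name: str):
        self.name = name


def resolve_mixer(
    audio_out_mixer: Union[Any, Mapping[str, Any], None],
    destination: str,
) -> Optional[Any]:
    """Return the mixer to use for a destination.

    A mapping is treated as one mixer per destination. Any other value
    is treated as a single mixer shared by all destinations (assumption).
    """
    if isinstance(audio_out_mixer, Mapping):
        return audio_out_mixer.get(destination)
    return audio_out_mixer


if __name__ == "__main__":
    mixers = {"track-1": FakeMixer("one"), "track-2": FakeMixer("two")}
    print(resolve_mixer(mixers, "track-2").name)
    print(resolve_mixer(mixers, "track-3"))
```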
|
||||
@@ -70,6 +128,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
case there's no need to push audio to the rest of the pipeline, but this is
|
||||
not a very common case.
|
||||
|
||||
- Added `RivaSegmentedSTTService`, which allows Riva offline/batch models, such
|
||||
as "canary-1b-asr", to be used in Pipecat.
|
||||
|
||||
### Deprecated
|
||||
|
||||
- Function calls with parameters
|
||||
@@ -85,8 +146,20 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
- `TransportParams.vad_audio_passthrough` parameter is now deprecated, use
|
||||
`TransportParams.audio_in_passthrough` instead.
|
||||
|
||||
- `ParakeetSTTService` is now deprecated, use `RivaSTTService` instead, which uses
|
||||
the model "parakeet-ctc-1.1b-asr" by default.
|
||||
|
||||
- `FastPitchTTSService` is now deprecated, use `RivaTTSService` instead, which uses
|
||||
the model "magpie-tts-multilingual" by default.
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed an issue with `SimliVideoService` where the bot was continuously outputting
|
||||
audio, which prevented the `BotStoppedSpeakingFrame` from being emitted.
|
||||
|
||||
- Fixed an issue where `OpenAIRealtimeBetaLLMService` would add two assistant
|
||||
messages to the context.
|
||||
|
||||
- Fixed an issue with `GeminiMultimodalLiveLLMService` where the context
|
||||
contained tokens instead of words.
|
||||
|
||||
@@ -102,6 +175,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Other
|
||||
|
||||
- Added `examples/daily-custom-tracks` to show how to send and receive Daily
|
||||
custom tracks.
|
||||
|
||||
- Added `examples/daily-multi-translation` to showcase how to send multiple
|
||||
simultaneous translations with the same transport.
|
||||
|
||||
- Added 04 foundational examples for client/server transports. Also, renamed
|
||||
`29-livekit-audio-chat.py` to `04b-transports-livekit.py`.
|
||||
|
||||
|
||||
@@ -53,4 +53,3 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
|
||||
token = await daily_rest_helper.get_token(url, expiry_time)
|
||||
|
||||
return (url, token)
|
||||
return (url, token)
|
||||
|
||||
39
examples/daily-custom-tracks/README.md
Normal file
@@ -0,0 +1,39 @@
|
||||
# Daily Custom Tracks
|
||||
|
||||
This example shows how to send and receive Daily custom tracks. We will run a simple `daily-python` application to send an audio file with a custom track (named "pipecat") to a room. Then, the Pipecat bot will mirror that custom track into another custom track (named "pipecat-mirror") in the same room.
|
||||
|
||||
## Get started
|
||||
|
||||
```bash
|
||||
python3 -m venv venv
|
||||
source venv/bin/activate
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
## Run the bot
|
||||
|
||||
Start the bot by giving it a Daily room URL.
|
||||
|
||||
```bash
|
||||
python bot.py -u ROOM_URL
|
||||
```
|
||||
|
||||
The bot will wait for the first participant to join. Then, it will mirror a custom track named "pipecat" into a new custom track named "pipecat-mirror".
|
||||
|
||||
## Run the sender
|
||||
|
||||
Now, run the custom track sender. This is a simple `daily-python` application that opens an audio file and sends it as a custom track to the same Daily room.
|
||||
|
||||
```bash
|
||||
python custom_track_sender.py -u ROOM_URL -i office-ambience-mono-16000.mp3
|
||||
```
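
The sender writes the decoded audio in one-second chunks. The sketch below shows that chunking on its own, without `daily-python` or `pydub`. `one_second_chunks` is a hypothetical helper name, and the sketch assumes 16-bit PCM (2 bytes per sample), which matches the `sample_rate * channels * 2` chunk size used in `custom_track_sender.py`.

```python
from typing import Iterator


def one_second_chunks(raw_bytes: bytes, sample_rate: int, channels: int) -> Iterator[bytes]:
    """Yield consecutive one-second slices of 16-bit PCM audio."""
    chunk_size = sample_rate * channels * 2  # 2 bytes per 16-bit sample
    for start in range(0, len(raw_bytes), chunk_size):
        yield raw_bytes[start : start + chunk_size]


if __name__ == "__main__":
    # 2.5 seconds of silence at 16 kHz mono.
    pcm = bytes(int(2.5 * 16000 * 2))
    print([len(chunk) for chunk in one_second_chunks(pcm, 16000, 1)])
```

The last chunk is shorter than the others whenever the file length is not a whole number of seconds.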
|
||||
|
||||
## Open client
|
||||
|
||||
Finally, open the client so you can hear both custom tracks.
|
||||
|
||||
```bash
|
||||
open index.html
|
||||
```
|
||||
|
||||
Once the client is opened, copy the URL of the Daily room and join it. You should be able to select which custom track you want to hear.
|
||||
87
examples/daily-custom-tracks/bot.py
Normal file
@@ -0,0 +1,87 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import asyncio
import sys

import aiohttp
from loguru import logger
from runner import configure

from pipecat.frames.frames import Frame, InputAudioRawFrame, OutputAudioRawFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transports.services.daily import DailyParams, DailyTransport

logger.remove(0)
logger.add(sys.stderr, level="DEBUG")


class CustomTrackMirrorProcessor(FrameProcessor):
    def __init__(self, transport_destination: str, **kwargs):
        super().__init__(**kwargs)
        self._transport_destination = transport_destination

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, InputAudioRawFrame) and frame.transport_source:
            output_frame = OutputAudioRawFrame(
                audio=frame.audio,
                sample_rate=frame.sample_rate,
                num_channels=frame.num_channels,
            )
            output_frame.transport_destination = self._transport_destination
            await self.push_frame(output_frame)
        else:
            await self.push_frame(frame, direction)


async def main():
    async with aiohttp.ClientSession() as session:
        (room_url, _) = await configure(session)

        transport = DailyTransport(
            room_url,
            None,
            "Custom tracks mirror",
            DailyParams(
                audio_in_enabled=True,
                audio_out_enabled=True,
                microphone_out_enabled=False,  # Disable since we just use custom tracks
                audio_out_destinations=["pipecat-mirror"],
            ),
        )

        pipeline = Pipeline(
            [
                transport.input(),  # Transport user input
                CustomTrackMirrorProcessor("pipecat-mirror"),
                transport.output(),  # Transport bot output
            ]
        )

        task = PipelineTask(
            pipeline,
            params=PipelineParams(
                audio_in_sample_rate=16000,
                audio_out_sample_rate=16000,
            ),
        )

        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant):
            await transport.capture_participant_audio(participant["id"], audio_source="pipecat")

        runner = PipelineRunner()

        await runner.run(task)


if __name__ == "__main__":
    asyncio.run(main())
74
examples/daily-custom-tracks/custom_track_sender.py
Normal file
@@ -0,0 +1,74 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import argparse
import time

from daily import CallClient, CustomAudioSource, Daily
from pydub import AudioSegment

parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument("-u", "--url", type=str, required=True, help="URL of the Daily room to join")
parser.add_argument(
    "-i", "--input", type=str, required=True, help="Input audio file (needs 16000 sample rate)"
)

args, _ = parser.parse_known_args()

audio = AudioSegment.from_mp3(args.input)

raw_bytes = audio.raw_data
sample_rate = audio.frame_rate
channels = audio.channels

print(f"Length: {len(raw_bytes)} bytes")
print(f"Sample rate: {sample_rate}, Channels: {channels}")

# Initialize the Daily context & create call client
Daily.init()

client = CallClient()

# Join the room and indicate we have a custom track named "pipecat".
client.join(
    args.url,
    client_settings={
        "publishing": {
            "camera": False,
            "microphone": False,
            "customAudio": {"pipecat": True},
        },
    },
)

# Just sleep for a couple of seconds. To do this well we should really use
# completions.
time.sleep(2)

# Create the custom audio source. This is where we will write our audio.
audio_source = CustomAudioSource(sample_rate, channels)

# Create an audio track and assign it our audio source.
client.add_custom_audio_track("pipecat", audio_source)

# Just sleep for a second. To do this well we should really use completions.
time.sleep(1)

try:
    # Write one second of audio at a time until the whole file has been read.
    chunk_size = sample_rate * channels * 2
    while len(raw_bytes) > 0:
        chunk = raw_bytes[:chunk_size]
        raw_bytes = raw_bytes[chunk_size:]
        audio_source.write_frames(chunk)

except KeyboardInterrupt:
    client.leave()

# Just sleep for a second. To do this well we should really use completions.
time.sleep(1)

client.release()
173
examples/daily-custom-tracks/index.html
Normal file
@@ -0,0 +1,173 @@
|
||||
<html>
|
||||
<head>
|
||||
<title>daily custom tracks</title>
|
||||
</head>
|
||||
<script crossorigin src="https://unpkg.com/@daily-co/daily-js"></script>
|
||||
<script src="https://cdnjs.cloudflare.com/ajax/libs/fomantic-ui/2.8.6/semantic.min.js"></script>
|
||||
<link
|
||||
rel="stylesheet"
|
||||
type="text/css"
|
||||
href="https://cdnjs.cloudflare.com/ajax/libs/fomantic-ui/2.8.6/semantic.min.css"
|
||||
/>
|
||||
<script>
|
||||
function enableButton(buttonId, enable) {
|
||||
const button = document.getElementById(buttonId);
|
||||
button.disabled = !enable;
|
||||
}
|
||||
|
||||
function enableJoinButton(enable) {
|
||||
enableButton("join-button", enable);
|
||||
}
|
||||
|
||||
function enableLeaveButton(enable) {
|
||||
enableButton("leave-button", enable);
|
||||
}
|
||||
|
||||
function destroyPlayers(query) {
|
||||
const items = document.querySelectorAll(query);
|
||||
if (items) {
|
||||
for (const item of items) {
|
||||
item.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function destroyParticipantPlayers(participantId) {
|
||||
destroyPlayers(`audio[data-participant-id="${participantId}"]`);
|
||||
destroyPlayers(`button[data-participant-id="${participantId}"]`);
|
||||
}
|
||||
|
||||
async function startPlayer(player, track) {
|
||||
player.muted = false;
|
||||
player.autoplay = true;
|
||||
if (track != null) {
|
||||
player.srcObject = new MediaStream([track]);
|
||||
}
|
||||
}
|
||||
|
||||
async function buildAudioPlayer(track, participantId) {
|
||||
const audioContainer = document.getElementById("audio-container");
|
||||
const player = document.createElement("audio");
|
||||
player.dataset.participantId = participantId;
|
||||
|
||||
// Create a new button for controlling audio
|
||||
const audioControlButton = document.createElement("button");
|
||||
audioControlButton.className = "ui primary green button"
|
||||
audioControlButton.innerText = track._mediaTag == "cam-audio" ? "english" : track._mediaTag;
|
||||
audioControlButton.dataset.participantId = participantId;
|
||||
audioControlButton.onclick = () => {
|
||||
if (player.paused) {
|
||||
|
||||
player.play();
|
||||
audioControlButton.className = "ui primary red button"
|
||||
} else {
|
||||
player.pause();
|
||||
audioControlButton.className = "ui primary green button"
|
||||
}
|
||||
};
|
||||
|
||||
audioContainer.appendChild(player);
|
||||
audioContainer.appendChild(audioControlButton);
|
||||
|
||||
await startPlayer(player, track);
|
||||
player.pause()
|
||||
|
||||
return player;
|
||||
}
|
||||
|
||||
function subscribeToTracks(participantId) {
|
||||
console.log(`subscribing to track`);
|
||||
|
||||
if (participantId === "local") {
|
||||
return;
|
||||
}
|
||||
|
||||
callObject.updateParticipant(participantId, {
|
||||
setSubscribedTracks: {
|
||||
audio: true,
|
||||
video: false,
|
||||
custom: true,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
function startDaily() {
|
||||
enableJoinButton(true);
|
||||
enableLeaveButton(false);
|
||||
|
||||
window.callObject = window.DailyIframe.createCallObject({});
|
||||
|
||||
callObject.on("participant-joined", (e) => {
|
||||
if (!e.participant.local) {
|
||||
console.log("participant-joined", e.participant);
|
||||
subscribeToTracks(e.participant.session_id);
|
||||
}
|
||||
});
|
||||
|
||||
callObject.on("participant-left", (e) => {
|
||||
console.log("participant-left", e.participant.session_id);
|
||||
destroyParticipantPlayers(e.participant.session_id);
|
||||
});
|
||||
|
||||
callObject.on("track-started", async (e) => {
|
||||
console.log("track-started", e.track);
|
||||
if (e.track.kind === "audio") {
|
||||
await buildAudioPlayer(e.track, e.participant.session_id);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async function joinRoom() {
|
||||
enableJoinButton(false);
|
||||
enableLeaveButton(true);
|
||||
|
||||
const meetingUrl = document.getElementById("meeting-url").value;
|
||||
|
||||
callObject.join({
|
||||
url: meetingUrl,
|
||||
startVideoOff: true,
|
||||
startAudioOff: true,
|
||||
subscribeToTracksAutomatically: false,
|
||||
receiveSettings: {
|
||||
base: { video: { layer: 0 } },
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
async function leaveRoom() {
|
||||
enableJoinButton(true);
|
||||
enableLeaveButton(false);
|
||||
|
||||
callObject.leave();
|
||||
|
||||
const audioContainer = document.getElementById("audio-container");
|
||||
audioContainer.replaceChildren();
|
||||
}
|
||||
</script>
|
||||
|
||||
<body onload="startDaily()">
|
||||
<div class="ui centered page grid" style="margin-top: 30px">
|
||||
<div class="ten wide column">
|
||||
<div class="ui form" style="margin-top: 30px">
|
||||
<div class="field">
|
||||
<label>Meeting URL</label>
|
||||
<input id="meeting-url" value="" />
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
<div class="ui centered aligned header" style="margin-top: 30px">
|
||||
<button id="join-button" class="ui primary button" onclick="joinRoom()">
|
||||
Join
|
||||
</button>
|
||||
<button id="leave-button" class="ui button" onclick="leaveRoom()">
|
||||
Leave
|
||||
</button>
|
||||
</div>
|
||||
<div id="tile" class="ui container" style="margin-top: 30px">
|
||||
<div id="tile" class="ui center aligned grid">
|
||||
<div id="audio-container"></div><br/>
|
||||
</div>
|
||||
</div>
|
||||
</body>
|
||||
</html>
|
||||
BIN
examples/daily-custom-tracks/office-ambience-mono-16000.mp3
Normal file
2
examples/daily-custom-tracks/requirements.txt
Normal file
@@ -0,0 +1,2 @@
|
||||
pydub
|
||||
pipecat-ai[daily]
|
||||
55
examples/daily-custom-tracks/runner.py
Normal file
@@ -0,0 +1,55 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import argparse
import os

import aiohttp

from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper


async def configure(aiohttp_session: aiohttp.ClientSession):
    parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
    parser.add_argument(
        "-u", "--url", type=str, required=False, help="URL of the Daily room to join"
    )
    parser.add_argument(
        "-k",
        "--apikey",
        type=str,
        required=False,
        help="Daily API Key (needed to create an owner token for the room)",
    )

    args, unknown = parser.parse_known_args()

    url = args.url or os.getenv("DAILY_SAMPLE_ROOM_URL")
    key = args.apikey or os.getenv("DAILY_API_KEY")

    if not url:
        raise Exception(
            "No Daily room specified. Use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL."
        )

    if not key:
        raise Exception(
            "No Daily API key specified. Use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
        )

    daily_rest_helper = DailyRESTHelper(
        daily_api_key=key,
        daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
        aiohttp_session=aiohttp_session,
    )

    # Create a meeting token for the given room with an expiration 1 hour in
    # the future.
    expiry_time: float = 60 * 60

    token = await daily_rest_helper.get_token(url, expiry_time)

    return (url, token)
15
examples/daily-multi-translation/Dockerfile
Normal file
@@ -0,0 +1,15 @@
|
||||
FROM python:3.10-bullseye
|
||||
|
||||
RUN mkdir /app
|
||||
RUN mkdir /app/assets
|
||||
RUN mkdir /app/utils
|
||||
COPY *.py /app/
|
||||
COPY requirements.txt /app/
|
||||
|
||||
|
||||
WORKDIR /app
|
||||
RUN pip3 install -r requirements.txt
|
||||
|
||||
EXPOSE 7860
|
||||
|
||||
CMD ["python3", "server.py"]
|
||||
39
examples/daily-multi-translation/README.md
Normal file
@@ -0,0 +1,39 @@
|
||||
# Daily Multi Translation
|
||||
|
||||
This example shows how to use Daily to stream multiple simultaneous translations using a single transport. Daily provides custom tracks and in this example we will simultaneously translate incoming audio in English to Spanish, French and German, each of them being sent to a custom track.
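
Each language needs its own system prompt and its own custom track name. As a rough sketch, that per-language configuration can be generated from a list of languages. `build_translation_configs` is a hypothetical helper that is not part of the example; the prompt text and the lowercase track names mirror the ones in `bot.py`.

```python
from typing import Dict, List


def build_translation_configs(languages: List[str]) -> Dict[str, dict]:
    """Map each custom track name to the settings for its translator."""
    configs: Dict[str, dict] = {}
    for language in languages:
        destination = language.lower()  # used as the custom track name
        configs[destination] = {
            "transport_destination": destination,
            "messages": [
                {
                    "role": "system",
                    "content": (
                        "You will be provided with a sentence in English, and your "
                        f"task is to only translate it into {language}."
                    ),
                }
            ],
        }
    return configs


if __name__ == "__main__":
    for name, config in build_translation_configs(["Spanish", "French", "German"]).items():
        print(name, "->", config["messages"][0]["content"])
```

The keys of the returned dictionary could then be passed as the list of output destinations, so that the track names and the per-language prompts stay in sync.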
|
||||
|
||||
## Get started
|
||||
|
||||
```bash
|
||||
python3 -m venv venv
|
||||
source venv/bin/activate
|
||||
pip install -r requirements.txt
|
||||
|
||||
cp env.example .env # and add your credentials
|
||||
|
||||
```
|
||||
|
||||
## Run the server
|
||||
|
||||
```bash
|
||||
python server.py
|
||||
```
|
||||
|
||||
Then, visit `http://localhost:7860/` in your browser. This will open a Daily Prebuilt room where you will speak in English (make sure you are not muted).
|
||||
|
||||
## Open client
|
||||
|
||||
Next, you need to open the client that will listen to the translations.
|
||||
|
||||
```bash
|
||||
open index.html
|
||||
```
|
||||
|
||||
Once the client is opened, copy the URL of the Daily room created above and join it. You should be able to select which translation you want to hear.
|
||||
|
||||
## Build and test the Docker image
|
||||
|
||||
```
|
||||
docker build -t daily-multi-translation .
|
||||
docker run --env-file .env -p 7860:7860 daily-multi-translation
|
||||
```
|
||||
165
examples/daily-multi-translation/bot.py
Normal file
@@ -0,0 +1,165 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import asyncio
import os
import sys

import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure

from pipecat.audio.mixers.soundfile_mixer import SoundfileMixer
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.observers.loggers.transcription_log_observer import TranscriptionLogObserver
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport

load_dotenv(override=True)

logger.remove(0)
logger.add(sys.stderr, level="DEBUG")

BACKGROUND_SOUND_FILE = "office-ambience-mono-16000.mp3"


async def main():
    async with aiohttp.ClientSession() as session:
        (room_url, token) = await configure(session)

        transport = DailyTransport(
            room_url,
            token,
            "Multi translation bot",
            DailyParams(
                audio_in_enabled=True,
                audio_out_enabled=True,
                audio_out_mixer={
                    "spanish": SoundfileMixer(
                        sound_files={"office": BACKGROUND_SOUND_FILE}, default_sound="office"
                    ),
                    "french": SoundfileMixer(
                        sound_files={"office": BACKGROUND_SOUND_FILE}, default_sound="office"
                    ),
                    "german": SoundfileMixer(
                        sound_files={"office": BACKGROUND_SOUND_FILE}, default_sound="office"
                    ),
                },
                audio_out_destinations=["spanish", "french", "german"],
                microphone_out_enabled=False,  # Disable since we just use custom tracks
                vad_analyzer=SileroVADAnalyzer(),
            ),
        )

        stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

        tts_spanish = CartesiaTTSService(
            api_key=os.getenv("CARTESIA_API_KEY"),
            voice_id="cefcb124-080b-4655-b31f-932f3ee743de",
            transport_destination="spanish",
        )
        tts_french = CartesiaTTSService(
            api_key=os.getenv("CARTESIA_API_KEY"),
            voice_id="8832a0b5-47b2-4751-bb22-6a8e2149303d",
            transport_destination="french",
        )
        tts_german = CartesiaTTSService(
            api_key=os.getenv("CARTESIA_API_KEY"),
            voice_id="38aabb6a-f52b-4fb0-a3d1-988518f4dc06",
            transport_destination="german",
        )

        messages_spanish = [
            {
                "role": "system",
                "content": "You will be provided with a sentence in English, and your task is to only translate it into Spanish.",
            },
        ]
        messages_french = [
            {
                "role": "system",
                "content": "You will be provided with a sentence in English, and your task is to only translate it into French.",
            },
        ]
        messages_german = [
            {
                "role": "system",
                "content": "You will be provided with a sentence in English, and your task is to only translate it into German.",
            },
        ]

        llm_spanish = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
        llm_french = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
        llm_german = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))

        context_spanish = OpenAILLMContext(messages_spanish)
        context_aggregator_spanish = llm_spanish.create_context_aggregator(context_spanish)

        context_french = OpenAILLMContext(messages_french)
        context_aggregator_french = llm_french.create_context_aggregator(context_french)

        context_german = OpenAILLMContext(messages_german)
        context_aggregator_german = llm_german.create_context_aggregator(context_german)

        pipeline = Pipeline(
            [
                transport.input(),  # Transport user input
                stt,
                ParallelPipeline(
                    # Spanish pipeline.
                    [
                        context_aggregator_spanish.user(),
                        llm_spanish,
                        tts_spanish,
                        context_aggregator_spanish.assistant(),
                    ],
                    # French pipeline.
                    [
                        context_aggregator_french.user(),
                        llm_french,
                        tts_french,
                        context_aggregator_french.assistant(),
                    ],
                    # German pipeline.
                    [
                        context_aggregator_german.user(),
                        llm_german,
                        tts_german,
                        context_aggregator_german.assistant(),
                    ],
                ),
                transport.output(),  # Transport bot output
            ]
        )

        task = PipelineTask(
            pipeline,
            params=PipelineParams(
                audio_in_sample_rate=16000,
                audio_out_sample_rate=16000,
                allow_interruptions=True,
                enable_metrics=True,
                enable_usage_metrics=True,
                report_only_initial_ttfb=True,
            ),
            observers=[TranscriptionLogObserver()],
        )

        runner = PipelineRunner()

        await runner.run(task)


if __name__ == "__main__":
    asyncio.run(main())
5
examples/daily-multi-translation/env.example
Normal file
@@ -0,0 +1,5 @@
|
||||
DAILY_SAMPLE_ROOM_URL=https://yourdomain.daily.co/yourroom # (for joining the bot to the same room repeatedly for local dev)
|
||||
DAILY_API_KEY=7df...
|
||||
OPENAI_API_KEY=sk-PL...
|
||||
DEEPGRAM_API_KEY=efb...
|
||||
CARTESIA_API_KEY=aeb...
|
||||
202
examples/daily-multi-translation/index.html
Normal file
@@ -0,0 +1,202 @@
|
||||
<html>
|
||||
<head>
|
||||
<title>daily multi translation</title>
|
||||
</head>
|
||||
<script crossorigin src="https://unpkg.com/@daily-co/daily-js"></script>
|
||||
<script
|
||||
src="https://code.jquery.com/jquery-3.1.1.min.js"
|
||||
integrity="sha256-hVVnYaiADRTO2PzUGmuLJr8BLUSjGIZsDYGmIJLv2b8="
|
||||
crossorigin="anonymous"
|
||||
></script>
|
||||
<script src="https://cdnjs.cloudflare.com/ajax/libs/fomantic-ui/2.8.6/semantic.min.js"></script>
|
||||
<link
|
||||
rel="stylesheet"
|
||||
type="text/css"
|
||||
href="https://cdnjs.cloudflare.com/ajax/libs/fomantic-ui/2.8.6/semantic.min.css"
|
||||
/>
|
||||
<script>
|
||||
function enableButton(buttonId, enable) {
|
||||
const button = document.getElementById(buttonId);
|
||||
button.disabled = !enable;
|
||||
}
|
||||
|
||||
function enableJoinButton(enable) {
|
||||
enableButton("join-button", enable);
|
||||
}
|
||||
|
||||
function enableLeaveButton(enable) {
|
||||
enableButton("leave-button", enable);
|
||||
}
|
||||
|
||||
function destroyPlayers(query) {
|
||||
const items = document.querySelectorAll(query);
|
||||
if (items) {
|
||||
for (const item of items) {
|
||||
item.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function destroyParticipantPlayers(participantId) {
|
||||
destroyPlayers(`video[data-participant-id="${participantId}"]`);
|
||||
destroyPlayers(`audio[data-participant-id="${participantId}"]`);
|
||||
destroyPlayers(`button[data-participant-id="${participantId}"]`);
|
||||
}
|
||||
|
||||
async function startPlayer(player, track) {
|
||||
player.muted = false;
|
||||
player.autoplay = true;
|
||||
if (track != null) {
|
||||
player.srcObject = new MediaStream([track]);
|
||||
}
|
||||
}
|
||||
|
||||
async function buildVideoPlayer(track, participantId) {
|
||||
const videoContainer = document.getElementById("video-container");
|
||||
const player = document.createElement("video");
|
||||
player.dataset.participantId = participantId;
|
||||
|
||||
videoContainer.appendChild(player);
|
||||
|
||||
await startPlayer(player, track);
|
||||
await player.play();
|
||||
|
||||
return player;
|
||||
}
|
||||
|
||||
async function buildAudioPlayer(track, participantId) {
|
||||
const audioContainer = document.getElementById("audio-container");
|
||||
const player = document.createElement("audio");
|
||||
player.dataset.participantId = participantId;
|
||||
|
||||
// Create a new button for controlling audio
|
||||
const audioControlButton = document.createElement("button");
|
||||
audioControlButton.className = "ui primary green button"
|
||||
audioControlButton.innerText = track._mediaTag == "cam-audio" ? "english" : track._mediaTag;
|
||||
audioControlButton.dataset.participantId = participantId;
|
||||
audioControlButton.onclick = () => {
|
||||
if (player.paused) {
|
||||
|
||||
player.play();
|
||||
audioControlButton.className = "ui primary red button"
|
||||
} else {
|
||||
player.pause();
|
||||
audioControlButton.className = "ui primary green button"
|
||||
}
|
||||
};
|
||||
|
||||
audioContainer.appendChild(player);
|
||||
audioContainer.appendChild(audioControlButton);
|
||||
|
||||
await startPlayer(player, track);
|
||||
player.pause()
|
||||
|
||||
return player;
|
||||
}
|
||||
|
||||
function subscribeToTracks(participantId) {
|
||||
console.log(`subscribing to track`);
|
||||
|
||||
if (participantId === "local") {
|
||||
return;
|
||||
}
|
||||
|
||||
callObject.updateParticipant(participantId, {
|
||||
setSubscribedTracks: {
|
||||
audio: true,
|
||||
video: true,
|
||||
custom: true,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
function startDaily() {
|
||||
enableJoinButton(true);
|
||||
enableLeaveButton(false);
|
||||
|
||||
window.callObject = window.DailyIframe.createCallObject({});
|
||||
|
||||
callObject.on("participant-joined", (e) => {
|
||||
if (!e.participant.local) {
|
||||
console.log("participant-joined", e.participant);
|
||||
subscribeToTracks(e.participant.session_id);
|
||||
}
|
||||
});
|
||||
|
||||
callObject.on("participant-left", (e) => {
|
||||
console.log("participant-left", e.participant.session_id);
|
||||
destroyParticipantPlayers(e.participant.session_id);
|
||||
});
|
||||
|
||||
callObject.on("track-started", async (e) => {
|
||||
console.log("track-started", e.track);
|
||||
if (e.track.kind === "video") {
|
||||
await buildVideoPlayer(e.track, e.participant.session_id);
|
||||
} else if (e.track.kind === "audio") {
|
||||
await buildAudioPlayer(e.track, e.participant.session_id);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async function joinRoom() {
|
||||
enableJoinButton(false);
|
||||
enableLeaveButton(true);
|
||||
|
||||
const meetingUrl = document.getElementById("meeting-url").value;
|
||||
|
||||
callObject.join({
|
||||
url: meetingUrl,
|
||||
startVideoOff: true,
|
||||
startAudioOff: true,
|
||||
subscribeToTracksAutomatically: false,
|
||||
receiveSettings: {
|
||||
base: { video: { layer: 0 } },
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
async function leaveRoom() {
|
||||
enableJoinButton(true);
|
||||
enableLeaveButton(false);
|
||||
|
||||
callObject.leave();
|
||||
|
||||
const videoContainer = document.getElementById("video-container");
|
||||
videoContainer.replaceChildren();
|
||||
|
||||
const audioContainer = document.getElementById("audio-container");
|
||||
audioContainer.replaceChildren();
|
||||
}
|
||||
</script>
|
||||
|
||||
<body onload="startDaily()">
|
||||
<div class="ui centered page grid" style="margin-top: 30px">
|
||||
<div class="ten wide column">
|
||||
<div class="ui form" style="margin-top: 30px">
|
||||
<div class="field">
|
||||
<label>Meeting URL</label>
|
||||
<input id="meeting-url" value="" />
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
<div class="ui centered aligned header" style="margin-top: 30px">
|
||||
<button id="join-button" class="ui primary button" onclick="joinRoom()">
|
||||
Join
|
||||
</button>
|
||||
<button id="leave-button" class="ui button" onclick="leaveRoom()">
|
||||
Leave
|
||||
</button>
|
||||
</div>
|
||||
<div id="tile" class="ui container" style="margin-top: 30px">
|
||||
<div id="tile" class="ui center aligned grid">
|
||||
<div id="audio-container"></div><br/>
|
||||
</div>
|
||||
</div>
|
||||
<div id="tile" class="ui container" style="margin-top: 30px">
|
||||
<div id="tile" class="ui center aligned grid">
|
||||
<div id="video-container" class="ui segment"></div>
|
||||
</div>
|
||||
</div>
|
||||
</body>
|
||||
</html>
|
||||
BIN
examples/daily-multi-translation/office-ambience-mono-16000.mp3
Normal file
5
examples/daily-multi-translation/requirements.txt
Normal file
@@ -0,0 +1,5 @@
|
||||
aiofiles
|
||||
python-dotenv
|
||||
fastapi[all]
|
||||
uvicorn
|
||||
pipecat-ai[daily,deepgram,openai,silero,cartesia]
|
||||
55
examples/daily-multi-translation/runner.py
Normal file
@@ -0,0 +1,55 @@
|
||||
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import argparse
import os

import aiohttp

from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper


async def configure(aiohttp_session: aiohttp.ClientSession):
    parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
    parser.add_argument(
        "-u", "--url", type=str, required=False, help="URL of the Daily room to join"
    )
    parser.add_argument(
        "-k",
        "--apikey",
        type=str,
        required=False,
        help="Daily API Key (needed to create an owner token for the room)",
    )

    args, unknown = parser.parse_known_args()

    url = args.url or os.getenv("DAILY_SAMPLE_ROOM_URL")
    key = args.apikey or os.getenv("DAILY_API_KEY")

    if not url:
        raise Exception(
            "No Daily room specified. Use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL."
        )

    if not key:
        raise Exception(
            "No Daily API key specified. Use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
        )

    daily_rest_helper = DailyRESTHelper(
        daily_api_key=key,
        daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
        aiohttp_session=aiohttp_session,
    )

    # Create a meeting token for the given room with an expiration 1 hour in
    # the future.
    expiry_time: float = 60 * 60

    token = await daily_rest_helper.get_token(url, expiry_time)

    return (url, token)
|
||||
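The `configure` helper in `runner.py` prefers a command-line value and falls back to an environment variable before giving up. A minimal sketch of that lookup order follows; `resolve_setting` and `parse_room_url` are hypothetical names used only for illustration and are not part of pipecat.

```python
import argparse
from typing import List, Mapping, Optional


def resolve_setting(cli_value: Optional[str], env: Mapping[str, str], env_name: str) -> str:
    """Return the CLI value if given, else the environment value, else raise."""
    value = cli_value or env.get(env_name)
    if not value:
        raise ValueError(f"Missing setting: pass it on the command line or set {env_name}")
    return value


def parse_room_url(argv: List[str], env: Mapping[str, str]) -> str:
    """Resolve the room URL from argv first, then from the environment mapping."""
    parser = argparse.ArgumentParser(description="Lookup-order sketch")
    parser.add_argument("-u", "--url", type=str, required=False)
    # parse_known_args leaves flags this parser does not define in the second
    # return value instead of exiting with an error.
    args, _unknown = parser.parse_known_args(argv)
    return resolve_setting(args.url, env, "DAILY_SAMPLE_ROOM_URL")
```

In a script this would typically be called as `parse_room_url(sys.argv[1:], os.environ)`; passing the mapping explicitly keeps the function easy to test.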
139
examples/daily-multi-translation/server.py
Normal file
@@ -0,0 +1,139 @@
|
||||
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import argparse
import os
import subprocess
from contextlib import asynccontextmanager

import aiohttp
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, RedirectResponse

from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomParams

MAX_BOTS_PER_ROOM = 1

# Bot sub-process dict for status reporting and concurrency control
bot_procs = {}

daily_helpers = {}

load_dotenv(override=True)


def cleanup():
    # Clean up function, just to be extra safe
    for entry in bot_procs.values():
        proc = entry[0]
        proc.terminate()
        proc.wait()


@asynccontextmanager
async def lifespan(app: FastAPI):
    aiohttp_session = aiohttp.ClientSession()
    daily_helpers["rest"] = DailyRESTHelper(
        daily_api_key=os.getenv("DAILY_API_KEY", ""),
        daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
        aiohttp_session=aiohttp_session,
    )
    yield
    await aiohttp_session.close()
    cleanup()


app = FastAPI(lifespan=lifespan)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.get("/")
async def start_agent(request: Request):
    print(f"!!! Creating room")
    room = await daily_helpers["rest"].create_room(DailyRoomParams())
    print(f"!!! Room URL: {room.url}")
    # Ensure the room property is present
    if not room.url:
        raise HTTPException(
            status_code=500,
            detail="Missing 'room' property in request data. Cannot start agent without a target room!",
        )

    # Check if there is already an existing process running in this room
    num_bots_in_room = sum(
        1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None
    )
    if num_bots_in_room >= MAX_BOTS_PER_ROOM:
        raise HTTPException(status_code=500, detail=f"Max bot limit reached for room: {room.url}")

    # Get the token for the room
    token = await daily_helpers["rest"].get_token(room.url)

    if not token:
        raise HTTPException(status_code=500, detail=f"Failed to get token for room: {room.url}")

    # Spawn a new agent, and join the user session
    # Note: this is mostly for demonstration purposes (refer to 'deployment' in README)
    try:
        proc = subprocess.Popen(
            [f"python3 -m bot -u {room.url} -t {token}"],
            shell=True,
            bufsize=1,
            cwd=os.path.dirname(os.path.abspath(__file__)),
        )
        bot_procs[proc.pid] = (proc, room.url)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}")

    return RedirectResponse(room.url)


@app.get("/status/{pid}")
def get_status(pid: int):
    # Look up the subprocess
    proc = bot_procs.get(pid)

    # If the subprocess doesn't exist, return an error
    if not proc:
        raise HTTPException(status_code=404, detail=f"Bot with process id: {pid} not found")

    # Check the status of the subprocess
    if proc[0].poll() is None:
        status = "running"
    else:
        status = "finished"

    return JSONResponse({"bot_id": pid, "status": status})


if __name__ == "__main__":
    import uvicorn

    default_host = os.getenv("HOST", "0.0.0.0")
    default_port = int(os.getenv("FAST_API_PORT", "7860"))

    parser = argparse.ArgumentParser(description="Daily Storyteller FastAPI server")
    parser.add_argument("--host", type=str, default=default_host, help="Host address")
    parser.add_argument("--port", type=int, default=default_port, help="Port number")
    parser.add_argument("--reload", action="store_true", help="Reload code on change")

    config = parser.parse_args()

    uvicorn.run(
        "server:app",
        host=config.host,
        port=config.port,
        reload=config.reload,
    )
|
||||
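`server.py` tracks each bot as a `(process, room_url)` pair keyed by pid, and treats a bot as running while `poll()` returns `None`. The sketch below isolates that bookkeeping. It also shows one possible alternative to the `shell=True` launch used in the example: building the command as an argument list, so that the room URL and token are never interpreted by a shell. The helper names and the use of `sys.executable` are assumptions made for illustration, not what the example does.

```python
import subprocess
import sys
from typing import Dict, List, Tuple

MAX_BOTS_PER_ROOM = 1

# pid -> (process, room_url), mirroring the bot_procs dict in server.py
BotTable = Dict[int, Tuple[subprocess.Popen, str]]


def build_bot_command(room_url: str, token: str) -> List[str]:
    """Build the bot command as an argument list (hypothetical helper).

    Passing a list to subprocess.Popen without shell=True avoids shell
    parsing of the URL and token.
    """
    return [sys.executable, "-m", "bot", "-u", room_url, "-t", token]


def count_running_bots(bot_procs: BotTable, room_url: str) -> int:
    """Count bots in room_url whose process has not exited yet."""
    return sum(
        1 for proc, url in bot_procs.values() if url == room_url and proc.poll() is None
    )


def can_start_bot(bot_procs: BotTable, room_url: str) -> bool:
    """Return True while the room is below the per-room bot limit."""
    return count_running_bots(bot_procs, room_url) < MAX_BOTS_PER_ROOM
```

A caller would check `can_start_bot(...)` before `subprocess.Popen(build_bot_command(url, token), cwd=...)` and then record the new process in the table under `proc.pid`.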
@@ -14,6 +14,7 @@ from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_response import LLMUserAggregatorParams
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.services.groq.llm import GroqLLMService
|
||||
from pipecat.services.groq.stt import GroqSTTService
|
||||
@@ -39,7 +40,9 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespac
|
||||
|
||||
stt = GroqSTTService(api_key=os.getenv("GROQ_API_KEY"))
|
||||
|
||||
llm = GroqLLMService(api_key=os.getenv("GROQ_API_KEY"), model="llama-3.3-70b-versatile")
|
||||
llm = GroqLLMService(
|
||||
api_key=os.getenv("GROQ_API_KEY"), model="meta-llama/llama-4-maverick-17b-128e-instruct"
|
||||
)
|
||||
|
||||
tts = GroqTTSService(api_key=os.getenv("GROQ_API_KEY"))
|
||||
|
||||
@@ -51,7 +54,9 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespac
|
||||
]
|
||||
|
||||
context = OpenAILLMContext(messages)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
context_aggregator = llm.create_context_aggregator(
|
||||
context, user_params=LLMUserAggregatorParams(aggregation_timeout=0.05)
|
||||
)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
|
||||
@@ -16,8 +16,12 @@ from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.services.nim.llm import NimLLMService
|
||||
from pipecat.services.riva.stt import ParakeetSTTService
|
||||
from pipecat.services.riva.tts import FastPitchTTSService
|
||||
from pipecat.services.riva.stt import (
|
||||
ParakeetSTTService,
|
||||
RivaSegmentedSTTService,
|
||||
RivaSTTService,
|
||||
)
|
||||
from pipecat.services.riva.tts import FastPitchTTSService, RivaTTSService
|
||||
from pipecat.transports.base_transport import TransportParams
|
||||
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
|
||||
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
|
||||
@@ -37,11 +41,11 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespac
|
||||
),
|
||||
)
|
||||
|
||||
stt = ParakeetSTTService(api_key=os.getenv("NVIDIA_API_KEY"))
|
||||
stt = RivaSTTService(api_key=os.getenv("NVIDIA_API_KEY"))
|
||||
|
||||
llm = NimLLMService(api_key=os.getenv("NVIDIA_API_KEY"), model="meta/llama-3.1-405b-instruct")
|
||||
|
||||
tts = FastPitchTTSService(api_key=os.getenv("NVIDIA_API_KEY"))
|
||||
tts = RivaTTSService(api_key=os.getenv("NVIDIA_API_KEY"))
|
||||
|
||||
messages = [
|
||||
{
|
||||
|
||||
@@ -17,6 +17,7 @@ from pipecat.frames.frames import TTSSpeakFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_response import LLMUserAggregatorParams
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.services.groq.llm import GroqLLMService
|
||||
@@ -53,7 +54,9 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespac
|
||||
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
|
||||
)
|
||||
|
||||
llm = GroqLLMService(api_key=os.getenv("GROQ_API_KEY"), model="llama-3.3-70b-versatile")
|
||||
llm = GroqLLMService(
|
||||
api_key=os.getenv("GROQ_API_KEY"), model="meta-llama/llama-4-maverick-17b-128e-instruct"
|
||||
)
|
||||
# You can also register a function_name of None to get all functions
|
||||
# sent to the same callback with an additional function_name parameter.
|
||||
llm.register_function("get_current_weather", fetch_weather_from_api)
|
||||
@@ -83,7 +86,9 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespac
|
||||
]
|
||||
|
||||
context = OpenAILLMContext(messages, tools)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
context_aggregator = llm.create_context_aggregator(
|
||||
context, user_params=LLMUserAggregatorParams(aggregation_timeout=0.05)
|
||||
)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
|
||||
@@ -12,10 +12,12 @@ from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.audio.vad.vad_analyzer import VADParams
|
||||
from pipecat.frames.frames import TranscriptionMessage
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.transcript_processor import TranscriptProcessor
|
||||
from pipecat.services.gemini_multimodal_live.gemini import GeminiMultimodalLiveLLMService
|
||||
from pipecat.transports.base_transport import TransportParams
|
||||
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
|
||||
@@ -69,12 +71,16 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespac
|
||||
)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
transcript = TranscriptProcessor()
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(),
|
||||
context_aggregator.user(),
|
||||
transcript.user(),
|
||||
llm,
|
||||
transport.output(),
|
||||
transcript.assistant(),
|
||||
context_aggregator.assistant(),
|
||||
]
|
||||
)
|
||||
@@ -103,6 +109,15 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespac
|
||||
logger.info(f"Client closed connection")
|
||||
await task.cancel()
|
||||
|
||||
# Register event handler for transcript updates
|
||||
@transcript.event_handler("on_transcript_update")
|
||||
async def on_transcript_update(processor, frame):
|
||||
for msg in frame.messages:
|
||||
if isinstance(msg, TranscriptionMessage):
|
||||
timestamp = f"[{msg.timestamp}] " if msg.timestamp else ""
|
||||
line = f"{timestamp}{msg.role}: {msg.content}"
|
||||
logger.info(f"Transcript: {line}")
|
||||
|
||||
runner = PipelineRunner(handle_sigint=False)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
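The `on_transcript_update` handler added in this hunk prefixes each message with its timestamp when one is present. A minimal sketch of that formatting rule, using a hypothetical `Message` dataclass as a stand-in for pipecat's `TranscriptionMessage` (only the three fields the handler reads are modelled):

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional


@dataclass
class Message:
    """Hypothetical stand-in for TranscriptionMessage (role, content, timestamp)."""

    role: str
    content: str
    timestamp: Optional[str] = None


def format_transcript(messages: Iterable[Message]) -> List[str]:
    """Render each message as '[timestamp] role: content', omitting a missing timestamp."""
    lines = []
    for msg in messages:
        prefix = f"[{msg.timestamp}] " if msg.timestamp else ""
        lines.append(f"{prefix}{msg.role}: {msg.content}")
    return lines
```

Keeping the formatting in a plain function like this makes it possible to test the output without running a pipeline.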
@@ -36,6 +36,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespac
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
video_out_enabled=True,
|
||||
video_out_is_live=True,
|
||||
video_out_width=512,
|
||||
video_out_height=512,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
|
||||
128
examples/foundational/38b-smart-turn-local.py
Normal file
@@ -0,0 +1,128 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import argparse
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
|
||||
from pipecat.audio.turn.smart_turn.local_smart_turn import LocalSmartTurnAnalyzer
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.audio.vad.vad_analyzer import VADParams
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.transports.base_transport import TransportParams
|
||||
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
|
||||
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespace):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
# To use this locally, set the environment variable LOCAL_SMART_TURN_MODEL_PATH
|
||||
# to the path where the smart-turn repo is cloned.
|
||||
#
|
||||
# Example setup:
|
||||
#
|
||||
# # Git LFS (Large File Storage)
|
||||
# brew install git-lfs
|
||||
# # Hugging Face uses LFS to store large model files, including .mlpackage
|
||||
# git lfs install
|
||||
# # Clone the repo with the smart_turn_classifier.mlpackage
|
||||
# git clone https://huggingface.co/pipecat-ai/smart-turn
|
||||
#
|
||||
# Then set the env variable:
|
||||
# export LOCAL_SMART_TURN_MODEL_PATH=./smart-turn
|
||||
# or add it to your .env file
|
||||
smart_turn_model_path = os.getenv("LOCAL_SMART_TURN_MODEL_PATH")
|
||||
|
||||
transport = SmallWebRTCTransport(
|
||||
webrtc_connection=webrtc_connection,
|
||||
params=TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||||
turn_analyzer=LocalSmartTurnAnalyzer(
|
||||
smart_turn_model_path=smart_turn_model_path, params=SmartTurnParams()
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
]
|
||||
|
||||
context = OpenAILLMContext(messages)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
stt,
|
||||
context_aggregator.user(), # User responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
transport.output(), # Transport bot output
|
||||
context_aggregator.assistant(), # Assistant spoken responses
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
report_only_initial_ttfb=True,
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
|
||||
@transport.event_handler("on_client_closed")
|
||||
async def on_client_closed(transport, client):
|
||||
logger.info(f"Client closed connection")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=False)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from run import main
|
||||
|
||||
main()
|
||||
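The example reads `LOCAL_SMART_TURN_MODEL_PATH` from the environment, and `LocalSmartTurnAnalyzer` (added in this changeset) falls back to the Hugging Face identifier `pipecat-ai/smart-turn` when the path is empty. The sketch below combines those two steps in one place; `resolve_smart_turn_model_path` is a hypothetical helper, not a pipecat function.

```python
import os
from typing import Mapping, Optional

# Fallback identifier used by LocalSmartTurnAnalyzer when no path is given
DEFAULT_MODEL_ID = "pipecat-ai/smart-turn"


def resolve_smart_turn_model_path(env: Optional[Mapping[str, str]] = None) -> str:
    """Return the local model path if set and non-empty, else the default model id."""
    env = os.environ if env is None else env
    # An unset variable and an empty string are treated the same way
    return env.get("LOCAL_SMART_TURN_MODEL_PATH") or DEFAULT_MODEL_ID
```

With a local clone, setting `LOCAL_SMART_TURN_MODEL_PATH=./smart-turn` makes the function return that path; otherwise the default identifier is returned.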
@@ -53,4 +53,3 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
|
||||
token = await daily_rest_helper.get_token(url, expiry_time)
|
||||
|
||||
return (url, token)
|
||||
return (url, token)
|
||||
|
||||
@@ -1,2 +0,0 @@
|
||||
frontend/node_modules
|
||||
frontend/out
|
||||
@@ -1,4 +1,4 @@
|
||||
[](https://storytelling-chatbot.fly.dev)
|
||||
[](https://gemini-storybot.vercel.app/)
|
||||
|
||||
# Storytelling Chatbot
|
||||
|
||||
@@ -9,7 +9,6 @@ It periodically prompts the user for input for a 'choose your own adventure' sty
|
||||
|
||||
We use Gemini 2.0 for creating the story and image prompts, and we add visual elements to the story by generating images using Google's Imagen.
|
||||
|
||||
|
||||
---
|
||||
|
||||
### It uses the following AI services:
|
||||
@@ -20,7 +19,7 @@ Transcribes inbound participant voice media to text.
|
||||
|
||||
**Google Gemini 2.0 - LLM**
|
||||
|
||||
Our creative writer LLM. You can see the context used to prompt it [here](src/prompts.py)
|
||||
Our creative writer LLM. You can see the context used to prompt it [here](server/prompts.py)
|
||||
|
||||
**ElevenLabs - Text-to-Speech**
|
||||
|
||||
@@ -34,47 +33,76 @@ Adds pictures to our story. Prompting is quite key for style consistency, so we
|
||||
|
||||
## Setup
|
||||
|
||||
**Install requirements**
|
||||
### Client
|
||||
|
||||
```shell
|
||||
python3 -m venv venv
|
||||
source venv/bin/activate
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
1. Navigate to the client directory:
|
||||
|
||||
**Create environment file and set variables:**
|
||||
```shell
|
||||
cd client
|
||||
```
|
||||
|
||||
```shell
|
||||
mv env.example .env
|
||||
```
|
||||
2. Install dependencies:
|
||||
|
||||
When deploying to production, to ensure only this app can spawn a new bot, set your `ENV` to `production`
|
||||
```shell
|
||||
npm install
|
||||
```
|
||||
|
||||
**Build the frontend:**
|
||||
3. Build the client:
|
||||
|
||||
This project uses a custom frontend, which needs to be built. Note: this is done automatically as part of the Docker deployment.
|
||||
```shell
|
||||
npm run build
|
||||
```
|
||||
|
||||
```shell
|
||||
cd frontend/
|
||||
npm install
|
||||
npm run build
|
||||
```
|
||||
### Server
|
||||
|
||||
The built UI files can be found in `frontend/out`
|
||||
1. Navigate to the server directory
|
||||
|
||||
## Running it locally
|
||||
```shell
|
||||
cd ../server
|
||||
```
|
||||
|
||||
Start the API / bot manager:
|
||||
2. Set up your virtual environment and install requirements
|
||||
|
||||
`python src/bot_runner.py --host localhost`
|
||||
```shell
|
||||
python3 -m venv venv
|
||||
source venv/bin/activate
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
If you'd like to run a custom domain or port:
|
||||
3. Create environment file and set variables
|
||||
|
||||
`python src/bot_runner.py --host somehost --p someport`
|
||||
```shell
|
||||
mv env.example .env
|
||||
```
|
||||
|
||||
➡️ Open the host URL in your browser `http://localhost:7860`
|
||||
You'll need API keys for:
|
||||
|
||||
If you've run previous versions of the demo, make sure to set `ENV=dev`, and remove the `RUN_AS_VM` line from the .env file.
|
||||
- DAILY_API_KEY
|
||||
- ELEVENLABS_API_KEY
|
||||
- ELEVENLABS_VOICE_ID
|
||||
- GOOGLE_API_KEY
|
||||
|
||||
4. (Optional) Deployment:
|
||||
|
||||
When deploying to production, to ensure only this app can spawn new bot processes, set your `ENV` to `production`
|
||||
|
||||
## Run it locally
|
||||
|
||||
1. Navigate back to the demo's root directory:
|
||||
|
||||
```shell
|
||||
cd ..
|
||||
```
|
||||
|
||||
2. Run the application:
|
||||
|
||||
```shell
|
||||
python server/bot_runner.py --host localhost
|
||||
```
|
||||
|
||||
You can run with a custom domain or port using: `python server/bot_runner.py --host somehost --p someport`
|
||||
|
||||
3. ➡️ Open the host URL in your browser: http://localhost:7860
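
The `--p someport` form shown above is not a separate flag. If `bot_runner.py` parses its options with `argparse` and defines `--host` and `--port` (an assumption: the Dockerfile in this changeset passes `--port`, but the parser itself is not shown here), then `--p` is accepted as an unambiguous prefix of `--port`. A minimal sketch of that behaviour:

```python
import argparse
from typing import List


def build_parser() -> argparse.ArgumentParser:
    """Parser with the two options assumed for bot_runner.py."""
    parser = argparse.ArgumentParser(description="Prefix-matching sketch")
    parser.add_argument("--host", type=str, default="localhost")
    parser.add_argument("--port", type=int, default=7860)
    return parser


def parse(argv: List[str]) -> argparse.Namespace:
    # argparse accepts any unambiguous prefix of a long option by default
    return build_parser().parse_args(argv)
```

Spelling out `--port` is the safer choice, since a prefix stops working as soon as another option starting with the same letters is added.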
|
||||
|
||||
---
|
||||
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
{
|
||||
"name": "frontend",
|
||||
"name": "client",
|
||||
"version": "0.1.0",
|
||||
"lockfileVersion": 3,
|
||||
"requires": true,
|
||||
"packages": {
|
||||
"": {
|
||||
"name": "frontend",
|
||||
"name": "client",
|
||||
"version": "0.1.0",
|
||||
"dependencies": {
|
||||
"@daily-co/daily-js": "^0.62.0",
|
||||
@@ -1,5 +1,5 @@
|
||||
{
|
||||
"name": "frontend",
|
||||
"name": "client",
|
||||
"version": "0.1.0",
|
||||
"private": true,
|
||||
"scripts": {
|
||||
2
examples/storytelling-chatbot/server/.dockerignore
Normal file
@@ -0,0 +1,2 @@
|
||||
client/node_modules
|
||||
client/out
|
||||
@@ -44,11 +44,11 @@ COPY ./requirements.txt requirements.txt
|
||||
RUN pip3 install --no-cache-dir --upgrade -r requirements.txt
|
||||
|
||||
# Copy everything else
|
||||
COPY --chown=user ./src/ src/
|
||||
COPY --chown=user ./server/ server/
|
||||
|
||||
# Copy frontend app and build
|
||||
COPY --chown=user ./frontend/ frontend/
|
||||
RUN cd frontend && npm install && npm run build
|
||||
# Copy client app and build
|
||||
COPY --chown=user ./client/ client/
|
||||
RUN cd client && npm install && npm run build
|
||||
|
||||
# Start the FastAPI server
|
||||
CMD python3 src/bot_runner.py --port ${FAST_API_PORT}
|
||||
CMD python3 server/bot_runner.py --port ${FAST_API_PORT}
|
||||
@@ -57,7 +57,7 @@ app.add_middleware(
|
||||
)
|
||||
|
||||
# Mount the static directory
|
||||
STATIC_DIR = "frontend/out"
|
||||
STATIC_DIR = "client/out"
|
||||
|
||||
|
||||
# ------------ Fast API Routes ------------ #
|
||||
@@ -175,7 +175,7 @@ async def virtualize_bot(room_url: str, token: str):
|
||||
image = data[0]["config"]["image"]
|
||||
|
||||
# Machine configuration
|
||||
cmd = f"python src/bot.py -u {room_url} -t {token}"
|
||||
cmd = f"python server/bot.py -u {room_url} -t {token}"
|
||||
cmd = cmd.split()
|
||||
worker_props = {
|
||||
"config": {
|
||||
@@ -47,7 +47,7 @@ canonical = [ "aiofiles~=24.1.0" ]
|
||||
cartesia = [ "cartesia~=1.4.0", "websockets~=13.1" ]
|
||||
cerebras = []
|
||||
deepseek = []
|
||||
daily = [ "daily-python~=0.17.0" ]
|
||||
daily = [ "daily-python~=0.18.1" ]
|
||||
deepgram = [ "deepgram-sdk~=3.8.0" ]
|
||||
elevenlabs = [ "websockets~=13.1" ]
|
||||
fal = [ "fal-client~=0.5.9" ]
|
||||
@@ -56,7 +56,7 @@ fish = [ "ormsgpack~=1.7.0", "websockets~=13.1" ]
|
||||
gladia = [ "websockets~=13.1" ]
|
||||
google = [ "google-cloud-speech~=2.31.1", "google-cloud-texttospeech~=2.25.1", "google-genai~=1.7.0", "google-generativeai~=0.8.4", "websockets~=13.1" ]
|
||||
grok = []
|
||||
groq = [ "groq~=0.20.0" ]
|
||||
groq = [ "groq~=0.23.0" ]
|
||||
gstreamer = [ "pygobject~=3.50.0" ]
|
||||
krisp = [ "pipecat-ai-krisp~=0.3.0" ]
|
||||
koala = [ "pvkoala~=2.0.3" ]
|
||||
@@ -78,7 +78,7 @@ perplexity = []
|
||||
playht = [ "pyht~=0.1.12", "websockets~=13.1" ]
|
||||
qwen = []
|
||||
rime = [ "websockets~=13.1" ]
|
||||
riva = [ "nvidia-riva-client~=2.19.0" ]
|
||||
riva = [ "nvidia-riva-client~=2.19.1" ]
|
||||
sentry = [ "sentry-sdk~=2.23.1" ]
|
||||
local-smart-turn = [ "coremltools>=8.0", "transformers", "torch==2.5.0", "torchaudio==2.5.0" ]
|
||||
remote-smart-turn = []
|
||||
|
||||
73
src/pipecat/audio/turn/smart_turn/local_smart_turn.py
Normal file
@@ -0,0 +1,73 @@
|
||||
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#


from typing import Any, Dict

import numpy as np
from loguru import logger

from pipecat.audio.turn.smart_turn.base_smart_turn import BaseSmartTurn

try:
    import torch
    from transformers import AutoFeatureExtractor, Wav2Vec2BertForSequenceClassification
except ModuleNotFoundError as e:
    logger.error(f"Exception: {e}")
    logger.error(
        "In order to use the LocalSmartTurnAnalyzer, you need to `pip install pipecat-ai[local-smart-turn]`."
    )
    raise Exception(f"Missing module: {e}")


class LocalSmartTurnAnalyzer(BaseSmartTurn):
    def __init__(self, *, smart_turn_model_path: str, **kwargs):
        super().__init__(**kwargs)

        if not smart_turn_model_path:
            # Define the path to the pretrained model on Hugging Face
            smart_turn_model_path = "pipecat-ai/smart-turn"

        logger.debug("Loading Local Smart Turn model...")
        # Load the pretrained model for sequence classification
        self._turn_model = Wav2Vec2BertForSequenceClassification.from_pretrained(
            smart_turn_model_path
        )
        # Load the corresponding feature extractor for preprocessing audio
        self._turn_processor = AutoFeatureExtractor.from_pretrained(smart_turn_model_path)
        # Set device to GPU if available, else CPU
        self._device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # Move model to selected device and set it to evaluation mode
        self._turn_model = self._turn_model.to(self._device)
        self._turn_model.eval()
        logger.debug("Loaded Local Smart Turn")

    async def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
        inputs = self._turn_processor(
            audio_array,
            sampling_rate=16000,
            padding="max_length",
            truncation=True,
            max_length=800,  # Maximum length as specified in training
            return_attention_mask=True,
            return_tensors="pt",
        )

        # Move input tensors to the same device as the model
        inputs = {k: v.to(self._device) for k, v in inputs.items()}

        # Disable gradient calculation for inference
        with torch.no_grad():
            outputs = self._turn_model(**inputs)
            logits = outputs.logits
            probabilities = torch.nn.functional.softmax(logits, dim=1)
            completion_prob = probabilities[0, 1].item()  # Probability of class 1 (Complete)
            prediction = 1 if completion_prob > 0.5 else 0

        return {
            "prediction": prediction,
            "probability": completion_prob,
        }
|
||||
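The post-processing at the end of `_predict_endpoint` (softmax over the logits, then a 0.5 cut on the class-1 probability) can be sketched without torch. This is a minimal illustration only: `softmax` and `classify_endpoint` are hypothetical helper names, and the logits are invented values rather than model output.

```python
import math
from typing import Dict, List, Union


def softmax(logits: List[float]) -> List[float]:
    """Numerically stable softmax over a flat list of logits."""
    peak = max(logits)
    exps = [math.exp(x - peak) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


def classify_endpoint(
    logits: List[float], threshold: float = 0.5
) -> Dict[str, Union[int, float]]:
    """Hypothetical helper: class 1 is treated as "Complete", as in the diff."""
    completion_prob = softmax(logits)[1]
    return {
        # Strictly greater than the threshold, matching `completion_prob > 0.5`.
        "prediction": 1 if completion_prob > threshold else 0,
        "probability": completion_prob,
    }


result = classify_endpoint([0.0, 2.0])  # made-up logits for illustration
tie = classify_endpoint([1.0, 1.0])     # equal logits give probability 0.5
```

Because the comparison is strict, a probability of exactly 0.5 is reported as incomplete (`prediction` 0).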
@@ -60,12 +60,16 @@ class Frame:
|
||||
name: str = field(init=False)
|
||||
pts: Optional[int] = field(init=False)
|
||||
metadata: Dict[str, Any] = field(init=False)
|
||||
transport_source: Optional[str] = field(init=False)
|
||||
transport_destination: Optional[str] = field(init=False)
|
||||
|
||||
def __post_init__(self):
|
||||
self.id: int = obj_id()
|
||||
self.name: str = f"{self.__class__.__name__}#{obj_count(self)}"
|
||||
self.pts: Optional[int] = None
|
||||
self.metadata: Dict[str, Any] = {}
|
||||
self.transport_source: Optional[str] = None
|
||||
self.transport_destination: Optional[str] = None
|
||||
|
||||
def __str__(self):
|
||||
return self.name
|
||||
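The new `transport_source` and `transport_destination` fields follow the same pattern as `pts` and `metadata`: they are declared with `field(init=False)` and assigned in `__post_init__`, so they are not constructor arguments and are set on the instance afterwards. A minimal sketch of that pattern, using hypothetical `SketchFrame` classes rather than the real pipecat types:

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class SketchFrame:
    # Hypothetical stand-in for Frame: none of these are constructor arguments.
    metadata: Dict[str, Any] = field(init=False)
    transport_source: Optional[str] = field(init=False)
    transport_destination: Optional[str] = field(init=False)

    def __post_init__(self):
        self.metadata = {}
        self.transport_source = None
        self.transport_destination = None


@dataclass
class SketchAudioFrame(SketchFrame):
    audio: bytes = b""

    def __post_init__(self):
        super().__post_init__()

    def __str__(self):
        return f"SketchAudioFrame(source: {self.transport_source}, size: {len(self.audio)})"


frame = SketchAudioFrame(audio=b"\x00\x01")
frame.transport_source = "mic-2"  # set after construction, e.g. by a transport

try:
    SketchAudioFrame(audio=b"", transport_source="mic-2")
    rejected = False
except TypeError:
    # init=False fields are not accepted by the generated __init__.
    rejected = True
```

One consequence of this design is that subclasses can add fields with defaults freely, since the base fields never take part in the generated `__init__` signature.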
@@ -136,8 +140,9 @@ class ImageRawFrame:
|
||||
|
||||
@dataclass
|
||||
class OutputAudioRawFrame(DataFrame, AudioRawFrame):
|
||||
"""A chunk of audio. Will be played by the output transport if the
|
||||
transport's microphone has been enabled.
|
||||
"""A chunk of audio. Will be played by the output transport. If the
|
||||
transport supports multiple audio destinations (e.g. multiple audio tracks) the
|
||||
destination name can be specified.
|
||||
|
||||
"""
|
||||
|
||||
@@ -147,13 +152,14 @@ class OutputAudioRawFrame(DataFrame, AudioRawFrame):
|
||||
|
||||
def __str__(self):
|
||||
pts = format_pts(self.pts)
|
||||
return f"{self.name}(pts: {pts}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
|
||||
return f"{self.name}(pts: {pts}, destination: {self.transport_destination}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
|
||||
|
||||
|
||||
@dataclass
|
||||
class OutputImageRawFrame(DataFrame, ImageRawFrame):
|
||||
"""An image that will be shown by the transport if the transport's camera is
|
||||
enabled.
|
||||
"""An image that will be shown by the transport. If the transport supports
|
||||
multiple video destinations (e.g. multiple video tracks) the destination
|
||||
name can be specified.
|
||||
|
||||
"""
|
||||
|
||||
@@ -176,7 +182,7 @@ class URLImageRawFrame(OutputImageRawFrame):
|
||||
|
||||
"""
|
||||
|
||||
url: Optional[str]
|
||||
url: Optional[str] = None
|
||||
|
||||
def __str__(self):
|
||||
pts = format_pts(self.pts)
|
||||
@@ -716,7 +722,11 @@ class UserImageRequestFrame(SystemFrame):
|
||||
|
||||
@dataclass
|
||||
class InputAudioRawFrame(SystemFrame, AudioRawFrame):
|
||||
"""A chunk of audio usually coming from an input transport."""
|
||||
"""A chunk of audio usually coming from an input transport. If the transport
|
||||
supports multiple audio sources (e.g. multiple audio tracks) the source name
|
||||
will be specified.
|
||||
|
||||
"""
|
||||
|
||||
def __post_init__(self):
|
||||
super().__post_init__()
|
||||
@@ -724,35 +734,50 @@ class InputAudioRawFrame(SystemFrame, AudioRawFrame):
|
||||
|
||||
def __str__(self):
|
||||
pts = format_pts(self.pts)
|
||||
return f"{self.name}(pts: {pts}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
|
||||
return f"{self.name}(pts: {pts}, source: {self.transport_source}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
|
||||
|
||||
|
||||
@dataclass
|
||||
class InputImageRawFrame(SystemFrame, ImageRawFrame):
|
||||
"""An image usually coming from an input transport."""
|
||||
"""An image usually coming from an input transport. If the transport
|
||||
supports multiple video sources (e.g. multiple video tracks) the source name
|
||||
will be specified.
|
||||
|
||||
"""
|
||||
|
||||
def __str__(self):
|
||||
pts = format_pts(self.pts)
|
||||
return f"{self.name}(pts: {pts}, size: {self.size}, format: {self.format})"
|
||||
return f"{self.name}(pts: {pts}, source: {self.transport_source}, size: {self.size}, format: {self.format})"
|
||||
|
||||
|
||||
@dataclass
|
||||
class UserAudioRawFrame(InputAudioRawFrame):
|
||||
"""A chunk of audio, usually coming from an input transport, associated to a user."""
|
||||
|
||||
user_id: str = ""
|
||||
|
||||
def __str__(self):
|
||||
pts = format_pts(self.pts)
|
||||
return f"{self.name}(pts: {pts}, user: {self.user_id}, source: {self.transport_source}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
|
||||
|
||||
|
||||
@dataclass
|
||||
class UserImageRawFrame(InputImageRawFrame):
|
||||
"""An image associated to a user."""
|
||||
|
||||
user_id: str
|
||||
user_id: str = ""
|
||||
request: Optional[UserImageRequestFrame] = None
|
||||
|
||||
def __str__(self):
|
||||
pts = format_pts(self.pts)
|
||||
return f"{self.name}(pts: {pts}, user: {self.user_id}, size: {self.size}, format: {self.format}, request: {self.request})"
|
||||
return f"{self.name}(pts: {pts}, user: {self.user_id}, source: {self.transport_source}, size: {self.size}, format: {self.format}, request: {self.request})"
|
||||
|
||||
|
||||
@dataclass
|
||||
class VisionImageRawFrame(InputImageRawFrame):
|
||||
"""An image with an associated text to ask for a description of it."""
|
||||
|
||||
text: Optional[str]
|
||||
text: Optional[str] = None
|
||||
|
||||
def __str__(self):
|
||||
pts = format_pts(self.pts)
|
||||
|
||||
@@ -227,10 +227,8 @@ class GeminiMultimodalLiveAssistantContextAggregator(OpenAIAssistantContextAggre
|
||||
# but the GeminiMultimodalLiveAssistantContextAggregator pushes LLMTextFrames and TTSTextFrames. We
|
||||
# need to override this process_frame for LLMTextFrame, so that only the TTSTextFrames
|
||||
# are processed. This ensures that the context gets only one set of messages.
|
||||
# GeminiMultimodalLiveLLMService also pushes TranscriptionFrames, so we need to
|
||||
# ignore pushing those as well, as they're also TextFrames.
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
if not isinstance(frame, (LLMTextFrame, TranscriptionFrame)):
|
||||
if not isinstance(frame, LLMTextFrame):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
async def handle_user_image_frame(self, frame: UserImageRawFrame):
|
||||
@@ -354,6 +352,7 @@ class GeminiMultimodalLiveLLMService(LLMService):
|
||||
self._bot_is_speaking = False
|
||||
self._user_audio_buffer = bytearray()
|
||||
self._bot_audio_buffer = bytearray()
|
||||
self._bot_text_buffer = ""
|
||||
|
||||
self._sample_rate = 24000
|
||||
|
||||
@@ -464,9 +463,9 @@ class GeminiMultimodalLiveLLMService(LLMService):
|
||||
# Sometimes the transcription contains newlines; we want to remove them.
|
||||
cleaned_text = text.rstrip("\n")
|
||||
logger.debug(f"[Transcription:user] {cleaned_text}")
|
||||
context.add_message({"role": "user", "content": [{"type": "text", "text": cleaned_text}]})
|
||||
await self.push_frame(
|
||||
TranscriptionFrame(text=cleaned_text, user_id="user", timestamp=time_now_iso8601())
|
||||
TranscriptionFrame(text=cleaned_text, user_id="user", timestamp=time_now_iso8601()),
|
||||
FrameDirection.UPSTREAM,
|
||||
)
|
||||
|
||||
async def _transcribe_audio(self, audio, context):
|
||||
@@ -852,6 +851,15 @@ class GeminiMultimodalLiveLLMService(LLMService):
|
||||
if not part:
|
||||
return
|
||||
|
||||
# part.text is added when `modalities` is set to TEXT; otherwise, it's None
|
||||
text = part.text
|
||||
if text:
|
||||
if not self._bot_text_buffer:
|
||||
await self.push_frame(LLMFullResponseStartFrame())
|
||||
|
||||
self._bot_text_buffer += text
|
||||
await self.push_frame(LLMTextFrame(text=text))
|
||||
|
||||
inline_data = part.inlineData
|
||||
if not inline_data:
|
||||
return
|
||||
@@ -892,13 +900,24 @@ class GeminiMultimodalLiveLLMService(LLMService):
|
||||
|
||||
async def _handle_evt_turn_complete(self, evt):
|
||||
self._bot_is_speaking = False
|
||||
await self.push_frame(TTSStoppedFrame())
|
||||
text = self._bot_text_buffer
|
||||
self._bot_text_buffer = ""
|
||||
|
||||
# Only push the TTSStoppedFrame if the bot is outputting audio.
|
||||
# When text is found, modalities is set to TEXT and no audio
|
||||
# is produced.
|
||||
if not text:
|
||||
await self.push_frame(TTSStoppedFrame())
|
||||
|
||||
await self.push_frame(LLMFullResponseEndFrame())
|
||||
|
||||
async def _handle_evt_output_transcription(self, evt):
|
||||
if not evt.serverContent.outputTranscription:
|
||||
return
|
||||
|
||||
# This is the output transcription text when modalities is set to AUDIO.
|
||||
# In this case, we push LLMTextFrame and TTSTextFrame to be handled by the
|
||||
# downstream assistant context aggregator.
|
||||
text = evt.serverContent.outputTranscription.text
|
||||
|
||||
if not text:
|
||||
|
||||
@@ -12,9 +12,11 @@ from loguru import logger
|
||||
from pipecat.frames.frames import (
|
||||
Frame,
|
||||
FunctionCallResultFrame,
|
||||
InterimTranscriptionFrame,
|
||||
LLMMessagesUpdateFrame,
|
||||
LLMSetToolsFrame,
|
||||
LLMTextFrame,
|
||||
TranscriptionFrame,
|
||||
)
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
@@ -137,15 +139,6 @@ class OpenAIRealtimeLLMContext(OpenAILLMContext):
|
||||
}
|
||||
self.add_message(message)
|
||||
|
||||
def add_assistant_content_item_as_message(self, item):
|
||||
message = {"role": "assistant", "content": []}
|
||||
for content in item.content:
|
||||
if content.type == "audio":
|
||||
message["content"].append({"type": "text", "text": content.transcript})
|
||||
else:
|
||||
logger.error(f"Unhandled content type in assistant item: {content.type} - {item}")
|
||||
self.add_message(message)
|
||||
|
||||
|
||||
class OpenAIRealtimeUserContextAggregator(OpenAIUserContextAggregator):
|
||||
async def process_frame(
|
||||
@@ -175,8 +168,10 @@ class OpenAIRealtimeAssistantContextAggregator(OpenAIAssistantContextAggregator)
|
||||
# but the OpenAIRealtimeLLMService pushes LLMTextFrames and TTSTextFrames. We
|
||||
# need to override this process_frame for LLMTextFrame, so that only the TTSTextFrames
|
||||
# are processed. This ensures that the context gets only one set of messages.
|
||||
# OpenAIRealtimeLLMService also pushes TranscriptionFrames and InterimTranscriptionFrames,
|
||||
# so we need to ignore pushing those as well, as they're also TextFrames.
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
if not isinstance(frame, LLMTextFrame):
|
||||
if not isinstance(frame, (LLMTextFrame, TranscriptionFrame, InterimTranscriptionFrame)):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
async def handle_function_call_result(self, frame: FunctionCallResultFrame):
|
||||
|
||||
@@ -562,13 +562,11 @@ class OpenAIRealtimeBetaLLMService(LLMService):
|
||||
await self.push_error(ErrorFrame(error=f"Error: {evt}", fatal=True))
|
||||
|
||||
async def _handle_assistant_output(self, output):
|
||||
# logger.debug(f"!!! HANDLE Assistant output: {output}")
|
||||
# We haven't seen intermixed audio and function_call items in the same response. But let's
|
||||
# try to write logic that handles that, if it does happen.
|
||||
messages = [item for item in output if item.type == "message"]
|
||||
# Also, the assistant output is pushed as LLMTextFrame and TTSTextFrame to be handled by
|
||||
# the assistant context aggregator.
|
||||
function_calls = [item for item in output if item.type == "function_call"]
|
||||
for item in messages:
|
||||
self._context.add_assistant_content_item_as_message(item)
|
||||
await self._handle_function_call_items(function_calls)
|
||||
|
||||
async def _handle_function_call_items(self, items):
|
||||
|
||||
@@ -68,6 +68,8 @@ class RimeTTSService(AudioContextWordTTSService):
|
||||
language: Optional[Language] = Language.EN
|
||||
speed_alpha: Optional[float] = 1.0
|
||||
reduce_latency: Optional[bool] = False
|
||||
pause_between_brackets: Optional[bool] = False
|
||||
phonemize_between_brackets: Optional[bool] = False
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -117,6 +119,8 @@ class RimeTTSService(AudioContextWordTTSService):
|
||||
else "eng",
|
||||
"speedAlpha": params.speed_alpha,
|
||||
"reduceLatency": params.reduce_latency,
|
||||
"pauseBetweenBrackets": json.dumps(params.pause_between_brackets),
|
||||
"phonemizeBetweenBrackets": json.dumps(params.phonemize_between_brackets),
|
||||
}
|
||||
|
||||
# State tracking
|
||||
|
||||
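The two new Rime settings are passed through `json.dumps` rather than sent as raw Python booleans. A short sketch of what that changes; the assumption here (not confirmed by the diff) is that the values end up as text, for example in a query string, where lowercase JSON-style booleans are expected:

```python
import json

pause_between_brackets = False
phonemize_between_brackets = True

# json.dumps yields JSON literals; str() yields Python's capitalised names.
settings = {
    "pauseBetweenBrackets": json.dumps(pause_between_brackets),
    "phonemizeBetweenBrackets": json.dumps(phonemize_between_brackets),
}
python_style = str(pause_between_brackets)
```

Here `settings` holds the strings `"false"` and `"true"`, whereas `python_style` is `"False"`.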
@@ -5,7 +5,7 @@
|
||||
#
|
||||
|
||||
import asyncio
|
||||
from typing import AsyncGenerator, Optional
|
||||
from typing import AsyncGenerator, List, Mapping, Optional
|
||||
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel
|
||||
@@ -13,12 +13,13 @@ from pydantic import BaseModel
|
||||
from pipecat.frames.frames import (
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
InterimTranscriptionFrame,
|
||||
StartFrame,
|
||||
TranscriptionFrame,
|
||||
)
|
||||
from pipecat.services.stt_service import STTService
|
||||
from pipecat.services.stt_service import SegmentedSTTService, STTService
|
||||
from pipecat.transcriptions.language import Language
|
||||
from pipecat.utils.time import time_now_iso8601
|
||||
|
||||
@@ -31,7 +32,59 @@ except ModuleNotFoundError as e:
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
class ParakeetSTTService(STTService):
|
||||
def language_to_riva_language(language: Language) -> Optional[str]:
|
||||
"""Maps Language enum to Riva ASR language codes.
|
||||
|
||||
Source:
|
||||
https://docs.nvidia.com/deeplearning/riva/user-guide/docs/asr/asr-riva-build-table.html?highlight=fr%20fr
|
||||
|
||||
Args:
|
||||
language: Language enum value.
|
||||
|
||||
Returns:
|
||||
Optional[str]: Riva language code or None if not supported.
|
||||
"""
|
||||
language_map = {
|
||||
# Arabic
|
||||
Language.AR: "ar-AR",
|
||||
# English
|
||||
Language.EN: "en-US", # Default to US
|
||||
Language.EN_US: "en-US",
|
||||
Language.EN_GB: "en-GB",
|
||||
# French
|
||||
Language.FR: "fr-FR",
|
||||
Language.FR_FR: "fr-FR",
|
||||
# German
|
||||
Language.DE: "de-DE",
|
||||
Language.DE_DE: "de-DE",
|
||||
# Hindi
|
||||
Language.HI: "hi-IN",
|
||||
Language.HI_IN: "hi-IN",
|
||||
# Italian
|
||||
Language.IT: "it-IT",
|
||||
Language.IT_IT: "it-IT",
|
||||
# Japanese
|
||||
Language.JA: "ja-JP",
|
||||
Language.JA_JP: "ja-JP",
|
||||
# Korean
|
||||
Language.KO: "ko-KR",
|
||||
Language.KO_KR: "ko-KR",
|
||||
# Portuguese
|
||||
Language.PT: "pt-BR", # Default to Brazilian
|
||||
Language.PT_BR: "pt-BR",
|
||||
# Russian
|
||||
Language.RU: "ru-RU",
|
||||
Language.RU_RU: "ru-RU",
|
||||
# Spanish
|
||||
Language.ES: "es-ES", # Default to Spain
|
||||
Language.ES_ES: "es-ES",
|
||||
Language.ES_US: "es-US", # US Spanish
|
||||
}
|
||||
|
||||
return language_map.get(language)
|
||||
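`language_to_riva_language` returns `None` for any language missing from the map, and `RivaSegmentedSTTService` later falls back with `or "en-US"`. The lookup-plus-fallback behaviour can be sketched as follows; `SketchLanguage`, `to_riva_code` and `resolve_code` are hypothetical names standing in for the pipecat `Language` enum and the functions in the diff:

```python
from enum import Enum
from typing import Optional


class SketchLanguage(Enum):
    EN = "en"
    EN_GB = "en-GB"
    PT = "pt"
    ZH = "zh"  # deliberately absent from the map below


_SKETCH_MAP = {
    SketchLanguage.EN: "en-US",  # generic English defaults to US
    SketchLanguage.EN_GB: "en-GB",
    SketchLanguage.PT: "pt-BR",  # generic Portuguese defaults to Brazilian
}


def to_riva_code(language: SketchLanguage) -> Optional[str]:
    """Return the mapped code, or None when the language is unsupported."""
    return _SKETCH_MAP.get(language)


def resolve_code(language: SketchLanguage, default: str = "en-US") -> str:
    """Apply the same `or` fallback the segmented service uses."""
    return to_riva_code(language) or default
```

Note that the fallback is silent: an unsupported language resolves to `en-US` without any warning, so callers that care may want to check for `None` themselves.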
|
||||
|
||||
class RivaSTTService(STTService):
|
||||
class InputParams(BaseModel):
|
||||
language: Optional[Language] = Language.EN_US
|
||||
|
||||
@@ -40,7 +93,10 @@ class ParakeetSTTService(STTService):
|
||||
*,
|
||||
api_key: str,
|
||||
server: str = "grpc.nvcf.nvidia.com:443",
|
||||
function_id: str = "1598d209-5e27-4d3c-8079-4751568b1081",
|
||||
model_function_map: Mapping[str, str] = {
|
||||
"function_id": "1598d209-5e27-4d3c-8079-4751568b1081",
|
||||
"model_name": "parakeet-ctc-1.1b-asr",
|
||||
},
|
||||
sample_rate: Optional[int] = None,
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
@@ -48,7 +104,7 @@ class ParakeetSTTService(STTService):
|
||||
super().__init__(sample_rate=sample_rate, **kwargs)
|
||||
self._api_key = api_key
|
||||
self._profanity_filter = False
|
||||
self._automatic_punctuation = False
|
||||
self._automatic_punctuation = True
|
||||
self._no_verbatim_transcripts = False
|
||||
self._language_code = params.language
|
||||
self._boosted_lm_words = None
|
||||
@@ -60,11 +116,12 @@ class ParakeetSTTService(STTService):
|
||||
self._stop_history_eou = -1
|
||||
self._stop_threshold_eou = -1.0
|
||||
self._custom_configuration = ""
|
||||
self._function_id = model_function_map.get("function_id")
|
||||
|
||||
self.set_model_name("parakeet-ctc-1.1b-asr")
|
||||
self.set_model_name(model_function_map.get("model_name"))
|
||||
|
||||
metadata = [
|
||||
["function-id", function_id],
|
||||
["function-id", self._function_id],
|
||||
["authorization", f"Bearer {api_key}"],
|
||||
]
|
||||
auth = riva.client.Auth(None, True, server, metadata)
|
||||
@@ -79,6 +136,13 @@ class ParakeetSTTService(STTService):
|
||||
def can_generate_metrics(self) -> bool:
|
||||
return False
|
||||
|
||||
async def set_model(self, model: str):
|
||||
logger.warning(f"Cannot set model after initialization. Set model and function id like so:")
|
||||
example = {"function_id": "<UUID>", "model_name": "<model_name>"}
|
||||
logger.warning(
|
||||
f"{self.__class__.__name__}(api_key=<api_key>, model_function_map={example})"
|
||||
)
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
|
||||
@@ -196,3 +260,262 @@ class ParakeetSTTService(STTService):
|
||||
|
||||
def __iter__(self):
|
||||
return self
|
||||
|
||||
|
||||
class RivaSegmentedSTTService(SegmentedSTTService):
|
||||
"""Speech-to-text service using NVIDIA Riva's offline/batch models.
|
||||
|
||||
By default, this service uses NVIDIA's Riva Canary ASR API to perform speech-to-text
|
||||
transcription on audio segments. It inherits from SegmentedSTTService to handle
|
||||
audio buffering and speech detection.
|
||||
|
||||
Args:
|
||||
api_key: NVIDIA API key for authentication
|
||||
server: Riva server address (defaults to NVIDIA Cloud Function endpoint)
|
||||
model_function_map: Mapping of model name and its corresponding NVIDIA Cloud Function ID
|
||||
sample_rate: Audio sample rate in Hz. If not provided, uses the pipeline's rate
|
||||
params: Additional configuration parameters for Riva
|
||||
**kwargs: Additional arguments passed to SegmentedSTTService
|
||||
"""
|
||||
|
||||
class InputParams(BaseModel):
|
||||
language: Optional[Language] = Language.EN_US
|
||||
profanity_filter: bool = False
|
||||
automatic_punctuation: bool = True
|
||||
verbatim_transcripts: bool = False
|
||||
boosted_lm_words: Optional[List[str]] = None
|
||||
boosted_lm_score: float = 4.0
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
api_key: str,
|
||||
server: str = "grpc.nvcf.nvidia.com:443",
|
||||
model_function_map: Mapping[str, str] = {
|
||||
"function_id": "ee8dc628-76de-4acc-8595-1836e7e857bd",
|
||||
"model_name": "canary-1b-asr",
|
||||
},
|
||||
sample_rate: Optional[int] = None,
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(sample_rate=sample_rate, **kwargs)
|
||||
|
||||
# Set model name
|
||||
self.set_model_name(model_function_map.get("model_name"))
|
||||
|
||||
# Initialize Riva settings
|
||||
self._api_key = api_key
|
||||
self._server = server
|
||||
self._function_id = model_function_map.get("function_id")
|
||||
self._model_name = model_function_map.get("model_name")
|
||||
|
||||
# Store the language as a Language enum and as a string
|
||||
self._language_enum = params.language or Language.EN_US
|
||||
self._language = self.language_to_service_language(self._language_enum) or "en-US"
|
||||
|
||||
# Configure transcription parameters
|
||||
self._profanity_filter = params.profanity_filter
|
||||
self._automatic_punctuation = params.automatic_punctuation
|
||||
self._verbatim_transcripts = params.verbatim_transcripts
|
||||
self._boosted_lm_words = params.boosted_lm_words
|
||||
self._boosted_lm_score = params.boosted_lm_score
|
||||
|
||||
# Voice activity detection thresholds (use Riva defaults)
|
||||
self._start_history = -1
|
||||
self._start_threshold = -1.0
|
||||
self._stop_history = -1
|
||||
self._stop_threshold = -1.0
|
||||
self._stop_history_eou = -1
|
||||
self._stop_threshold_eou = -1.0
|
||||
self._custom_configuration = ""
|
||||
|
||||
# Create Riva client
|
||||
self._config = None
|
||||
self._asr_service = None
|
||||
self._settings = {"language": self._language_enum}
|
||||
|
||||
def language_to_service_language(self, language: Language) -> Optional[str]:
|
||||
"""Convert pipecat Language enum to Riva's language code."""
|
||||
return language_to_riva_language(language)
|
||||
|
||||
def _initialize_client(self):
|
||||
"""Initialize the Riva ASR client with authentication metadata."""
|
||||
if self._asr_service is not None:
|
||||
return
|
||||
|
||||
# Set up authentication metadata for NVIDIA Cloud Functions
|
||||
metadata = [
|
||||
["function-id", self._function_id],
|
||||
["authorization", f"Bearer {self._api_key}"],
|
||||
]
|
||||
|
||||
# Create authenticated client
|
||||
auth = riva.client.Auth(None, True, self._server, metadata)
|
||||
self._asr_service = riva.client.ASRService(auth)
|
||||
|
||||
logger.info(f"Initialized RivaSegmentedSTTService with model: {self.model_name}")
|
||||
|
||||
def _create_recognition_config(self):
|
||||
"""Create the Riva ASR recognition configuration."""
|
||||
# Create base configuration
|
||||
config = riva.client.RecognitionConfig(
|
||||
language_code=self._language, # Now using the string, not a tuple
|
||||
max_alternatives=1,
|
||||
profanity_filter=self._profanity_filter,
|
||||
enable_automatic_punctuation=self._automatic_punctuation,
|
||||
verbatim_transcripts=self._verbatim_transcripts,
|
||||
)
|
||||
|
||||
# Add word boosting if specified
|
||||
if self._boosted_lm_words:
|
||||
riva.client.add_word_boosting_to_config(
|
||||
config, self._boosted_lm_words, self._boosted_lm_score
|
||||
)
|
||||
|
||||
# Add voice activity detection parameters
|
||||
riva.client.add_endpoint_parameters_to_config(
|
||||
config,
|
||||
self._start_history,
|
||||
self._start_threshold,
|
||||
self._stop_history,
|
||||
self._stop_history_eou,
|
||||
self._stop_threshold,
|
||||
self._stop_threshold_eou,
|
||||
)
|
||||
|
||||
# Add any custom configuration
|
||||
if self._custom_configuration:
|
||||
riva.client.add_custom_configuration_to_config(config, self._custom_configuration)
|
||||
|
||||
return config
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
"""Indicates whether this service can generate processing metrics."""
|
||||
return True
|
||||
|
||||
async def set_model(self, model: str):
|
||||
logger.warning(f"Cannot set model after initialization. Set model and function id like so:")
|
||||
example = {"function_id": "<UUID>", "model_name": "<model_name>"}
|
||||
logger.warning(
|
||||
f"{self.__class__.__name__}(api_key=<api_key>, model_function_map={example})"
|
||||
)
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
"""Initialize the service when the pipeline starts."""
|
||||
await super().start(frame)
|
||||
self._initialize_client()
|
||||
self._config = self._create_recognition_config()
|
||||
|
||||
async def set_language(self, language: Language):
|
||||
"""Set the language for the STT service."""
|
||||
logger.info(f"Switching STT language to: [{language}]")
|
||||
self._language_enum = language
|
||||
self._language = self.language_to_service_language(language) or "en-US"
|
||||
self._settings["language"] = language
|
||||
|
||||
# Update configuration with new language
|
||||
if self._config:
|
||||
self._config.language_code = self._language
|
||||
|
||||
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
|
||||
"""Transcribe an audio segment.
|
||||
|
||||
Args:
|
||||
audio: Raw audio bytes in WAV format (already converted by base class).
|
||||
|
||||
Yields:
|
||||
Frame: TranscriptionFrame containing the transcribed text.
|
||||
"""
|
||||
try:
|
||||
await self.start_processing_metrics()
|
||||
await self.start_ttfb_metrics()
|
||||
|
||||
# Make sure the client is initialized
|
||||
if self._asr_service is None:
|
||||
self._initialize_client()
|
||||
|
||||
# Make sure the config is created
|
||||
if self._config is None:
|
||||
self._config = self._create_recognition_config()
|
||||
|
||||
# Type assertion to satisfy the IDE
|
||||
assert self._asr_service is not None, "ASR service not initialized"
|
||||
assert self._config is not None, "Recognition config not created"
|
||||
|
||||
# Process audio with Riva ASR - explicitly request non-future response
|
||||
raw_response = self._asr_service.offline_recognize(audio, self._config, future=False)
|
||||
|
||||
await self.stop_ttfb_metrics()
|
||||
await self.stop_processing_metrics()
|
||||
|
||||
# Process the response - handle different possible return types
|
||||
try:
|
||||
# If it's a future-like object, get the result
|
||||
if hasattr(raw_response, "result"):
|
||||
response = raw_response.result()
|
||||
else:
|
||||
response = raw_response
|
||||
|
||||
# Process transcription results
|
||||
transcription_found = False
|
||||
|
||||
# Now we can safely check results
|
||||
# Type hint for the IDE
|
||||
results = getattr(response, "results", [])
|
||||
|
||||
for result in results:
|
||||
alternatives = getattr(result, "alternatives", [])
|
||||
if alternatives:
|
||||
text = alternatives[0].transcript.strip()
|
||||
if text:
|
||||
logger.debug(f"Transcription: [{text}]")
|
||||
yield TranscriptionFrame(
|
||||
text, "", time_now_iso8601(), self._language_enum
|
||||
)
|
||||
transcription_found = True
|
||||
|
||||
if not transcription_found:
|
||||
logger.debug("No transcription results found in Riva response")
|
||||
|
||||
except AttributeError as ae:
|
||||
logger.error(f"Unexpected response structure from Riva: {ae}")
|
||||
yield ErrorFrame(f"Unexpected Riva response format: {str(ae)}")
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Riva Canary ASR error: {e}")
|
||||
yield ErrorFrame(f"Riva Canary ASR error: {str(e)}")
|
||||
|
||||
|
||||
class ParakeetSTTService(RivaSTTService):
|
||||
"""Deprecated: Use RivaSTTService instead."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
api_key: str,
|
||||
server: str = "grpc.nvcf.nvidia.com:443",
|
||||
model_function_map: Mapping[str, str] = {
|
||||
"function_id": "1598d209-5e27-4d3c-8079-4751568b1081",
|
||||
"model_name": "parakeet-ctc-1.1b-asr",
|
||||
},
|
||||
sample_rate: Optional[int] = None,
|
||||
params: RivaSTTService.InputParams = RivaSTTService.InputParams(), # Use parent class's type
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(
|
||||
api_key=api_key,
|
||||
server=server,
|
||||
model_function_map=model_function_map,
|
||||
sample_rate=sample_rate,
|
||||
params=params,
|
||||
**kwargs,
|
||||
)
|
||||
import warnings
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"`ParakeetSTTService` is deprecated, use `RivaSTTService` instead.",
|
||||
DeprecationWarning,
|
||||
)
|
||||
|
||||
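`ParakeetSTTService` is kept as a thin subclass that forwards to the renamed class and emits a `DeprecationWarning` on construction. A minimal sketch of that shim pattern, with hypothetical `NewService` and `OldService` classes in place of the Riva services:

```python
import warnings


class NewService:
    def __init__(self, *, api_key: str):
        self.api_key = api_key


class OldService(NewService):
    """Deprecated: use NewService instead."""

    def __init__(self, *, api_key: str):
        super().__init__(api_key=api_key)
        # Same structure as the diff: force the warning to be shown even if
        # DeprecationWarning is filtered by default, then restore the filters.
        with warnings.catch_warnings():
            warnings.simplefilter("always")
            warnings.warn(
                "`OldService` is deprecated, use `NewService` instead.",
                DeprecationWarning,
            )


# Record warnings so the behaviour can be inspected.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    service = OldService(api_key="dummy")
```

Existing callers keep working unchanged, and `caught` contains a single `DeprecationWarning` pointing them at the new name.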
@@ -5,7 +5,11 @@
|
||||
#
|
||||
|
||||
import asyncio
|
||||
from typing import AsyncGenerator, Optional
|
||||
import os
|
||||
from typing import AsyncGenerator, Mapping, Optional
|
||||
|
||||
# Suppress gRPC fork warnings
|
||||
os.environ["GRPC_ENABLE_FORK_SUPPORT"] = "false"
|
||||
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel
|
||||
@@ -27,10 +31,10 @@ except ModuleNotFoundError as e:
|
||||
logger.error("In order to use NVIDIA Riva TTS, you need to `pip install pipecat-ai[riva]`.")
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
FASTPITCH_TIMEOUT_SECS = 5
|
||||
RIVA_TTS_TIMEOUT_SECS = 5
|
||||
|
||||
|
||||
class FastPitchTTSService(TTSService):
|
||||
class RivaTTSService(TTSService):
|
||||
class InputParams(BaseModel):
|
||||
language: Optional[Language] = Language.EN_US
|
||||
quality: Optional[int] = 20
|
||||
@@ -38,11 +42,14 @@ class FastPitchTTSService(TTSService):
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
api_key: str,
|
||||
api_key: str = None,
|
||||
server: str = "grpc.nvcf.nvidia.com:443",
|
||||
voice_id: str = "English-US.Female-1",
|
||||
voice_id: str = "Magpie-Multilingual.EN-US.Ray",
|
||||
sample_rate: Optional[int] = None,
|
||||
function_id: str = "0149dedb-2be8-4195-b9a0-e57e0e14f972",
|
||||
model_function_map: Mapping[str, str] = {
|
||||
"function_id": "877104f7-e885-42b9-8de8-f6e4c6303969",
|
||||
"model_name": "magpie-tts-multilingual",
|
||||
},
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
):
|
||||
@@ -51,12 +58,13 @@ class FastPitchTTSService(TTSService):
|
||||
self._voice_id = voice_id
|
||||
self._language_code = params.language
|
||||
self._quality = params.quality
|
||||
self._function_id = model_function_map.get("function_id")
|
||||
|
||||
self.set_model_name("fastpitch-hifigan-tts")
|
||||
self.set_model_name(model_function_map.get("model_name"))
|
||||
self.set_voice(voice_id)
|
||||
|
||||
metadata = [
|
||||
["function-id", function_id],
|
||||
["function-id", self._function_id],
|
||||
["authorization", f"Bearer {api_key}"],
|
||||
]
|
||||
auth = riva.client.Auth(None, True, server, metadata)
|
||||
@@ -68,6 +76,13 @@ class FastPitchTTSService(TTSService):
|
||||
riva.client.proto.riva_tts_pb2.RivaSynthesisConfigRequest()
|
||||
)
|
||||
|
||||
async def set_model(self, model: str):
|
||||
logger.warning(f"Cannot set model after initialization. Set model and function id like so:")
|
||||
example = {"function_id": "<UUID>", "model_name": "<model_name>"}
|
||||
logger.warning(
|
||||
f"{self.__class__.__name__}(api_key=<api_key>, model_function_map={example})"
|
||||
)
|
||||
|
||||
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
|
||||
def read_audio_responses(queue: asyncio.Queue):
|
||||
def add_response(r):
|
||||
@@ -79,8 +94,8 @@ class FastPitchTTSService(TTSService):
|
||||
self._voice_id,
|
||||
self._language_code,
|
||||
sample_rate_hz=self.sample_rate,
|
||||
audio_prompt_file=None,
|
||||
quality=self._quality,
|
||||
zero_shot_audio_prompt_file=None,
|
||||
zero_shot_quality=self._quality,
|
||||
custom_dictionary={},
|
||||
)
|
||||
for r in responses:
|
||||
@@ -100,7 +115,7 @@ class FastPitchTTSService(TTSService):
|
||||
await asyncio.to_thread(read_audio_responses, queue)
|
||||
|
||||
# Wait for the thread to start.
|
||||
resp = await asyncio.wait_for(queue.get(), FASTPITCH_TIMEOUT_SECS)
|
||||
resp = await asyncio.wait_for(queue.get(), RIVA_TTS_TIMEOUT_SECS)
|
||||
while resp:
|
||||
await self.stop_ttfb_metrics()
|
||||
frame = TTSAudioRawFrame(
|
||||
@@ -109,9 +124,46 @@ class FastPitchTTSService(TTSService):
|
||||
num_channels=1,
|
||||
)
|
||||
yield frame
|
||||
resp = await asyncio.wait_for(queue.get(), FASTPITCH_TIMEOUT_SECS)
|
||||
resp = await asyncio.wait_for(queue.get(), RIVA_TTS_TIMEOUT_SECS)
|
||||
except asyncio.TimeoutError:
|
||||
logger.error(f"{self} timeout waiting for audio response")
|
||||
|
||||
await self.start_tts_usage_metrics(text)
|
||||
yield TTSStoppedFrame()
|
||||
|
||||
|
||||
class FastPitchTTSService(RivaTTSService):
    class InputParams(BaseModel):
        language: Optional[Language] = Language.EN_US
        quality: Optional[int] = 20

    def __init__(
        self,
        *,
        api_key: str = None,
        server: str = "grpc.nvcf.nvidia.com:443",
        voice_id: str = "English-US.Female-1",
        sample_rate: Optional[int] = None,
        model_function_map: Mapping[str, str] = {
            "function_id": "0149dedb-2be8-4195-b9a0-e57e0e14f972",
            "model_name": "fastpitch-hifigan-tts",
        },
        params: InputParams = InputParams(),
        **kwargs,
    ):
        super().__init__(
            api_key=api_key,
            voice_id=voice_id,
            sample_rate=sample_rate,
            model_function_map=model_function_map,
            params=params,
            **kwargs,
        )
        import warnings

        with warnings.catch_warnings():
            warnings.simplefilter("always")
            warnings.warn(
                "`FastPitchTTSService` is deprecated, use `RivaTTSService` instead.",
                DeprecationWarning,
            )
|
||||
|
||||
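The deprecation shim above follows a common pattern: keep the old class name as a thin subclass and emit a `DeprecationWarning` on construction. Below is a minimal sketch of that pattern in isolation. `NewService` and `OldService` are hypothetical stand-ins, not classes from this repository, and `stacklevel=2` is an addition that the diff does not use (it makes the warning point at the caller).

```python
import warnings


class NewService:
    """Hypothetical replacement class."""

    def __init__(self, voice_id: str = "default"):
        self.voice_id = voice_id


class OldService(NewService):
    """Hypothetical deprecated alias that forwards everything to NewService."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Temporarily force the warning to be shown even if the process-wide
        # filters would normally hide DeprecationWarning.
        with warnings.catch_warnings():
            warnings.simplefilter("always")
            warnings.warn(
                "`OldService` is deprecated, use `NewService` instead.",
                DeprecationWarning,
                stacklevel=2,
            )
```

Because `catch_warnings()` restores the previous filters on exit, the `"always"` filter only applies to this one warning.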
@@ -64,13 +64,16 @@ class SimliVideoService(FrameProcessor):
|
||||
async for audio_frame in self._simli_client.getAudioStreamIterator():
|
||||
resampled_frames = self._pipecat_resampler.resample(audio_frame)
|
||||
for resampled_frame in resampled_frames:
|
||||
await self.push_frame(
|
||||
TTSAudioRawFrame(
|
||||
audio=resampled_frame.to_ndarray().tobytes(),
|
||||
sample_rate=self._pipecat_resampler.rate,
|
||||
num_channels=1,
|
||||
),
|
||||
)
|
||||
audio_array = resampled_frame.to_ndarray()
|
||||
# Only push the frame if there is audio (i.e. not silence)
|
||||
if audio_array.any():
|
||||
await self.push_frame(
|
||||
TTSAudioRawFrame(
|
||||
audio=audio_array.tobytes(),
|
||||
sample_rate=self._pipecat_resampler.rate,
|
||||
num_channels=1,
|
||||
),
|
||||
)
|
||||
|
||||
async def _consume_and_process_video(self):
|
||||
await self._pipecat_resampler_event.wait()
|
||||
|
||||
@@ -66,6 +66,8 @@ class TTSService(AIService):
|
||||
# Text filter executed after text has been aggregated.
|
||||
text_filters: Sequence[BaseTextFilter] = [],
|
||||
text_filter: Optional[BaseTextFilter] = None,
|
||||
# Audio transport destination of the generated frames.
|
||||
transport_destination: Optional[str] = None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
@@ -82,6 +84,8 @@ class TTSService(AIService):
|
||||
self._settings: Dict[str, Any] = {}
|
||||
self._text_aggregator: BaseTextAggregator = text_aggregator or SimpleTextAggregator()
|
||||
self._text_filters: Sequence[BaseTextFilter] = text_filters
|
||||
self._transport_destination: Optional[str] = transport_destination
|
||||
|
||||
if text_filter:
|
||||
import warnings
|
||||
|
||||
@@ -207,13 +211,16 @@ class TTSService(AIService):
|
||||
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
|
||||
if self._push_silence_after_stop and isinstance(frame, TTSStoppedFrame):
|
||||
silence_num_bytes = int(self._silence_time_s * self.sample_rate * 2) # 16-bit
|
||||
await self.push_frame(
|
||||
TTSAudioRawFrame(
|
||||
audio=b"\x00" * silence_num_bytes,
|
||||
sample_rate=self.sample_rate,
|
||||
num_channels=1,
|
||||
)
|
||||
silence_frame = TTSAudioRawFrame(
|
||||
audio=b"\x00" * silence_num_bytes,
|
||||
sample_rate=self.sample_rate,
|
||||
num_channels=1,
|
||||
)
|
||||
silence_frame.transport_destination = self._transport_destination
|
||||
await self.push_frame(silence_frame)
|
||||
|
||||
if isinstance(frame, (TTSStartedFrame, TTSStoppedFrame, TTSAudioRawFrame, TTSTextFrame)):
|
||||
frame.transport_destination = self._transport_destination
|
||||
|
||||
await super().push_frame(frame, direction)
|
||||
|
||||
|
||||
@@ -79,7 +79,7 @@ class BaseInputTransport(FrameProcessor):
|
||||
)
|
||||
self._params.audio_in_passthrough = True
|
||||
|
||||
if self._params.camera_in_enabled or self._params.camera_out_enabled:
|
||||
if self._params.camera_in_enabled:
|
||||
import warnings
|
||||
|
||||
with warnings.catch_warnings():
|
||||
|
||||
@@ -8,11 +8,12 @@ import asyncio
|
||||
import itertools
|
||||
import sys
|
||||
import time
|
||||
from typing import AsyncGenerator, List
|
||||
from typing import Any, AsyncGenerator, Dict, List, Mapping, Optional
|
||||
|
||||
from loguru import logger
|
||||
from PIL import Image
|
||||
|
||||
from pipecat.audio.mixers.base_audio_mixer import BaseAudioMixer
|
||||
from pipecat.audio.utils import create_default_resampler
|
||||
from pipecat.frames.frames import (
|
||||
BotSpeakingFrame,
|
||||
@@ -46,35 +47,28 @@ class BaseOutputTransport(FrameProcessor):
|
||||
|
||||
self._params = params
|
||||
|
||||
# Task to process incoming frames so we don't block upstream elements.
|
||||
self._sink_task = None
|
||||
|
||||
# Task to process incoming frames using a clock.
|
||||
self._sink_clock_task = None
|
||||
|
||||
# Task to write/send audio and image frames.
|
||||
self._video_out_task = None
|
||||
|
||||
# These are the images that we should send at our desired framerate.
|
||||
self._video_images = None
|
||||
|
||||
# Output sample rate. It will be initialized on StartFrame.
|
||||
self._sample_rate = 0
|
||||
self._resampler = create_default_resampler()
|
||||
|
||||
# Chunk size that will be written. It will be computed on StartFrame
|
||||
# We write 10ms*CHUNKS of audio at a time (where CHUNKS is the
|
||||
# `audio_out_10ms_chunks` parameter). If we receive long audio frames we
|
||||
# will chunk them. This helps with interruption handling. It will be
|
||||
# initialized on StartFrame.
|
||||
self._audio_chunk_size = 0
|
||||
self._audio_buffer = bytearray()
|
||||
|
||||
self._stopped_event = asyncio.Event()
|
||||
|
||||
# Indicates if the bot is currently speaking.
|
||||
self._bot_speaking = False
|
||||
# We will have one media sender per output frame destination. This allows
|
||||
# us to send multiple streams at the same time if the transport allows
|
||||
# it.
|
||||
self._media_senders: Dict[Any, "BaseOutputTransport.MediaSender"] = {}
|
||||
|
||||
@property
|
||||
def sample_rate(self) -> int:
|
||||
return self._sample_rate
|
||||
|
||||
@property
|
||||
def audio_chunk_size(self) -> int:
|
||||
return self._audio_chunk_size
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
self._sample_rate = self._params.audio_out_sample_rate or frame.audio_out_sample_rate
|
||||
|
||||
@@ -84,42 +78,63 @@ class BaseOutputTransport(FrameProcessor):
|
||||
audio_bytes_10ms = int(self._sample_rate / 100) * self._params.audio_out_channels * 2
|
||||
self._audio_chunk_size = audio_bytes_10ms * self._params.audio_out_10ms_chunks
|
||||
|
||||
# Start audio mixer.
|
||||
if self._params.audio_out_mixer:
|
||||
await self._params.audio_out_mixer.start(self._sample_rate)
|
||||
self._create_video_task()
|
||||
self._create_sink_tasks()
|
||||
# Register destinations.
|
||||
for destination in self._params.audio_out_destinations:
|
||||
await self.register_audio_destination(destination)
|
||||
|
||||
for destination in self._params.video_out_destinations:
|
||||
await self.register_video_destination(destination)
|
||||
|
||||
# Start default media sender.
|
||||
self._media_senders[None] = BaseOutputTransport.MediaSender(
|
||||
self,
|
||||
destination=None,
|
||||
sample_rate=self.sample_rate,
|
||||
audio_chunk_size=self.audio_chunk_size,
|
||||
params=self._params,
|
||||
)
|
||||
await self._media_senders[None].start(frame)
|
||||
|
||||
# Media senders already send both audio and video, so make sure we only
|
||||
# have one media sender per shared name.
|
||||
destinations = list(
|
||||
set(self._params.audio_out_destinations + self._params.video_out_destinations)
|
||||
)
|
||||
|
||||
# Start media senders.
|
||||
for destination in destinations:
|
||||
self._media_senders[destination] = BaseOutputTransport.MediaSender(
|
||||
self,
|
||||
destination=destination,
|
||||
sample_rate=self.sample_rate,
|
||||
audio_chunk_size=self.audio_chunk_size,
|
||||
params=self._params,
|
||||
)
|
||||
await self._media_senders[destination].start(frame)
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
# Let the sink tasks process the queue until they reach this EndFrame.
|
||||
await self._sink_clock_queue.put((sys.maxsize, frame.id, frame))
|
||||
await self._sink_queue.put(frame)
|
||||
|
||||
# At this point we have enqueued an EndFrame and we need to wait for
|
||||
# that EndFrame to be processed by the sink tasks. We also need to wait
|
||||
# for these tasks before cancelling the video and audio tasks below
|
||||
# because they might be still rendering.
|
||||
if self._sink_task:
|
||||
await self.wait_for_task(self._sink_task)
|
||||
if self._sink_clock_task:
|
||||
await self.wait_for_task(self._sink_clock_task)
|
||||
|
||||
# We can now cancel the video task.
|
||||
await self._cancel_video_task()
|
||||
for _, sender in self._media_senders.items():
|
||||
await sender.stop(frame)
|
||||
|
||||
async def cancel(self, frame: CancelFrame):
|
||||
# Since we are cancelling everything it doesn't matter if we cancel sink
|
||||
# tasks first or not.
|
||||
await self._cancel_sink_tasks()
|
||||
await self._cancel_video_task()
|
||||
for _, sender in self._media_senders.items():
|
||||
await sender.cancel(frame)
|
||||
|
||||
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
|
||||
pass
|
||||
|
||||
async def write_raw_video_frame(self, frame: OutputImageRawFrame):
|
||||
async def register_video_destination(self, destination: str):
|
||||
pass
|
||||
|
||||
async def write_raw_audio_frames(self, frames: bytes):
|
||||
async def register_audio_destination(self, destination: str):
|
||||
pass
|
||||
|
||||
async def write_raw_video_frame(
|
||||
self, frame: OutputImageRawFrame, destination: Optional[str] = None
|
||||
):
|
||||
pass
|
||||
|
||||
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
|
||||
pass
|
||||
|
||||
async def send_audio(self, frame: OutputAudioRawFrame):
|
||||
@@ -150,7 +165,7 @@ class BaseOutputTransport(FrameProcessor):
|
||||
await self.push_frame(frame, direction)
|
||||
elif isinstance(frame, (StartInterruptionFrame, StopInterruptionFrame)):
|
||||
await self.push_frame(frame, direction)
|
||||
await self._handle_interruptions(frame)
|
||||
await self._handle_frame(frame)
|
||||
elif isinstance(frame, TransportMessageUrgentFrame):
|
||||
await self.send_message(frame)
|
||||
elif isinstance(frame, SystemFrame):
|
||||
@@ -160,117 +175,428 @@ class BaseOutputTransport(FrameProcessor):
|
||||
await self.stop(frame)
|
||||
# Keep pushing EndFrame down so all the pipeline stops nicely.
|
||||
await self.push_frame(frame, direction)
|
||||
elif isinstance(frame, MixerControlFrame) and self._params.audio_out_mixer:
|
||||
await self._params.audio_out_mixer.process_frame(frame)
|
||||
elif isinstance(frame, MixerControlFrame):
|
||||
await self._handle_frame(frame)
|
||||
# Other frames.
|
||||
elif isinstance(frame, OutputAudioRawFrame):
|
||||
await self._handle_audio(frame)
|
||||
await self._handle_frame(frame)
|
||||
elif isinstance(frame, (OutputImageRawFrame, SpriteFrame)):
|
||||
await self._handle_image(frame)
|
||||
await self._handle_frame(frame)
|
||||
# TODO(aleix): Images and audio should support presentation timestamps.
|
||||
elif frame.pts:
|
||||
await self._sink_clock_queue.put((frame.pts, frame.id, frame))
|
||||
await self._handle_frame(frame)
|
||||
elif direction == FrameDirection.UPSTREAM:
|
||||
await self.push_frame(frame, direction)
|
||||
else:
|
||||
await self._sink_queue.put(frame)
|
||||
await self._handle_frame(frame)
|
||||
|
||||
async def _handle_interruptions(self, frame: Frame):
|
||||
if not self.interruptions_allowed:
|
||||
async def _handle_frame(self, frame: Frame):
|
||||
if frame.transport_destination not in self._media_senders:
|
||||
logger.warning(
|
||||
f"{self} destination [{frame.transport_destination}] not registered for frame {frame}"
|
||||
)
|
||||
return
|
||||
|
||||
sender = self._media_senders[frame.transport_destination]
|
||||
|
||||
if isinstance(frame, StartInterruptionFrame):
|
||||
# Cancel sink and video tasks.
|
||||
await self._cancel_sink_tasks()
|
||||
await self._cancel_video_task()
|
||||
# Create sink and video tasks.
|
||||
await sender.handle_interruptions(frame)
|
||||
elif isinstance(frame, OutputAudioRawFrame):
|
||||
await sender.handle_audio_frame(frame)
|
||||
elif isinstance(frame, (OutputImageRawFrame, SpriteFrame)):
|
||||
await sender.handle_image_frame(frame)
|
||||
elif isinstance(frame, MixerControlFrame):
|
||||
await sender.handle_mixer_control_frame(frame)
|
||||
elif frame.pts:
|
||||
await sender.handle_timed_frame(frame)
|
||||
else:
|
||||
await sender.handle_sync_frame(frame)
|
||||
|
||||
#
|
||||
# Media Sender
|
||||
#
|
||||
|
||||
class MediaSender:
|
||||
def __init__(
|
||||
self,
|
||||
transport: "BaseOutputTransport",
|
||||
*,
|
||||
destination: Optional[str],
|
||||
sample_rate: int,
|
||||
audio_chunk_size: int,
|
||||
params: TransportParams,
|
||||
):
|
||||
self._transport = transport
|
||||
self._destination = destination
|
||||
self._sample_rate = sample_rate
|
||||
self._audio_chunk_size = audio_chunk_size
|
||||
self._params = params
|
||||
|
||||
# Buffer to keep track of incoming audio.
|
||||
self._audio_buffer = bytearray()
|
||||
|
||||
# This will be used to resample incoming audio to the output sample rate.
|
||||
self._resampler = create_default_resampler()
|
||||
|
||||
# The user can provide a single mixer, to be used by the default
|
||||
# destination, or a destination/mixer mapping.
|
||||
self._mixer: Optional[BaseAudioMixer] = None
|
||||
|
||||
# These are the images that we should send at our desired framerate.
|
||||
self._video_images = None
|
||||
|
||||
# Indicates if the bot is currently speaking.
|
||||
self._bot_speaking = False
|
||||
|
||||
self._audio_task: Optional[asyncio.Task] = None
|
||||
self._video_task: Optional[asyncio.Task] = None
|
||||
self._clock_task: Optional[asyncio.Task] = None
|
||||
|
||||
@property
|
||||
def sample_rate(self) -> int:
|
||||
return self._sample_rate
|
||||
|
||||
@property
|
||||
def audio_chunk_size(self) -> int:
|
||||
return self._audio_chunk_size
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
self._audio_buffer = bytearray()
|
||||
|
||||
# Create all tasks.
|
||||
self._create_video_task()
|
||||
self._create_sink_tasks()
|
||||
self._create_clock_task()
|
||||
self._create_audio_task()
|
||||
|
||||
# Check if we have an audio mixer for our destination.
|
||||
if self._params.audio_out_mixer:
|
||||
if isinstance(self._params.audio_out_mixer, Mapping):
|
||||
self._mixer = self._params.audio_out_mixer.get(self._destination, None)
|
||||
elif not self._destination:
|
||||
# Only use the default mixer if we are the default destination.
|
||||
self._mixer = self._params.audio_out_mixer
|
||||
|
||||
# Start audio mixer.
|
||||
if self._mixer:
|
||||
await self._mixer.start(self._sample_rate)
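The mixer lookup in `MediaSender.start()` can be read as a small pure function. The sketch below restates that rule outside the class; `select_mixer` is a hypothetical helper name, and the mixers are plain objects rather than `BaseAudioMixer` instances.

```python
from collections.abc import Mapping
from typing import Any, Optional


def select_mixer(audio_out_mixer: Any, destination: Optional[str]) -> Optional[Any]:
    """Pick the mixer for one destination (hypothetical helper)."""
    if not audio_out_mixer:
        return None
    if isinstance(audio_out_mixer, Mapping):
        # A destination -> mixer mapping: only listed destinations get a mixer.
        return audio_out_mixer.get(destination, None)
    if not destination:
        # A single mixer is only used by the default (None) destination.
        return audio_out_mixer
    return None
```

Under this rule a single mixer never leaks into named destinations; to mix audio for a named destination, the caller has to pass a mapping.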
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
# Let the sink tasks process the queue until they reach this EndFrame.
|
||||
await self._clock_queue.put((sys.maxsize, frame.id, frame))
|
||||
await self._audio_queue.put(frame)
|
||||
|
||||
# At this point we have enqueued an EndFrame and we need to wait for
|
||||
# that EndFrame to be processed by the audio and clock tasks. We
|
||||
# also need to wait for these tasks before cancelling the video task
|
||||
# because it might be still rendering.
|
||||
if self._audio_task:
|
||||
await self._transport.wait_for_task(self._audio_task)
|
||||
if self._clock_task:
|
||||
await self._transport.wait_for_task(self._clock_task)
|
||||
|
||||
# Stop audio mixer.
|
||||
if self._mixer:
|
||||
await self._mixer.stop()
|
||||
|
||||
# We can now cancel the video task.
|
||||
await self._cancel_video_task()
|
||||
|
||||
async def cancel(self, frame: CancelFrame):
|
||||
# Since we are cancelling everything it doesn't matter what task we cancel first.
|
||||
await self._cancel_audio_task()
|
||||
await self._cancel_clock_task()
|
||||
await self._cancel_video_task()
|
||||
|
||||
async def handle_interruptions(self, _: StartInterruptionFrame):
|
||||
if not self._transport.interruptions_allowed:
|
||||
return
|
||||
|
||||
# Cancel tasks.
|
||||
await self._cancel_audio_task()
|
||||
await self._cancel_clock_task()
|
||||
await self._cancel_video_task()
|
||||
# Create tasks.
|
||||
self._create_video_task()
|
||||
self._create_clock_task()
|
||||
self._create_audio_task()
|
||||
# Let's send a bot stopped speaking if we have to.
|
||||
await self._bot_stopped_speaking()
|
||||
|
||||
async def _handle_audio(self, frame: OutputAudioRawFrame):
|
||||
if not self._params.audio_out_enabled:
|
||||
return
|
||||
async def handle_audio_frame(self, frame: OutputAudioRawFrame):
|
||||
if not self._params.audio_out_enabled:
|
||||
return
|
||||
|
||||
# We might need to resample if incoming audio doesn't match the
|
||||
# transport sample rate.
|
||||
resampled = await self._resampler.resample(
|
||||
frame.audio, frame.sample_rate, self._sample_rate
|
||||
)
|
||||
|
||||
cls = type(frame)
|
||||
self._audio_buffer.extend(resampled)
|
||||
while len(self._audio_buffer) >= self._audio_chunk_size:
|
||||
chunk = cls(
|
||||
bytes(self._audio_buffer[: self._audio_chunk_size]),
|
||||
sample_rate=self._sample_rate,
|
||||
num_channels=frame.num_channels,
|
||||
# We might need to resample if incoming audio doesn't match the
|
||||
# transport sample rate.
|
||||
resampled = await self._resampler.resample(
|
||||
frame.audio, frame.sample_rate, self._sample_rate
|
||||
)
|
||||
await self._sink_queue.put(chunk)
|
||||
self._audio_buffer = self._audio_buffer[self._audio_chunk_size :]
|
||||
|
||||
async def _handle_image(self, frame: OutputImageRawFrame | SpriteFrame):
|
||||
if not self._params.video_out_enabled:
|
||||
return
|
||||
cls = type(frame)
|
||||
self._audio_buffer.extend(resampled)
|
||||
while len(self._audio_buffer) >= self._audio_chunk_size:
|
||||
chunk = cls(
|
||||
bytes(self._audio_buffer[: self._audio_chunk_size]),
|
||||
sample_rate=self._sample_rate,
|
||||
num_channels=frame.num_channels,
|
||||
)
|
||||
await self._audio_queue.put(chunk)
|
||||
self._audio_buffer = self._audio_buffer[self._audio_chunk_size :]
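The chunk size arithmetic from `start()` and the buffering loop in `handle_audio_frame()` can be checked with plain bytes. This is a minimal sketch assuming 16-bit samples, as the `* 2` in the diff implies. `chunk_size_bytes` and `split_into_chunks` are hypothetical names, and the sketch trims the buffer in place with `del` where the diff reassigns a slice.

```python
from typing import List


def chunk_size_bytes(sample_rate: int, channels: int, ten_ms_chunks: int) -> int:
    """Bytes in one output chunk, assuming 16-bit (2-byte) samples."""
    audio_bytes_10ms = int(sample_rate / 100) * channels * 2
    return audio_bytes_10ms * ten_ms_chunks


def split_into_chunks(buffer: bytearray, audio: bytes, chunk_size: int) -> List[bytes]:
    """Append audio to the buffer and pop every complete chunk from its front."""
    buffer.extend(audio)
    chunks = []
    while len(buffer) >= chunk_size:
        chunks.append(bytes(buffer[:chunk_size]))
        del buffer[:chunk_size]  # keep the remainder for the next call
    return chunks
```

For example, at 16 kHz mono with four 10 ms chunks, one output chunk is 1280 bytes, and any remainder shorter than that stays in the buffer until more audio arrives.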
|
||||
|
||||
if self._params.video_out_is_live:
|
||||
await self._video_out_queue.put(frame)
|
||||
else:
|
||||
await self._sink_queue.put(frame)
|
||||
async def handle_image_frame(self, frame: OutputImageRawFrame | SpriteFrame):
|
||||
if not self._params.video_out_enabled:
|
||||
return
|
||||
|
||||
async def _bot_started_speaking(self):
|
||||
if not self._bot_speaking:
|
||||
logger.debug("Bot started speaking")
|
||||
await self.push_frame(BotStartedSpeakingFrame())
|
||||
await self.push_frame(BotStartedSpeakingFrame(), FrameDirection.UPSTREAM)
|
||||
self._bot_speaking = True
|
||||
if self._params.video_out_is_live and isinstance(frame, OutputImageRawFrame):
|
||||
await self._video_queue.put(frame)
|
||||
elif isinstance(frame, OutputImageRawFrame):
|
||||
await self._set_video_image(frame)
|
||||
else:
|
||||
await self._set_video_images(frame.images)
|
||||
|
||||
async def _bot_stopped_speaking(self):
|
||||
if self._bot_speaking:
|
||||
logger.debug("Bot stopped speaking")
|
||||
await self.push_frame(BotStoppedSpeakingFrame())
|
||||
await self.push_frame(BotStoppedSpeakingFrame(), FrameDirection.UPSTREAM)
|
||||
self._bot_speaking = False
|
||||
# Clear the audio buffer (there could be small leftovers if the audio is
# not a multiple of our output chunk size).
|
||||
self._audio_buffer = bytearray()
|
||||
async def handle_timed_frame(self, frame: Frame):
|
||||
await self._clock_queue.put((frame.pts, frame.id, frame))
|
||||
|
||||
#
|
||||
# Sink tasks
|
||||
#
|
||||
async def handle_sync_frame(self, frame: Frame):
|
||||
await self._audio_queue.put(frame)
|
||||
|
||||
def _create_sink_tasks(self):
|
||||
if not self._sink_task:
|
||||
self._sink_queue = asyncio.Queue()
|
||||
self._sink_task = self.create_task(self._sink_task_handler())
|
||||
if not self._sink_clock_task:
|
||||
self._sink_clock_queue = asyncio.PriorityQueue()
|
||||
self._sink_clock_task = self.create_task(self._sink_clock_task_handler())
|
||||
async def handle_mixer_control_frame(self, frame: MixerControlFrame):
|
||||
if self._mixer:
|
||||
await self._mixer.process_frame(frame)
|
||||
|
||||
async def _cancel_sink_tasks(self):
|
||||
# Stop sink tasks.
|
||||
if self._sink_task:
|
||||
await self.cancel_task(self._sink_task)
|
||||
self._sink_task = None
|
||||
# Stop sink clock tasks.
|
||||
if self._sink_clock_task:
|
||||
await self.cancel_task(self._sink_clock_task)
|
||||
self._sink_clock_task = None
|
||||
#
|
||||
# Audio handling
|
||||
#
|
||||
|
||||
async def _sink_frame_handler(self, frame: Frame):
|
||||
if isinstance(frame, OutputImageRawFrame):
|
||||
await self._set_video_image(frame)
|
||||
elif isinstance(frame, SpriteFrame):
|
||||
await self._set_video_images(frame.images)
|
||||
elif isinstance(frame, TransportMessageFrame):
|
||||
await self.send_message(frame)
|
||||
def _create_audio_task(self):
|
||||
if not self._audio_task:
|
||||
self._audio_queue = asyncio.Queue()
|
||||
self._audio_task = self._transport.create_task(self._audio_task_handler())
|
||||
|
||||
async def _sink_clock_task_handler(self):
|
||||
running = True
|
||||
while running:
|
||||
try:
|
||||
timestamp, _, frame = await self._sink_clock_queue.get()
|
||||
async def _cancel_audio_task(self):
|
||||
if self._audio_task:
|
||||
await self._transport.cancel_task(self._audio_task)
|
||||
self._audio_task = None
|
||||
|
||||
async def _bot_started_speaking(self):
|
||||
if not self._bot_speaking:
|
||||
logger.debug(
|
||||
f"Bot{f' [{self._destination}]' if self._destination else ''} started speaking"
|
||||
)
|
||||
|
||||
downstream_frame = BotStartedSpeakingFrame()
|
||||
downstream_frame.transport_destination = self._destination
|
||||
upstream_frame = BotStartedSpeakingFrame()
|
||||
upstream_frame.transport_destination = self._destination
|
||||
await self._transport.push_frame(downstream_frame)
|
||||
await self._transport.push_frame(upstream_frame, FrameDirection.UPSTREAM)
|
||||
|
||||
self._bot_speaking = True
|
||||
|
||||
async def _bot_stopped_speaking(self):
|
||||
if self._bot_speaking:
|
||||
logger.debug(
|
||||
f"Bot{f' [{self._destination}]' if self._destination else ''} stopped speaking"
|
||||
)
|
||||
|
||||
downstream_frame = BotStoppedSpeakingFrame()
|
||||
downstream_frame.transport_destination = self._destination
|
||||
upstream_frame = BotStoppedSpeakingFrame()
|
||||
upstream_frame.transport_destination = self._destination
|
||||
await self._transport.push_frame(downstream_frame)
|
||||
await self._transport.push_frame(upstream_frame, FrameDirection.UPSTREAM)
|
||||
|
||||
self._bot_speaking = False
|
||||
|
||||
# Clear the audio buffer (there could be small leftovers if the audio is
# not a multiple of our output chunk size).
|
||||
self._audio_buffer = bytearray()
|
||||
|
||||
async def _handle_frame(self, frame: Frame):
|
||||
if isinstance(frame, OutputImageRawFrame):
|
||||
await self._set_video_image(frame)
|
||||
elif isinstance(frame, SpriteFrame):
|
||||
await self._set_video_images(frame.images)
|
||||
elif isinstance(frame, TransportMessageFrame):
|
||||
await self._transport.send_message(frame)
|
||||
|
||||
        def _next_frame(self) -> AsyncGenerator[Frame, None]:
            async def without_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]:
                while True:
                    try:
                        frame = await asyncio.wait_for(
                            self._audio_queue.get(), timeout=vad_stop_secs
                        )
                        yield frame
                    except asyncio.TimeoutError:
                        # Notify the bot stopped speaking upstream if necessary.
                        await self._bot_stopped_speaking()

            async def with_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]:
                last_frame_time = 0
                silence = b"\x00" * self._audio_chunk_size
                # chunk duration in seconds
                chunk_duration = (10 * self._params.audio_out_10ms_chunks) / 1000.0
                next_frame_time = time.time()
                while True:
                    try:
                        frame = self._audio_queue.get_nowait()
                        if isinstance(frame, OutputAudioRawFrame):
                            frame.audio = await self._mixer.mix(frame.audio)
                        last_frame_time = time.time()
                        yield frame
                    except asyncio.QueueEmpty:
                        # Notify the bot stopped speaking upstream if necessary.
                        diff_time = time.time() - last_frame_time
                        if diff_time > vad_stop_secs:
                            await self._bot_stopped_speaking()
                        # Generate an audio frame with only the mixer's part.
                        frame = OutputAudioRawFrame(
                            audio=await self._mixer.mix(silence),
                            sample_rate=self._sample_rate,
                            num_channels=self._params.audio_out_channels,
                        )
                        yield frame
                    # Throttle frame generation to maintain real-time pacing and avoid flooding with silence
                    wait = next_frame_time - time.time()
                    if wait > 0:
                        await asyncio.sleep(wait)
                    next_frame_time += chunk_duration

            if self._mixer:
                return with_mixer(BOT_VAD_STOP_SECS)
            else:
                return without_mixer(BOT_VAD_STOP_SECS)
|
||||
|
||||
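The throttling added to `with_mixer` keeps a running deadline (`next_frame_time`) and sleeps only for the time remaining until it, so time spent producing a chunk does not accumulate as drift. The sketch below isolates that idea. `paced_chunks` and `collect` are hypothetical names, the yielded items are integers rather than audio frames, and `time.monotonic()` is swapped in for the `time.time()` used in the diff.

```python
import asyncio
import time
from typing import AsyncGenerator, List, Tuple


async def paced_chunks(count: int, chunk_duration: float) -> AsyncGenerator[int, None]:
    """Yield `count` items, paced at roughly one per `chunk_duration` seconds."""
    next_frame_time = time.monotonic()
    for index in range(count):
        yield index
        # Sleep only for whatever is left until the current deadline, then
        # move the deadline forward by exactly one chunk duration.
        wait = next_frame_time - time.monotonic()
        if wait > 0:
            await asyncio.sleep(wait)
        next_frame_time += chunk_duration


async def collect(count: int, chunk_duration: float) -> Tuple[List[int], float]:
    """Drain the generator and report how long it took."""
    start = time.monotonic()
    items = [item async for item in paced_chunks(count, chunk_duration)]
    return items, time.monotonic() - start
```

With this ordering the first deadline has already passed when it is checked, so the first two items come out back to back and the pacing applies from the second item onward.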
async def _audio_task_handler(self):
|
||||
# Push a BotSpeakingFrame every 200ms, we don't really need to push it
|
||||
# at every audio chunk. If the audio chunk is bigger than 200ms, push at
|
||||
# every audio chunk.
|
||||
TOTAL_CHUNK_MS = self._params.audio_out_10ms_chunks * 10
|
||||
BOT_SPEAKING_CHUNK_PERIOD = max(int(200 / TOTAL_CHUNK_MS), 1)
|
||||
bot_speaking_counter = 0
|
||||
async for frame in self._next_frame():
|
||||
# Notify the bot started speaking upstream if necessary and that
|
||||
# it's actually speaking.
|
||||
if isinstance(frame, TTSAudioRawFrame):
|
||||
await self._bot_started_speaking()
|
||||
if bot_speaking_counter % BOT_SPEAKING_CHUNK_PERIOD == 0:
|
||||
await self._transport.push_frame(BotSpeakingFrame())
|
||||
await self._transport.push_frame(
|
||||
BotSpeakingFrame(), FrameDirection.UPSTREAM
|
||||
)
|
||||
bot_speaking_counter = 0
|
||||
bot_speaking_counter += 1
|
||||
|
||||
# No need to push EndFrame, it's pushed from process_frame().
|
||||
if isinstance(frame, EndFrame):
|
||||
break
|
||||
|
||||
# Handle frame.
|
||||
await self._handle_frame(frame)
|
||||
|
||||
# Also, push frame downstream in case anyone else needs it.
|
||||
await self._transport.push_frame(frame)
|
||||
|
||||
# Send audio.
|
||||
if isinstance(frame, OutputAudioRawFrame):
|
||||
await self._transport.write_raw_audio_frames(frame.audio, self._destination)
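The `BOT_SPEAKING_CHUNK_PERIOD` arithmetic at the top of `_audio_task_handler` is easy to check by hand. A minimal sketch follows, with the 200 ms target pulled out as a parameter. `bot_speaking_chunk_period` is a hypothetical helper name.

```python
def bot_speaking_chunk_period(audio_out_10ms_chunks: int, period_ms: int = 200) -> int:
    """Number of audio chunks between two BotSpeakingFrame pushes."""
    total_chunk_ms = audio_out_10ms_chunks * 10
    # Chunks longer than the period yield 0 here, so clamp to 1 to push a
    # BotSpeakingFrame on every chunk in that case.
    return max(int(period_ms / total_chunk_ms), 1)
```

With 40 ms chunks this gives a period of 5 chunks, and with 300 ms chunks it gives 1.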
|
||||
|
||||
#
|
||||
# Video handling
|
||||
#
|
||||
|
||||
def _create_video_task(self):
|
||||
if not self._video_task and self._params.video_out_enabled:
|
||||
self._video_queue = asyncio.Queue()
|
||||
self._video_task = self._transport.create_task(self._video_task_handler())
|
||||
|
||||
async def _cancel_video_task(self):
|
||||
# Stop video output task.
|
||||
if self._video_task:
|
||||
await self._transport.cancel_task(self._video_task)
|
||||
self._video_task = None
|
||||
|
||||
async def _set_video_image(self, image: OutputImageRawFrame):
|
||||
self._video_images = itertools.cycle([image])
|
||||
|
||||
async def _set_video_images(self, images: List[OutputImageRawFrame]):
|
||||
self._video_images = itertools.cycle(images)
|
||||
|
||||
async def _video_task_handler(self):
|
||||
self._video_start_time = None
|
||||
self._video_frame_index = 0
|
||||
self._video_frame_duration = 1 / self._params.video_out_framerate
|
||||
self._video_frame_reset = self._video_frame_duration * 5
|
||||
while True:
|
||||
if self._params.video_out_is_live:
|
||||
await self._video_is_live_handler()
|
||||
elif self._video_images:
|
||||
image = next(self._video_images)
|
||||
await self._draw_image(image)
|
||||
await asyncio.sleep(self._video_frame_duration)
|
||||
else:
|
||||
await asyncio.sleep(self._video_frame_duration)
|
||||
|
||||
async def _video_is_live_handler(self):
|
||||
image = await self._video_queue.get()
|
||||
|
||||
# We get the start time as soon as we get the first image.
|
||||
if not self._video_start_time:
|
||||
self._video_start_time = time.time()
|
||||
self._video_frame_index = 0
|
||||
|
||||
# Calculate how much time we need to wait before rendering next image.
|
||||
real_elapsed_time = time.time() - self._video_start_time
|
||||
real_render_time = self._video_frame_index * self._video_frame_duration
|
||||
delay_time = self._video_frame_duration + real_render_time - real_elapsed_time
|
||||
|
||||
if abs(delay_time) > self._video_frame_reset:
|
||||
self._video_start_time = time.time()
|
||||
self._video_frame_index = 0
|
||||
elif delay_time > 0:
|
||||
await asyncio.sleep(delay_time)
|
||||
self._video_frame_index += 1
|
||||
|
||||
# Render image
|
||||
await self._draw_image(image)
|
||||
|
||||
self._video_queue.task_done()
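The delay computation in `_video_is_live_handler` compares where the stream should be (frame index times frame duration) with the wall-clock time elapsed since the first image. Below is a sketch of that calculation as a pure function. `live_frame_delay` is a hypothetical name, and returning `None` to signal a timing reset is a convention of this sketch, not of the transport.

```python
from typing import Optional


def live_frame_delay(
    frame_index: int, frame_duration: float, elapsed: float, reset_threshold: float
) -> Optional[float]:
    """Seconds to wait before rendering, or None if timing should be reset."""
    render_time = frame_index * frame_duration
    delay = frame_duration + render_time - elapsed
    if abs(delay) > reset_threshold:
        # Too far ahead of or behind real time: the caller restarts its clock.
        return None
    # A negative delay means the frame is late, so render it immediately.
    return max(delay, 0.0)
```

In the diff the reset threshold is five frame durations, so at 30 fps the clock is restarted once rendering drifts more than about 167 ms from real time.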
|
||||
|
||||
async def _draw_image(self, frame: OutputImageRawFrame):
|
||||
desired_size = (self._params.video_out_width, self._params.video_out_height)
|
||||
|
||||
# TODO: we should refactor in the future to support dynamic resolutions
|
||||
# which is kind of what happens in P2P connections.
|
||||
# We need to add support for that inside the DailyTransport
|
||||
if frame.size != desired_size:
|
||||
image = Image.frombytes(frame.format, frame.size, frame.image)
|
||||
resized_image = image.resize(desired_size)
|
||||
# logger.warning(f"{frame} does not have the expected size {desired_size}, resizing")
|
||||
frame = OutputImageRawFrame(
|
||||
resized_image.tobytes(), resized_image.size, resized_image.format
|
||||
)
|
||||
|
||||
await self._transport.write_raw_video_frame(frame, self._destination)
|
||||
|
||||
#
|
||||
# Clock handling
|
||||
#
|
||||
|
||||
def _create_clock_task(self):
|
||||
if not self._clock_task:
|
||||
self._clock_queue = asyncio.PriorityQueue()
|
||||
self._clock_task = self._transport.create_task(self._clock_task_handler())
|
||||
|
||||
async def _cancel_clock_task(self):
|
||||
if self._clock_task:
|
||||
await self._transport.cancel_task(self._clock_task)
|
||||
self._clock_task = None
|
||||
|
||||
async def _clock_task_handler(self):
|
||||
running = True
|
||||
while running:
|
||||
timestamp, _, frame = await self._clock_queue.get()
|
||||
|
||||
# If we hit an EndFrame, we can finish right away.
|
||||
running = not isinstance(frame, EndFrame)
|
||||
@@ -279,167 +605,12 @@ class BaseOutputTransport(FrameProcessor):
|
||||
# has already passed we process it, otherwise we wait until it's
|
||||
# time to process it.
|
||||
if running:
|
||||
current_time = self.get_clock().get_time()
|
||||
current_time = self._transport.get_clock().get_time()
|
||||
if timestamp > current_time:
|
||||
wait_time = nanoseconds_to_seconds(timestamp - current_time)
|
||||
await asyncio.sleep(wait_time)
|
||||
|
||||
# Handle frame.
|
||||
await self._sink_frame_handler(frame)
|
||||
# Push frame downstream.
|
||||
await self._transport.push_frame(frame)
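The clock queue stores `(pts, id, frame)` tuples, so `asyncio.PriorityQueue` returns frames in presentation-timestamp order, and the `EndFrame` enqueued with `sys.maxsize` in `stop()` always comes out last. The sketch below shows that ordering with strings standing in for frames. `drain_in_pts_order` is a hypothetical name.

```python
import asyncio
import sys
from typing import List


async def drain_in_pts_order() -> List[str]:
    """Enqueue entries out of order and read them back sorted by timestamp."""
    queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
    # The sentinel goes in first but carries the largest possible timestamp.
    await queue.put((sys.maxsize, 3, "end"))
    await queue.put((2_000, 2, "second"))
    await queue.put((1_000, 1, "first"))

    order = []
    while True:
        _pts, _frame_id, payload = await queue.get()
        order.append(payload)
        queue.task_done()
        if payload == "end":
            break
    return order
```

The middle element acts as a tie-breaker: if two entries share a timestamp, the tuple comparison is decided by the id and the frame objects themselves are never compared.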
|
||||
|
||||
# Also, push frame downstream in case anyone else needs it.
|
||||
await self.push_frame(frame)
|
||||
|
||||
self._sink_clock_queue.task_done()
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} error processing sink clock queue: {e}")
|
||||
|
||||
def _next_frame(self) -> AsyncGenerator[Frame, None]:
|
||||
async def without_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]:
|
||||
while True:
|
||||
try:
|
||||
frame = await asyncio.wait_for(self._sink_queue.get(), timeout=vad_stop_secs)
|
||||
yield frame
|
||||
except asyncio.TimeoutError:
|
||||
# Notify the bot stopped speaking upstream if necessary.
|
||||
await self._bot_stopped_speaking()
|
||||
|
||||
async def with_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]:
|
||||
last_frame_time = 0
|
||||
silence = b"\x00" * self._audio_chunk_size
|
||||
while True:
|
||||
try:
|
||||
frame = self._sink_queue.get_nowait()
|
||||
if isinstance(frame, OutputAudioRawFrame):
|
||||
frame.audio = await self._params.audio_out_mixer.mix(frame.audio)
|
||||
last_frame_time = time.time()
|
||||
yield frame
|
||||
except asyncio.QueueEmpty:
|
||||
# Notify the bot stopped speaking upstream if necessary.
|
||||
diff_time = time.time() - last_frame_time
|
||||
if diff_time > vad_stop_secs:
|
||||
await self._bot_stopped_speaking()
|
||||
# Generate an audio frame with only the mixer's part.
|
||||
frame = OutputAudioRawFrame(
|
||||
audio=await self._params.audio_out_mixer.mix(silence),
|
||||
sample_rate=self._sample_rate,
|
||||
num_channels=self._params.audio_out_channels,
|
||||
)
|
||||
yield frame
|
||||
|
||||
if self._params.audio_out_mixer:
|
||||
return with_mixer(BOT_VAD_STOP_SECS)
|
||||
else:
|
||||
return without_mixer(BOT_VAD_STOP_SECS)
|
||||
|
||||
async def _sink_task_handler(self):
|
||||
# Push a BotSpeakingFrame every 200ms; we don't really need to push it
|
||||
# at every audio chunk. If the audio chunk is bigger than 200ms, push at
|
||||
# every audio chunk.
|
||||
TOTAL_CHUNK_MS = self._params.audio_out_10ms_chunks * 10
|
||||
BOT_SPEAKING_CHUNK_PERIOD = max(int(200 / TOTAL_CHUNK_MS), 1)
|
||||
bot_speaking_counter = 0
|
||||
async for frame in self._next_frame():
|
||||
# Notify the bot started speaking upstream if necessary and that
|
||||
# it's actually speaking.
|
||||
if isinstance(frame, TTSAudioRawFrame):
|
||||
await self._bot_started_speaking()
|
||||
if bot_speaking_counter % BOT_SPEAKING_CHUNK_PERIOD == 0:
|
||||
await self.push_frame(BotSpeakingFrame())
|
||||
await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM)
|
||||
bot_speaking_counter = 0
|
||||
bot_speaking_counter += 1
|
||||
|
||||
# No need to push EndFrame, it's pushed from process_frame().
|
||||
if isinstance(frame, EndFrame):
|
||||
break
|
||||
|
||||
# Handle frame.
|
||||
await self._sink_frame_handler(frame)
|
||||
|
||||
# Also, push frame downstream in case anyone else needs it.
|
||||
await self.push_frame(frame)
|
||||
|
||||
# Send audio.
|
||||
if isinstance(frame, OutputAudioRawFrame):
|
||||
await self.write_raw_audio_frames(frame.audio)
|
||||
|
||||
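The `BOT_SPEAKING_CHUNK_PERIOD` arithmetic in `_sink_task_handler` can be checked in isolation. The following is a minimal sketch, not code from this change: the helper name `bot_speaking_chunk_period` and its `period_ms` parameter are hypothetical, and only the formula itself is taken from the handler above.

```python
def bot_speaking_chunk_period(audio_out_10ms_chunks: int, period_ms: int = 200) -> int:
    """Return how many audio chunks pass between two BotSpeakingFrame pushes.

    Hypothetical helper: it mirrors max(int(200 / TOTAL_CHUNK_MS), 1) from the
    handler, where TOTAL_CHUNK_MS = audio_out_10ms_chunks * 10.
    """
    total_chunk_ms = audio_out_10ms_chunks * 10
    # Chunks longer than the period still trigger a push on every chunk.
    return max(int(period_ms / total_chunk_ms), 1)


if __name__ == "__main__":
    # With audio_out_10ms_chunks = 4 (40 ms chunks), push every 5th chunk.
    print(bot_speaking_chunk_period(4))
    # With 300 ms chunks the period collapses to 1, i.e. push at every chunk.
    print(bot_speaking_chunk_period(30))
```

The `max(..., 1)` guard is what keeps the modulo in the handler from dividing by zero when a single chunk is longer than 200 ms.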
#
|
||||
# Video task
|
||||
#
|
||||
|
||||
def _create_video_task(self):
|
||||
# Create video output queue and task if needed.
|
||||
if not self._video_out_task and self._params.video_out_enabled:
|
||||
self._video_out_queue = asyncio.Queue()
|
||||
self._video_out_task = self.create_task(self._video_out_task_handler())
|
||||
|
||||
async def _cancel_video_task(self):
|
||||
# Stop video output task.
|
||||
if self._video_out_task and self._params.video_out_enabled:
|
||||
await self.cancel_task(self._video_out_task)
|
||||
self._video_out_task = None
|
||||
|
||||
async def _draw_image(self, frame: OutputImageRawFrame):
|
||||
desired_size = (self._params.video_out_width, self._params.video_out_height)
|
||||
|
||||
# TODO: we should refactor in the future to support dynamic resolutions
|
||||
# which is kind of what happens in P2P connections.
|
||||
# We need to add support for that inside the DailyTransport
|
||||
if frame.size != desired_size:
|
||||
image = Image.frombytes(frame.format, frame.size, frame.image)
|
||||
resized_image = image.resize(desired_size)
|
||||
# logger.warning(f"{frame} does not have the expected size {desired_size}, resizing")
|
||||
frame = OutputImageRawFrame(
|
||||
resized_image.tobytes(), resized_image.size, resized_image.format
|
||||
)
|
||||
|
||||
await self.write_raw_video_frame(frame)
|
||||
|
||||
async def _set_video_image(self, image: OutputImageRawFrame):
|
||||
self._video_images = itertools.cycle([image])
|
||||
|
||||
async def _set_video_images(self, images: List[OutputImageRawFrame]):
|
||||
self._video_images = itertools.cycle(images)
|
||||
|
||||
async def _video_out_task_handler(self):
|
||||
self._video_out_start_time = None
|
||||
self._video_out_frame_index = 0
|
||||
self._video_out_frame_duration = 1 / self._params.video_out_framerate
|
||||
self._video_out_frame_reset = self._video_out_frame_duration * 5
|
||||
while True:
|
||||
if self._params.video_out_is_live:
|
||||
await self._video_out_is_live_handler()
|
||||
elif self._video_images:
|
||||
image = next(self._video_images)
|
||||
await self._draw_image(image)
|
||||
await asyncio.sleep(self._video_out_frame_duration)
|
||||
else:
|
||||
await asyncio.sleep(self._video_out_frame_duration)
|
||||
|
||||
async def _video_out_is_live_handler(self):
|
||||
image = await self._video_out_queue.get()
|
||||
|
||||
# We get the start time as soon as we get the first image.
|
||||
if not self._video_out_start_time:
|
||||
self._video_out_start_time = time.time()
|
||||
self._video_out_frame_index = 0
|
||||
|
||||
# Calculate how much time we need to wait before rendering next image.
|
||||
real_elapsed_time = time.time() - self._video_out_start_time
|
||||
real_render_time = self._video_out_frame_index * self._video_out_frame_duration
|
||||
delay_time = self._video_out_frame_duration + real_render_time - real_elapsed_time
|
||||
|
||||
if abs(delay_time) > self._video_out_frame_reset:
|
||||
self._video_out_start_time = time.time()
|
||||
self._video_out_frame_index = 0
|
||||
elif delay_time > 0:
|
||||
await asyncio.sleep(delay_time)
|
||||
self._video_out_frame_index += 1
|
||||
|
||||
# Render image
|
||||
await self._draw_image(image)
|
||||
|
||||
self._video_out_queue.task_done()
|
||||
self._clock_queue.task_done()
|
||||
|
||||
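The pacing logic in `_video_out_is_live_handler` is easier to reason about as a pure function. The sketch below is an illustration under assumptions: `PacingState` and `next_delay` are hypothetical names, and the clock is passed in as `now` instead of calling `time.time()`, so the behaviour can be tested deterministically. The delay formula and the reset rule follow the handler shown above.

```python
from dataclasses import dataclass


@dataclass
class PacingState:
    """Hypothetical container for the handler's start time and frame index."""

    start_time: float
    frame_index: int = 0


def next_delay(state: PacingState, now: float, frame_duration: float, reset_after: float) -> float:
    """Return how long to sleep before rendering the next image.

    Mirrors the handler: compare the time that has really elapsed with the
    time at which the current frame index should have been rendered.
    """
    real_elapsed = now - state.start_time
    real_render = state.frame_index * frame_duration
    delay = frame_duration + real_render - real_elapsed

    if abs(delay) > reset_after:
        # Too far ahead or behind: restart the schedule from "now".
        state.start_time = now
        state.frame_index = 0
        return 0.0
    if delay > 0:
        # On schedule: wait for the slot and advance the frame index.
        state.frame_index += 1
        return delay
    # Slightly late: render immediately without advancing the index.
    return 0.0


if __name__ == "__main__":
    frame_duration = 0.25  # 4 fps, chosen so the floats are exact
    state = PacingState(start_time=0.0)
    print(next_delay(state, 0.0, frame_duration, frame_duration * 5))
    print(next_delay(state, 10.0, frame_duration, frame_duration * 5), state)
```

Passing the clock in as an argument is a design choice of this sketch only; the handler itself reads `time.time()` directly.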
@@ -5,7 +5,7 @@
|
||||
#
|
||||
|
||||
from abc import abstractmethod
|
||||
from typing import Optional
|
||||
from typing import List, Mapping, Optional
|
||||
|
||||
from pydantic import BaseModel, ConfigDict
|
||||
|
||||
@@ -33,7 +33,8 @@ class TransportParams(BaseModel):
|
||||
audio_out_channels: int = 1
|
||||
audio_out_bitrate: int = 96000
|
||||
audio_out_10ms_chunks: int = 4
|
||||
audio_out_mixer: Optional[BaseAudioMixer] = None
|
||||
audio_out_mixer: Optional[BaseAudioMixer | Mapping[Optional[str], BaseAudioMixer]] = None
|
||||
audio_out_destinations: List[str] = []
|
||||
audio_in_enabled: bool = False
|
||||
audio_in_sample_rate: Optional[int] = None
|
||||
audio_in_channels: int = 1
|
||||
@@ -48,6 +49,7 @@ class TransportParams(BaseModel):
|
||||
video_out_bitrate: int = 800000
|
||||
video_out_framerate: int = 30
|
||||
video_out_color_format: str = "RGB"
|
||||
video_out_destinations: List[str] = []
|
||||
vad_enabled: bool = False
|
||||
vad_audio_passthrough: bool = False
|
||||
vad_analyzer: Optional[VADAnalyzer] = None
|
||||
|
||||
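The widened `audio_out_mixer` type accepts either a single mixer or a mapping from destination name to mixer. How the transport resolves it is not visible in this hunk, so the following is only one plausible reading, sketched under assumptions: `StubMixer` and `resolve_mixer` are hypothetical, a bare mixer is assumed to apply to the default destination (`None`), and a mapping is assumed to be looked up by destination name.

```python
from collections.abc import Mapping
from typing import Optional, Union


class StubMixer:
    """Hypothetical stand-in for BaseAudioMixer, used only for illustration."""

    def __init__(self, name: str):
        self.name = name


MixerSetting = Optional[Union[StubMixer, Mapping]]


def resolve_mixer(setting: MixerSetting, destination: Optional[str] = None) -> Optional[StubMixer]:
    """Pick the mixer for a destination (assumed semantics, see lead-in)."""
    if setting is None:
        return None
    if isinstance(setting, Mapping):
        # Mapping form: the None key stands for the default destination.
        return setting.get(destination)
    # Single-mixer form: assumed to apply to the default destination only.
    return setting if destination is None else None


if __name__ == "__main__":
    default = StubMixer("default")
    music = StubMixer("music")
    per_destination = {None: default, "music-track": music}
    print(resolve_mixer(per_destination, "music-track").name)
    print(resolve_mixer(default).name)
```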
@@ -118,7 +118,7 @@ class LocalAudioOutputTransport(BaseOutputTransport):
|
||||
self._out_stream.close()
|
||||
self._out_stream = None
|
||||
|
||||
async def write_raw_audio_frames(self, frames: bytes):
|
||||
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
|
||||
if self._out_stream:
|
||||
await self.get_event_loop().run_in_executor(
|
||||
self._executor, self._out_stream.write, frames
|
||||
|
||||
@@ -131,13 +131,15 @@ class TkOutputTransport(BaseOutputTransport):
|
||||
self._out_stream.close()
|
||||
self._out_stream = None
|
||||
|
||||
async def write_raw_audio_frames(self, frames: bytes):
|
||||
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
|
||||
if self._out_stream:
|
||||
await self.get_event_loop().run_in_executor(
|
||||
self._executor, self._out_stream.write, frames
|
||||
)
|
||||
|
||||
async def write_raw_video_frame(self, frame: OutputImageRawFrame):
|
||||
async def write_raw_video_frame(
|
||||
self, frame: OutputImageRawFrame, destination: Optional[str] = None
|
||||
):
|
||||
self.get_event_loop().call_soon(self._write_frame_to_tk, frame)
|
||||
|
||||
def _write_frame_to_tk(self, frame: OutputImageRawFrame):
|
||||
|
||||
@@ -203,7 +203,7 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport):
|
||||
await super().start(frame)
|
||||
await self._client.setup(frame)
|
||||
await self._params.serializer.setup(frame)
|
||||
self._send_interval = (self._audio_chunk_size / self.sample_rate) / 2
|
||||
self._send_interval = (self.audio_chunk_size / self.sample_rate) / 2
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
await super().stop(frame)
|
||||
@@ -229,7 +229,7 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport):
|
||||
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
|
||||
await self._write_frame(frame)
|
||||
|
||||
async def write_raw_audio_frames(self, frames: bytes):
|
||||
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
|
||||
if self._client.is_closing:
|
||||
return
|
||||
|
||||
|
||||
@@ -284,11 +284,13 @@ class SmallWebRTCClient:
|
||||
)
|
||||
yield audio_frame
|
||||
|
||||
async def write_raw_audio_frames(self, data: bytes):
|
||||
async def write_raw_audio_frames(self, data: bytes, destination: Optional[str] = None):
|
||||
if self._can_send() and self._audio_output_track:
|
||||
await self._audio_output_track.add_audio_bytes(data)
|
||||
|
||||
async def write_raw_video_frame(self, frame: OutputImageRawFrame):
|
||||
async def write_raw_video_frame(
|
||||
self, frame: OutputImageRawFrame, destination: Optional[str] = None
|
||||
):
|
||||
if self._can_send() and self._video_output_track:
|
||||
self._video_output_track.add_video_frame(frame)
|
||||
|
||||
@@ -482,9 +484,10 @@ class SmallWebRTCOutputTransport(BaseOutputTransport):
|
||||
self._params = params
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
await self._client.setup(self._params, frame)
|
||||
await self._client.connect()
|
||||
# Parent start.
|
||||
await super().start(frame)
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
await super().stop(frame)
|
||||
@@ -497,10 +500,12 @@ class SmallWebRTCOutputTransport(BaseOutputTransport):
|
||||
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
|
||||
await self._client.send_message(frame)
|
||||
|
||||
async def write_raw_audio_frames(self, frames: bytes):
|
||||
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
|
||||
await self._client.write_raw_audio_frames(frames)
|
||||
|
||||
async def write_raw_video_frame(self, frame: OutputImageRawFrame):
|
||||
async def write_raw_video_frame(
|
||||
self, frame: OutputImageRawFrame, destination: Optional[str] = None
|
||||
):
|
||||
await self._client.write_raw_video_frame(frame)
|
||||
|
||||
|
||||
|
||||
@@ -182,7 +182,7 @@ class WebsocketClientOutputTransport(BaseOutputTransport):
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
self._send_interval = (self._audio_chunk_size / self.sample_rate) / 2
|
||||
self._send_interval = (self.audio_chunk_size / self.sample_rate) / 2
|
||||
await self._params.serializer.setup(frame)
|
||||
await self._session.setup(frame)
|
||||
await self._session.connect()
|
||||
@@ -202,7 +202,7 @@ class WebsocketClientOutputTransport(BaseOutputTransport):
|
||||
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
|
||||
await self._write_frame(frame)
|
||||
|
||||
async def write_raw_audio_frames(self, frames: bytes):
|
||||
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
|
||||
frame = OutputAudioRawFrame(
|
||||
audio=frames,
|
||||
sample_rate=self.sample_rate,
|
||||
|
||||
@@ -194,7 +194,7 @@ class WebsocketServerOutputTransport(BaseOutputTransport):
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
await self._params.serializer.setup(frame)
|
||||
self._send_interval = (self._audio_chunk_size / self.sample_rate) / 2
|
||||
self._send_interval = (self.audio_chunk_size / self.sample_rate) / 2
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
await super().stop(frame)
|
||||
@@ -218,7 +218,7 @@ class WebsocketServerOutputTransport(BaseOutputTransport):
|
||||
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
|
||||
await self._write_frame(frame)
|
||||
|
||||
async def write_raw_audio_frames(self, frames: bytes):
|
||||
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
|
||||
if not self._websocket:
|
||||
# Simulate audio playback with a sleep.
|
||||
await self._write_audio_sleep()
|
||||
|
||||
@@ -8,17 +8,13 @@ import asyncio
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Awaitable, Callable, Mapping, Optional
|
||||
from typing import Any, Awaitable, Callable, Dict, Mapping, Optional
|
||||
|
||||
import aiohttp
|
||||
from daily import (
|
||||
VirtualCameraDevice,
|
||||
VirtualMicrophoneDevice,
|
||||
VirtualSpeakerDevice,
|
||||
)
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel
|
||||
|
||||
from pipecat.audio.utils import create_default_resampler
|
||||
from pipecat.audio.vad.vad_analyzer import VADAnalyzer, VADParams
|
||||
from pipecat.frames.frames import (
|
||||
CancelFrame,
|
||||
@@ -34,6 +30,7 @@ from pipecat.frames.frames import (
|
||||
TranscriptionFrame,
|
||||
TransportMessageFrame,
|
||||
TransportMessageUrgentFrame,
|
||||
UserAudioRawFrame,
|
||||
UserImageRawFrame,
|
||||
UserImageRequestFrame,
|
||||
)
|
||||
@@ -45,7 +42,17 @@ from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.utils.asyncio import BaseTaskManager
|
||||
|
||||
try:
|
||||
from daily import CallClient, Daily, EventHandler
|
||||
from daily import (
|
||||
AudioData,
|
||||
CallClient,
|
||||
CustomAudioSource,
|
||||
Daily,
|
||||
EventHandler,
|
||||
VideoFrame,
|
||||
VirtualCameraDevice,
|
||||
VirtualMicrophoneDevice,
|
||||
VirtualSpeakerDevice,
|
||||
)
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error(
|
||||
@@ -149,6 +156,8 @@ class DailyParams(TransportParams):
|
||||
api_url: Daily API base URL
|
||||
api_key: Daily API authentication key
|
||||
dialin_settings: Optional settings for dial-in functionality
|
||||
camera_out_enabled: Whether to enable the main camera output track. Only takes effect when `video_out_enabled=True`
|
||||
microphone_out_enabled: Whether to enable the main microphone track. Only takes effect when `audio_out_enabled=True`
|
||||
transcription_enabled: Whether to enable speech transcription
|
||||
transcription_settings: Configuration for transcription service
|
||||
"""
|
||||
@@ -156,6 +165,8 @@ class DailyParams(TransportParams):
|
||||
api_url: str = "https://api.daily.co/v1"
|
||||
api_key: str = ""
|
||||
dialin_settings: Optional[DailyDialinSettings] = None
|
||||
camera_out_enabled: bool = True
|
||||
microphone_out_enabled: bool = True
|
||||
transcription_enabled: bool = False
|
||||
transcription_settings: DailyTranscriptionSettings = DailyTranscriptionSettings()
|
||||
|
||||
@@ -275,6 +286,7 @@ class DailyTransportClient(EventHandler):
|
||||
self._transport_name = transport_name
|
||||
|
||||
self._participant_id: str = ""
|
||||
self._audio_renderers = {}
|
||||
self._video_renderers = {}
|
||||
self._transcription_ids = []
|
||||
self._transcription_status = None
|
||||
@@ -310,6 +322,7 @@ class DailyTransportClient(EventHandler):
|
||||
self._camera: Optional[VirtualCameraDevice] = None
|
||||
self._mic: Optional[VirtualMicrophoneDevice] = None
|
||||
self._speaker: Optional[VirtualSpeakerDevice] = None
|
||||
self._audio_sources: Dict[str, CustomAudioSource] = {}
|
||||
|
||||
def _camera_name(self):
|
||||
return f"camera-{self}"
|
||||
@@ -328,6 +341,14 @@ class DailyTransportClient(EventHandler):
|
||||
def participant_id(self) -> str:
|
||||
return self._participant_id
|
||||
|
||||
@property
|
||||
def in_sample_rate(self) -> int:
|
||||
return self._in_sample_rate
|
||||
|
||||
@property
|
||||
def out_sample_rate(self) -> int:
|
||||
return self._out_sample_rate
|
||||
|
||||
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
|
||||
if not self._joined:
|
||||
return
|
||||
@@ -365,19 +386,27 @@ class DailyTransportClient(EventHandler):
|
||||
await asyncio.sleep(0.01)
|
||||
return None
|
||||
|
||||
async def write_raw_audio_frames(self, frames: bytes):
|
||||
if not self._mic:
|
||||
return None
|
||||
async def register_audio_destination(self, destination: str):
|
||||
self._audio_sources[destination] = await self.add_custom_audio_track(destination)
|
||||
self._client.update_publishing({"customAudio": {destination: True}})
|
||||
|
||||
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
|
||||
future = self._get_event_loop().create_future()
|
||||
self._mic.write_frames(frames, completion=completion_callback(future))
|
||||
if not destination and self._mic:
|
||||
self._mic.write_frames(frames, completion=completion_callback(future))
|
||||
elif destination and destination in self._audio_sources:
|
||||
source = self._audio_sources[destination]
|
||||
source.write_frames(frames, completion=completion_callback(future))
|
||||
else:
|
||||
logger.warning(f"{self} unable to write audio frames to destination [{destination}]")
|
||||
future.set_result(None)
|
||||
await future
|
||||
|
||||
async def write_raw_video_frame(self, frame: OutputImageRawFrame):
|
||||
if not self._camera:
|
||||
return None
|
||||
|
||||
self._camera.write_frame(frame.image)
|
||||
async def write_raw_video_frame(
|
||||
self, frame: OutputImageRawFrame, destination: Optional[str] = None
|
||||
):
|
||||
if not destination and self._camera:
|
||||
self._camera.write_frame(frame.image)
|
||||
|
||||
async def setup(self, frame: StartFrame):
|
||||
self._in_sample_rate = self._params.audio_in_sample_rate or frame.audio_in_sample_rate
|
||||
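The destination-aware `write_raw_audio_frames` in this hunk routes frames in three ways: to the default microphone when no destination is given, to a registered custom source when one matches, and nowhere (with a warning) otherwise. The sketch below reproduces only that branching with in-memory lists; `AudioRouter` and its methods are hypothetical and do not use the Daily SDK.

```python
from typing import Dict, List, Optional


class AudioRouter:
    """Hypothetical in-memory model of the default mic plus named audio sources."""

    def __init__(self, has_default: bool = True):
        # None models a client whose default microphone was never created.
        self._default: Optional[List[bytes]] = [] if has_default else None
        self._sources: Dict[str, List[bytes]] = {}

    def register_destination(self, destination: str) -> None:
        """Create a named sink, analogous to registering a custom audio track."""
        self._sources[destination] = []

    def write(self, frames: bytes, destination: Optional[str] = None) -> bool:
        """Route frames and report whether any sink accepted them."""
        if not destination and self._default is not None:
            self._default.append(frames)
            return True
        if destination and destination in self._sources:
            self._sources[destination].append(frames)
            return True
        # Unknown destination, or no default device: nothing is written.
        return False


if __name__ == "__main__":
    router = AudioRouter()
    router.register_destination("translation")
    print(router.write(b"\x00\x00"))                 # default microphone
    print(router.write(b"\x00\x00", "translation"))  # registered destination
    print(router.write(b"\x00\x00", "unknown"))      # rejected
```

Returning a boolean is a simplification of this sketch; the code in the diff instead logs a warning and resolves the pending future so the caller never blocks.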
@@ -480,6 +509,9 @@ class DailyTransportClient(EventHandler):
|
||||
async def _join(self):
|
||||
future = self._get_event_loop().create_future()
|
||||
|
||||
camera_enabled = self._params.video_out_enabled and self._params.camera_out_enabled
|
||||
microphone_enabled = self._params.audio_out_enabled and self._params.microphone_out_enabled
|
||||
|
||||
self._client.join(
|
||||
self._room_url,
|
||||
self._token,
|
||||
@@ -487,13 +519,13 @@ class DailyTransportClient(EventHandler):
|
||||
client_settings={
|
||||
"inputs": {
|
||||
"camera": {
|
||||
"isEnabled": self._params.video_out_enabled,
|
||||
"isEnabled": camera_enabled,
|
||||
"settings": {
|
||||
"deviceId": self._camera_name(),
|
||||
},
|
||||
},
|
||||
"microphone": {
|
||||
"isEnabled": self._params.audio_out_enabled,
|
||||
"isEnabled": microphone_enabled,
|
||||
"settings": {
|
||||
"deviceId": self._mic_name(),
|
||||
"customConstraints": {
|
||||
@@ -648,6 +680,28 @@ class DailyTransportClient(EventHandler):
|
||||
if self._joined and self._transcription_status:
|
||||
await self.update_transcription(self._transcription_ids)
|
||||
|
||||
async def capture_participant_audio(
|
||||
self,
|
||||
participant_id: str,
|
||||
callback: Callable,
|
||||
audio_source: str = "microphone",
|
||||
):
|
||||
# Only enable the desired audio source subscription on this participant.
|
||||
if audio_source in ("microphone", "screenAudio"):
|
||||
media = {"media": {audio_source: "subscribed"}}
|
||||
else:
|
||||
media = {"media": {"customAudio": {audio_source: "subscribed"}}}
|
||||
|
||||
await self.update_subscriptions(participant_settings={participant_id: media})
|
||||
|
||||
self._audio_renderers[participant_id] = {audio_source: callback}
|
||||
|
||||
self._client.set_audio_renderer(
|
||||
participant_id,
|
||||
self._audio_data_received,
|
||||
audio_source=audio_source,
|
||||
)
|
||||
|
||||
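The subscription payload built in `capture_participant_audio` depends on whether the source is one of the built-in tracks or a custom one. As a minimal sketch, the branching can be pulled into a pure function; the name `audio_subscription_settings` is hypothetical, while the dictionary shapes are copied from the method above.

```python
from typing import Any, Dict


def audio_subscription_settings(
    participant_id: str, audio_source: str = "microphone"
) -> Dict[str, Any]:
    """Build the participant_settings payload for a single audio source."""
    if audio_source in ("microphone", "screenAudio"):
        # Built-in tracks are addressed directly under "media".
        media = {"media": {audio_source: "subscribed"}}
    else:
        # Any other name is treated as a custom audio track.
        media = {"media": {"customAudio": {audio_source: "subscribed"}}}
    return {participant_id: media}


if __name__ == "__main__":
    print(audio_subscription_settings("abc123"))
    print(audio_subscription_settings("abc123", "translation"))
```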
async def capture_participant_video(
|
||||
self,
|
||||
participant_id: str,
|
||||
@@ -656,12 +710,15 @@ class DailyTransportClient(EventHandler):
|
||||
video_source: str = "camera",
|
||||
color_format: str = "RGB",
|
||||
):
|
||||
# Only enable the desired video source subscription on this participant.
|
||||
await self.update_subscriptions(
|
||||
participant_settings={participant_id: {"media": {video_source: "subscribed"}}}
|
||||
)
|
||||
# Only enable the desired video source subscription on this participant.
|
||||
if video_source in ("camera", "screenVideo"):
|
||||
media = {"media": {video_source: "subscribed"}}
|
||||
else:
|
||||
media = {"media": {"customVideo": {video_source: "subscribed"}}}
|
||||
|
||||
self._video_renderers[participant_id] = callback
|
||||
await self.update_subscriptions(participant_settings={participant_id: media})
|
||||
|
||||
self._video_renderers[participant_id] = {video_source: callback}
|
||||
|
||||
self._client.set_video_renderer(
|
||||
participant_id,
|
||||
@@ -670,6 +727,20 @@ class DailyTransportClient(EventHandler):
|
||||
color_format=color_format,
|
||||
)
|
||||
|
||||
async def add_custom_audio_track(self, track_name: str) -> CustomAudioSource:
|
||||
future = self._get_event_loop().create_future()
|
||||
|
||||
audio_source = CustomAudioSource(self._out_sample_rate, 1)
|
||||
self._client.add_custom_audio_track(
|
||||
track_name=track_name,
|
||||
audio_source=audio_source,
|
||||
completion=completion_callback(future),
|
||||
)
|
||||
|
||||
await future
|
||||
|
||||
return audio_source
|
||||
|
||||
async def update_transcription(self, participants=None, instance_id=None):
|
||||
future = self._get_event_loop().create_future()
|
||||
self._client.update_transcription(
|
||||
@@ -686,7 +757,15 @@ class DailyTransportClient(EventHandler):
|
||||
)
|
||||
await future
|
||||
|
||||
async def update_remote_participants(self, remote_participants: Mapping[str, Any] = None):
|
||||
async def update_publishing(self, publishing_settings: Mapping[str, Any]):
|
||||
future = self._get_event_loop().create_future()
|
||||
self._client.update_publishing(
|
||||
publishing_settings=publishing_settings,
|
||||
completion=completion_callback(future),
|
||||
)
|
||||
await future
|
||||
|
||||
async def update_remote_participants(self, remote_participants: Mapping[str, Any]):
|
||||
future = self._get_event_loop().create_future()
|
||||
self._client.update_remote_participants(
|
||||
remote_participants=remote_participants, completion=completion_callback(future)
|
||||
@@ -773,15 +852,15 @@ class DailyTransportClient(EventHandler):
|
||||
# Daily (CallClient callbacks)
|
||||
#
|
||||
|
||||
def _video_frame_received(self, participant_id, video_frame):
|
||||
callback = self._video_renderers[participant_id]
|
||||
self._call_async_callback(
|
||||
callback,
|
||||
participant_id,
|
||||
video_frame.buffer,
|
||||
(video_frame.width, video_frame.height),
|
||||
video_frame.color_format,
|
||||
)
|
||||
def _audio_data_received(self, participant_id: str, audio_data: AudioData, audio_source: str):
|
||||
callback = self._audio_renderers[participant_id][audio_source]
|
||||
self._call_async_callback(callback, participant_id, audio_data, audio_source)
|
||||
|
||||
def _video_frame_received(
|
||||
self, participant_id: str, video_frame: VideoFrame, video_source: str
|
||||
):
|
||||
callback = self._video_renderers[participant_id][video_source]
|
||||
self._call_async_callback(callback, participant_id, video_frame, video_source)
|
||||
|
||||
def _call_async_callback(self, callback, *args):
|
||||
future = asyncio.run_coroutine_threadsafe(
|
||||
@@ -837,6 +916,8 @@ class DailyInputTransport(BaseInputTransport):
|
||||
# internally to be processed.
|
||||
self._audio_in_task = None
|
||||
|
||||
self._resampler = create_default_resampler()
|
||||
|
||||
self._vad_analyzer: Optional[VADAnalyzer] = params.vad_analyzer
|
||||
|
||||
@property
|
||||
@@ -851,6 +932,9 @@ class DailyInputTransport(BaseInputTransport):
|
||||
self._audio_in_task = self.create_task(self._audio_in_task_handler())
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
# Setup client.
|
||||
await self._client.setup(frame)
|
||||
|
||||
# Parent start.
|
||||
await super().start(frame)
|
||||
|
||||
@@ -859,8 +943,6 @@ class DailyInputTransport(BaseInputTransport):
|
||||
|
||||
self._initialized = True
|
||||
|
||||
# Setup client.
|
||||
await self._client.setup(frame)
|
||||
# Join the room.
|
||||
await self._client.join()
|
||||
if self._params.audio_in_stream_on_start:
|
||||
@@ -916,6 +998,31 @@ class DailyInputTransport(BaseInputTransport):
|
||||
# Audio in
|
||||
#
|
||||
|
||||
async def capture_participant_audio(
|
||||
self,
|
||||
participant_id: str,
|
||||
audio_source: str = "microphone",
|
||||
):
|
||||
await self._client.capture_participant_audio(
|
||||
participant_id, self._on_participant_audio_data, audio_source
|
||||
)
|
||||
|
||||
async def _on_participant_audio_data(
|
||||
self, participant_id: str, audio: AudioData, audio_source: str
|
||||
):
|
||||
resampled = await self._resampler.resample(
|
||||
audio.audio_frames, audio.sample_rate, self._client.out_sample_rate
|
||||
)
|
||||
|
||||
frame = UserAudioRawFrame(
|
||||
user_id=participant_id,
|
||||
audio=resampled,
|
||||
sample_rate=self._client.out_sample_rate,
|
||||
num_channels=audio.num_channels,
|
||||
)
|
||||
frame.transport_source = audio_source
|
||||
await self.push_frame(frame)
|
||||
|
||||
async def _audio_in_task_handler(self):
|
||||
while True:
|
||||
frame = await self._client.read_next_audio_frame()
|
||||
@@ -934,9 +1041,11 @@ class DailyInputTransport(BaseInputTransport):
|
||||
color_format: str = "RGB",
|
||||
):
|
||||
self._video_renderers[participant_id] = {
|
||||
"framerate": framerate,
|
||||
"timestamp": 0,
|
||||
"render_next_frame": [],
|
||||
video_source: {
|
||||
"framerate": framerate,
|
||||
"timestamp": 0,
|
||||
"render_next_frame": [],
|
||||
}
|
||||
}
|
||||
|
||||
await self._client.capture_participant_video(
|
||||
@@ -947,12 +1056,14 @@ class DailyInputTransport(BaseInputTransport):
|
||||
if frame.user_id in self._video_renderers:
|
||||
self._video_renderers[frame.user_id]["render_next_frame"].append(frame)
|
||||
|
||||
async def _on_participant_video_frame(self, participant_id: str, buffer, size, format):
|
||||
async def _on_participant_video_frame(
|
||||
self, participant_id: str, video_frame: VideoFrame, video_source: str
|
||||
):
|
||||
render_frame = False
|
||||
|
||||
curr_time = time.time()
|
||||
prev_time = self._video_renderers[participant_id]["timestamp"]
|
||||
framerate = self._video_renderers[participant_id]["framerate"]
|
||||
prev_time = self._video_renderers[participant_id][video_source]["timestamp"]
|
||||
framerate = self._video_renderers[participant_id][video_source]["framerate"]
|
||||
|
||||
# Sometimes we render frames because of a request.
|
||||
request_frame = None
|
||||
@@ -961,20 +1072,23 @@ class DailyInputTransport(BaseInputTransport):
|
||||
next_time = prev_time + 1 / framerate
|
||||
render_frame = (next_time - curr_time) < 0.1
|
||||
|
||||
elif self._video_renderers[participant_id]["render_next_frame"]:
|
||||
request_frame = self._video_renderers[participant_id]["render_next_frame"].pop(0)
|
||||
elif self._video_renderers[participant_id][video_source]["render_next_frame"]:
|
||||
request_frame = self._video_renderers[participant_id][video_source][
|
||||
"render_next_frame"
|
||||
].pop(0)
|
||||
render_frame = True
|
||||
|
||||
if render_frame:
|
||||
frame = UserImageRawFrame(
|
||||
user_id=participant_id,
|
||||
request=request_frame,
|
||||
image=buffer,
|
||||
size=size,
|
||||
format=format,
|
||||
image=video_frame.buffer,
|
||||
size=(video_frame.width, video_frame.height),
|
||||
format=video_frame.color_format,
|
||||
)
|
||||
frame.transport_source = video_source
|
||||
await self.push_frame(frame)
|
||||
self._video_renderers[participant_id]["timestamp"] = curr_time
|
||||
self._video_renderers[participant_id][video_source]["timestamp"] = curr_time
|
||||
|
||||
|
||||
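After this change the renderer state is keyed by participant and then by video source, so each source is throttled independently. The sketch below illustrates that decision with plain dictionaries. It rests on assumptions: `should_render` is a hypothetical helper, the clock is passed in as `now`, and the `framerate > 0` guard is assumed because that line falls outside the hunk shown above.

```python
from typing import Any, Dict


def should_render(state: Dict[str, Any], now: float) -> bool:
    """Decide whether to emit a frame for one (participant, source) entry."""
    framerate = state["framerate"]
    if framerate > 0:  # assumed guard, not visible in the diff
        # Render when the next scheduled slot is less than 100 ms away.
        next_time = state["timestamp"] + 1 / framerate
        render = (next_time - now) < 0.1
    elif state["render_next_frame"]:
        # No continuous capture: render only to satisfy a pending request.
        state["render_next_frame"].pop(0)
        render = True
    else:
        render = False

    if render:
        state["timestamp"] = now
    return render


if __name__ == "__main__":
    renderers = {
        "participant-1": {
            "camera": {"framerate": 2, "timestamp": 0.0, "render_next_frame": []},
            "screenVideo": {"framerate": 0, "timestamp": 0.0, "render_next_frame": ["request"]},
        }
    }
    print(should_render(renderers["participant-1"]["camera"], 0.0))
    print(should_render(renderers["participant-1"]["camera"], 0.45))
    print(should_render(renderers["participant-1"]["screenVideo"], 0.45))
```

Keeping one state dictionary per source means a request queued for `screenVideo` cannot be consumed by a `camera` frame, which is the point of the extra nesting level.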
class DailyOutputTransport(BaseOutputTransport):
|
||||
@@ -999,6 +1113,9 @@ class DailyOutputTransport(BaseOutputTransport):
|
||||
self._initialized = False
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
# Setup client.
|
||||
await self._client.setup(frame)
|
||||
|
||||
# Parent start.
|
||||
await super().start(frame)
|
||||
|
||||
@@ -1007,8 +1124,6 @@ class DailyOutputTransport(BaseOutputTransport):
|
||||
|
||||
self._initialized = True
|
||||
|
||||
# Setup client.
|
||||
await self._client.setup(frame)
|
||||
# Join the room.
|
||||
await self._client.join()
|
||||
|
||||
@@ -1032,11 +1147,19 @@ class DailyOutputTransport(BaseOutputTransport):
|
||||
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
|
||||
await self._client.send_message(frame)
|
||||
|
||||
async def write_raw_audio_frames(self, frames: bytes):
|
||||
await self._client.write_raw_audio_frames(frames)
|
||||
async def register_video_destination(self, destination: str):
|
||||
logger.warning(f"{self} registering video destinations is not supported yet")
|
||||
|
||||
async def write_raw_video_frame(self, frame: OutputImageRawFrame):
|
||||
await self._client.write_raw_video_frame(frame)
|
||||
async def register_audio_destination(self, destination: str):
|
||||
await self._client.register_audio_destination(destination)
|
||||
|
||||
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
|
||||
await self._client.write_raw_audio_frames(frames, destination)
|
||||
|
||||
async def write_raw_video_frame(
|
||||
self, frame: OutputImageRawFrame, destination: Optional[str] = None
|
||||
):
|
||||
await self._client.write_raw_video_frame(frame, destination)
|
||||
|
||||
|
||||
class DailyTransport(BaseTransport):
|
||||
@@ -1204,6 +1327,14 @@ class DailyTransport(BaseTransport):
|
||||
async def capture_participant_transcription(self, participant_id: str):
|
||||
await self._client.capture_participant_transcription(participant_id)
|
||||
|
||||
async def capture_participant_audio(
|
||||
self,
|
||||
participant_id: str,
|
||||
audio_source: str = "microphone",
|
||||
):
|
||||
if self._input:
|
||||
await self._input.capture_participant_audio(participant_id, audio_source)
|
||||
|
||||
async def capture_participant_video(
|
||||
self,
|
||||
participant_id: str,
|
||||
@@ -1216,12 +1347,15 @@ class DailyTransport(BaseTransport):
|
||||
participant_id, framerate, video_source, color_format
|
||||
)
|
||||
|
||||
async def update_publishing(self, publishing_settings: Mapping[str, Any]):
|
||||
await self._client.update_publishing(publishing_settings=publishing_settings)
|
||||
|
||||
async def update_subscriptions(self, participant_settings=None, profile_settings=None):
|
||||
await self._client.update_subscriptions(
|
||||
participant_settings=participant_settings, profile_settings=profile_settings
|
||||
)
|
||||
|
||||
async def update_remote_participants(self, remote_participants: Mapping[str, Any] = None):
|
||||
async def update_remote_participants(self, remote_participants: Mapping[str, Any]):
|
||||
await self._client.update_remote_participants(remote_participants=remote_participants)
|
||||
|
||||
async def _on_joined(self, data):
|
||||
|
||||
@@ -462,7 +462,7 @@ class LiveKitOutputTransport(BaseOutputTransport):
|
||||
else:
|
||||
await self._client.send_data(frame.message.encode())
|
||||
|
||||
async def write_raw_audio_frames(self, frames: bytes):
|
||||
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
|
||||
livekit_audio = self._convert_pipecat_audio_to_livekit(frames)
|
||||
await self._client.publish_audio(livekit_audio)
|
||||
|
||||
|
||||