Compare commits
27 Commits
hush/rtviS
...
mb/stt-dir
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8ac6ac5efb | ||
|
|
8250736f5e | ||
|
|
83348a9f93 | ||
|
|
96d40903a9 | ||
|
|
2560811805 | ||
|
|
2b8c44c008 | ||
|
|
38e2d37674 | ||
|
|
6278561f88 | ||
|
|
750e79c1ce | ||
|
|
71eb2963c5 | ||
|
|
f44e2c86ea | ||
|
|
afe1f0df8c | ||
|
|
458fddfb48 | ||
|
|
8d915c5ccb | ||
|
|
304153dd03 | ||
|
|
a6781b7352 | ||
|
|
5ad0058303 | ||
|
|
75c039de33 | ||
|
|
74e3c3677e | ||
|
|
dc20327f10 | ||
|
|
e738affd29 | ||
|
|
ef3d732607 | ||
|
|
6d63cff1bf | ||
|
|
12f42605a1 | ||
|
|
76d198151c | ||
|
|
6a907058de | ||
|
|
6e1f531f64 |
57
CHANGELOG.md
57
CHANGELOG.md
@@ -9,6 +9,43 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Added
|
||||
|
||||
- Added two new frames `RequestSTTMuteFrame`, `RequestSTTUnmuteFrame`. These
|
||||
frames tell the `STTMuteFilter` to directly mute or unmute the user and
|
||||
take precedent over the mute strategies when an `RequestSTTMuteFrame` is
|
||||
processed.
|
||||
|
||||
- `BaseOutputTransport` now allows multiple destinations if the transport
|
||||
implementation supports it (e.g. Daily's custom tracks). With multiple
|
||||
destinations it is possible to send different audio or video tracks with a
|
||||
single transport simultaneously. To do that, you need to set the new
|
||||
`Frame.transport_destination` field with your desired transport destination
|
||||
(e.g. custom track name), tell the transport you want a new destination with
|
||||
`TransportParams.audio_out_destinations` or
|
||||
`TransportParams.video_out_destinations` and the transport should take care of
|
||||
the rest.
|
||||
|
||||
- Similar to the new `Frame.transport_destination`, there's a new
|
||||
`Frame.transport_source` field which is set by the `BaseInputTransport` if the
|
||||
incoming data comes from a non-default source (e.g. custom tracks).
|
||||
|
||||
- `TTSService` has a new `transport_destination` constructor parameter. This
|
||||
parameter will be used to update the `Frame.transport_destination` field for
|
||||
each generated `TTSAudioRawFrame`. This allows sending multiple bots' audio to
|
||||
multiple destinations in the same pipeline.
|
||||
|
||||
- Added `DailyTransportParams.camera_out_enabled` and
|
||||
`DailyTransportParams.microphone_out_enabled` which allows you to
|
||||
enable/disable the main output camera or microphone tracks. This is useful if
|
||||
you only want to use custom tracks and not send the main tracks. Note that you
|
||||
still need `audio_out_enabled=True` or `video_out_enabled`.
|
||||
|
||||
- Added `DailyTransport.capture_participant_audio()` which allows you to capture
|
||||
an audio source (e.g. "microphone", "screenAudio" or a custom track name) from
|
||||
a remote participant.
|
||||
|
||||
- Added `DailyTransport.update_publishing()` which allows you to update the call
|
||||
video and audio publishing settings (e.g. audio and video quality).
|
||||
|
||||
- Added `RTVIObserverParams` which allows you to configure what RTVI messages
|
||||
are sent to the clients.
|
||||
|
||||
@@ -37,6 +74,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Changed
|
||||
|
||||
- `TransportParams.audio_mixer` now supports a string and also a dictionary to
|
||||
provide a mixer per destination. For example:
|
||||
|
||||
```python
|
||||
audio_out_mixer={
|
||||
"track-1": SoundfileMixer(...),
|
||||
"track-2": SoundfileMixer(...),
|
||||
"track-N": SoundfileMixer(...),
|
||||
},
|
||||
```
|
||||
|
||||
- The `STTMuteFilter` now mutes `InterimTranscriptionFrame` and
|
||||
`TranscriptionFrame` which allows the `STTMuteFilter` to be used in
|
||||
conjunction with transports that generate transcripts, e.g. `DailyTransport`.
|
||||
@@ -87,6 +135,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed an issue where `OpenAIRealtimeBetaLLMService` would add two assistant
|
||||
messages to the context.
|
||||
|
||||
- Fixed an issue with `GeminiMultimodalLiveLLMService` where the context
|
||||
contained tokens instead of words.
|
||||
|
||||
@@ -102,6 +153,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Other
|
||||
|
||||
- Added `examples/daily-custom-tracks` to show how to send and receive Daily
|
||||
custom tracks.
|
||||
|
||||
- Added `examples/daily-multi-translation` to showcase how to send multiple
|
||||
simulataneous translations with the same transport.
|
||||
|
||||
- Added 04 foundational examples for client/server transports. Also, renamed
|
||||
`29-livekit-audio-chat.py` to `04b-transports-livekit.py`.
|
||||
|
||||
|
||||
@@ -53,4 +53,3 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
|
||||
token = await daily_rest_helper.get_token(url, expiry_time)
|
||||
|
||||
return (url, token)
|
||||
return (url, token)
|
||||
|
||||
39
examples/daily-custom-tracks/README.md
Normal file
39
examples/daily-custom-tracks/README.md
Normal file
@@ -0,0 +1,39 @@
|
||||
# Daily Custom Tracks
|
||||
|
||||
This example shows how to send and receive Daily custom tracks. We will run a simple `daily-python` application to send an audio file with a custom track (named "pipecat") to a room. Then, the Pipecat bot will mirror that custom track into another custom track (named "pipecat-mirror") in the same room.
|
||||
|
||||
## Get started
|
||||
|
||||
```python
|
||||
python3 -m venv venv
|
||||
source venv/bin/activate
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
## Run the bot
|
||||
|
||||
Start the bot by giving it a Daily room URL.
|
||||
|
||||
```bash
|
||||
python bot.py -u ROOM_URL
|
||||
```
|
||||
|
||||
The bot will wait for the first participant to join. Then, it will mirror a custom track named "pipecat" into a new custom track named "pipecat-mirror".
|
||||
|
||||
## Run the sender
|
||||
|
||||
Now, run the custom track sender. This is a simple `daily-python` application that opens and audio file and sends it as a custom track to the same Daily room.
|
||||
|
||||
```bash
|
||||
python custom_track_sender.py -u ROOM_URL -i office-ambience-mono-16000.mp3
|
||||
```
|
||||
|
||||
## Open client
|
||||
|
||||
Finally, open the client so you can hear both custom tracks.
|
||||
|
||||
```bash
|
||||
open index.html
|
||||
```
|
||||
|
||||
Once the client is opened, copy the URL of the Daily room and join it. You should be able to select which custom track you want to hear.
|
||||
87
examples/daily-custom-tracks/bot.py
Normal file
87
examples/daily-custom-tracks/bot.py
Normal file
@@ -0,0 +1,87 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
|
||||
import aiohttp
|
||||
from loguru import logger
|
||||
from runner import configure
|
||||
|
||||
from pipecat.frames.frames import Frame, InputAudioRawFrame, OutputAudioRawFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
class CustomTrackMirrorProcessor(FrameProcessor):
|
||||
def __init__(self, transport_destination: str, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._transport_destination = transport_destination
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, InputAudioRawFrame) and frame.transport_source:
|
||||
output_frame = OutputAudioRawFrame(
|
||||
audio=frame.audio,
|
||||
sample_rate=frame.sample_rate,
|
||||
num_channels=frame.num_channels,
|
||||
)
|
||||
output_frame.transport_destination = self._transport_destination
|
||||
await self.push_frame(output_frame)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
async def main():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
(room_url, _) = await configure(session)
|
||||
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
None,
|
||||
"Custom tracks mirror",
|
||||
DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
microphone_out_enabled=False, # Disable since we just use custom tracks
|
||||
audio_out_destinations=["pipecat-mirror"],
|
||||
),
|
||||
)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
CustomTrackMirrorProcessor("pipecat-mirror"),
|
||||
transport.output(), # Transport bot output
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
audio_in_sample_rate=16000,
|
||||
audio_out_sample_rate=16000,
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
await transport.capture_participant_audio(participant["id"], audio_source="pipecat")
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
74
examples/daily-custom-tracks/custom_track_sender.py
Normal file
74
examples/daily-custom-tracks/custom_track_sender.py
Normal file
@@ -0,0 +1,74 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import argparse
|
||||
import time
|
||||
|
||||
from daily import CallClient, CustomAudioSource, Daily
|
||||
from pydub import AudioSegment
|
||||
|
||||
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
|
||||
parser.add_argument("-u", "--url", type=str, required=True, help="URL of the Daily room to join")
|
||||
parser.add_argument(
|
||||
"-i", "--input", type=str, required=True, help="Input audio file (needs 16000 sample rate)"
|
||||
)
|
||||
|
||||
args, _ = parser.parse_known_args()
|
||||
|
||||
audio = AudioSegment.from_mp3(args.input)
|
||||
|
||||
raw_bytes = audio.raw_data
|
||||
sample_rate = audio.frame_rate
|
||||
channels = audio.channels
|
||||
|
||||
print(f"Length: {len(raw_bytes)} bytes")
|
||||
print(f"Sample rate: {sample_rate}, Channels: {channels}")
|
||||
|
||||
# Initialize the Daily context & create call client
|
||||
Daily.init()
|
||||
|
||||
client = CallClient()
|
||||
|
||||
# Join the room and indicate we have a custom track named "pipecat".
|
||||
client.join(
|
||||
args.url,
|
||||
client_settings={
|
||||
"publishing": {
|
||||
"camera": False,
|
||||
"microphone": False,
|
||||
"customAudio": {"pipecat": True},
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
# Just sleep for a couple of seconds. To do this well we should really use
|
||||
# completions.
|
||||
time.sleep(2)
|
||||
|
||||
# Create the custom audio source. This is where we will write our audio.
|
||||
audio_source = CustomAudioSource(sample_rate, channels)
|
||||
|
||||
# Create an audio track and assign it our audio source.
|
||||
client.add_custom_audio_track("pipecat", audio_source)
|
||||
|
||||
# Just sleep for a second. To do this well we should really use completions.
|
||||
time.sleep(1)
|
||||
|
||||
try:
|
||||
# Just write one second of audio until we have read all the file.
|
||||
chunk_size = sample_rate * channels * 2
|
||||
while len(raw_bytes) > 0:
|
||||
chunk = raw_bytes[:chunk_size]
|
||||
raw_bytes = raw_bytes[chunk_size:]
|
||||
audio_source.write_frames(chunk)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
client.leave()
|
||||
|
||||
# Just sleep for a second. To do this well we should really use completions.
|
||||
time.sleep(1)
|
||||
|
||||
client.release()
|
||||
173
examples/daily-custom-tracks/index.html
Normal file
173
examples/daily-custom-tracks/index.html
Normal file
@@ -0,0 +1,173 @@
|
||||
<html>
|
||||
<head>
|
||||
<title>daily custom tracks</title>
|
||||
</head>
|
||||
<script crossorigin src="https://unpkg.com/@daily-co/daily-js"></script>
|
||||
<script src="https://cdnjs.cloudflare.com/ajax/libs/fomantic-ui/2.8.6/semantic.min.js"></script>
|
||||
<link
|
||||
rel="stylesheet"
|
||||
type="text/css"
|
||||
href="https://cdnjs.cloudflare.com/ajax/libs/fomantic-ui/2.8.6/semantic.min.css"
|
||||
/>
|
||||
<script>
|
||||
function enableButton(buttonId, enable) {
|
||||
const button = document.getElementById(buttonId);
|
||||
button.disabled = !enable;
|
||||
}
|
||||
|
||||
function enableJoinButton(enable) {
|
||||
enableButton("join-button", enable);
|
||||
}
|
||||
|
||||
function enableLeaveButton(enable) {
|
||||
enableButton("leave-button", enable);
|
||||
}
|
||||
|
||||
function destroyPlayers(query) {
|
||||
const items = document.querySelectorAll(query);
|
||||
if (items) {
|
||||
for (const item of items) {
|
||||
item.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function destroyParticipantPlayers(participantId) {
|
||||
destroyPlayers(`audio[data-participant-id="${participantId}"]`);
|
||||
destroyPlayers(`button[data-participant-id="${participantId}"]`);
|
||||
}
|
||||
|
||||
async function startPlayer(player, track) {
|
||||
player.muted = false;
|
||||
player.autoplay = true;
|
||||
if (track != null) {
|
||||
player.srcObject = new MediaStream([track]);
|
||||
}
|
||||
}
|
||||
|
||||
async function buildAudioPlayer(track, participantId) {
|
||||
const audioContainer = document.getElementById("audio-container");
|
||||
const player = document.createElement("audio");
|
||||
player.dataset.participantId = participantId;
|
||||
|
||||
// Create a new button for controlling audio
|
||||
const audioControlButton = document.createElement("button");
|
||||
audioControlButton.className = "ui primary green button"
|
||||
audioControlButton.innerText = track._mediaTag == "cam-audio" ? "english" : track._mediaTag;
|
||||
audioControlButton.dataset.participantId = participantId;
|
||||
audioControlButton.onclick = () => {
|
||||
if (player.paused) {
|
||||
|
||||
player.play();
|
||||
audioControlButton.className = "ui primary red button"
|
||||
} else {
|
||||
player.pause();
|
||||
audioControlButton.className = "ui primary green button"
|
||||
}
|
||||
};
|
||||
|
||||
audioContainer.appendChild(player);
|
||||
audioContainer.appendChild(audioControlButton);
|
||||
|
||||
await startPlayer(player, track);
|
||||
player.pause()
|
||||
|
||||
return player;
|
||||
}
|
||||
|
||||
function subscribeToTracks(participantId) {
|
||||
console.log(`subscribing to track`);
|
||||
|
||||
if (participantId === "local") {
|
||||
return;
|
||||
}
|
||||
|
||||
callObject.updateParticipant(participantId, {
|
||||
setSubscribedTracks: {
|
||||
audio: true,
|
||||
video: false,
|
||||
custom: true,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
function startDaily() {
|
||||
enableJoinButton(true);
|
||||
enableLeaveButton(false);
|
||||
|
||||
window.callObject = window.DailyIframe.createCallObject({});
|
||||
|
||||
callObject.on("participant-joined", (e) => {
|
||||
if (!e.participant.local) {
|
||||
console.log("participant-joined", e.participant);
|
||||
subscribeToTracks(e.participant.session_id);
|
||||
}
|
||||
});
|
||||
|
||||
callObject.on("participant-left", (e) => {
|
||||
console.log("participant-left", e.participant.session_id);
|
||||
destroyParticipantPlayers(e.participant.session_id);
|
||||
});
|
||||
|
||||
callObject.on("track-started", async (e) => {
|
||||
console.log("track-started", e.track);
|
||||
if (e.track.kind === "audio") {
|
||||
await buildAudioPlayer(e.track, e.participant.session_id);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async function joinRoom() {
|
||||
enableJoinButton(false);
|
||||
enableLeaveButton(true);
|
||||
|
||||
const meetingUrl = document.getElementById("meeting-url").value;
|
||||
|
||||
callObject.join({
|
||||
url: meetingUrl,
|
||||
startVideoOff: true,
|
||||
startAudioOff: true,
|
||||
subscribeToTracksAutomatically: false,
|
||||
receiveSettings: {
|
||||
base: { video: { layer: 0 } },
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
async function leaveRoom() {
|
||||
enableJoinButton(true);
|
||||
enableLeaveButton(false);
|
||||
|
||||
callObject.leave();
|
||||
|
||||
const audioContainer = document.getElementById("audio-container");
|
||||
audioContainer.replaceChildren();
|
||||
}
|
||||
</script>
|
||||
|
||||
<body onload="startDaily()">
|
||||
<div class="ui centered page grid" style="margin-top: 30px">
|
||||
<div class="ten wide column">
|
||||
<div class="ui form" style="margin-top: 30px">
|
||||
<div class="field">
|
||||
<label>Meeting URL</label>
|
||||
<input id="meeting-url" value="" />
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
<div class="ui centered aligned header" style="margin-top: 30px">
|
||||
<button id="join-button" class="ui primary button" onclick="joinRoom()">
|
||||
Join
|
||||
</button>
|
||||
<button id="leave-button" class="ui button" onclick="leaveRoom()">
|
||||
Leave
|
||||
</button>
|
||||
</div>
|
||||
<div id="tile" class="ui container" style="margin-top: 30px">
|
||||
<div id="tile" class="ui center aligned grid">
|
||||
<div id="audio-container"></div><br/>
|
||||
</div>
|
||||
</div>
|
||||
</body>
|
||||
</html>
|
||||
BIN
examples/daily-custom-tracks/office-ambience-mono-16000.mp3
Normal file
BIN
examples/daily-custom-tracks/office-ambience-mono-16000.mp3
Normal file
Binary file not shown.
2
examples/daily-custom-tracks/requirements.txt
Normal file
2
examples/daily-custom-tracks/requirements.txt
Normal file
@@ -0,0 +1,2 @@
|
||||
pydub
|
||||
pipecat-ai[daily]
|
||||
55
examples/daily-custom-tracks/runner.py
Normal file
55
examples/daily-custom-tracks/runner.py
Normal file
@@ -0,0 +1,55 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import argparse
|
||||
import os
|
||||
|
||||
import aiohttp
|
||||
|
||||
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper
|
||||
|
||||
|
||||
async def configure(aiohttp_session: aiohttp.ClientSession):
|
||||
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
|
||||
parser.add_argument(
|
||||
"-u", "--url", type=str, required=False, help="URL of the Daily room to join"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-k",
|
||||
"--apikey",
|
||||
type=str,
|
||||
required=False,
|
||||
help="Daily API Key (needed to create an owner token for the room)",
|
||||
)
|
||||
|
||||
args, unknown = parser.parse_known_args()
|
||||
|
||||
url = args.url or os.getenv("DAILY_SAMPLE_ROOM_URL")
|
||||
key = args.apikey or os.getenv("DAILY_API_KEY")
|
||||
|
||||
if not url:
|
||||
raise Exception(
|
||||
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL."
|
||||
)
|
||||
|
||||
if not key:
|
||||
raise Exception(
|
||||
"No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
|
||||
)
|
||||
|
||||
daily_rest_helper = DailyRESTHelper(
|
||||
daily_api_key=key,
|
||||
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
|
||||
aiohttp_session=aiohttp_session,
|
||||
)
|
||||
|
||||
# Create a meeting token for the given room with an expiration 1 hour in
|
||||
# the future.
|
||||
expiry_time: float = 60 * 60
|
||||
|
||||
token = await daily_rest_helper.get_token(url, expiry_time)
|
||||
|
||||
return (url, token)
|
||||
15
examples/daily-multi-translation/Dockerfile
Normal file
15
examples/daily-multi-translation/Dockerfile
Normal file
@@ -0,0 +1,15 @@
|
||||
FROM python:3.10-bullseye
|
||||
|
||||
RUN mkdir /app
|
||||
RUN mkdir /app/assets
|
||||
RUN mkdir /app/utils
|
||||
COPY *.py /app/
|
||||
COPY requirements.txt /app/
|
||||
|
||||
|
||||
WORKDIR /app
|
||||
RUN pip3 install -r requirements.txt
|
||||
|
||||
EXPOSE 7860
|
||||
|
||||
CMD ["python3", "server.py"]
|
||||
39
examples/daily-multi-translation/README.md
Normal file
39
examples/daily-multi-translation/README.md
Normal file
@@ -0,0 +1,39 @@
|
||||
# Daily Multi Translation
|
||||
|
||||
This example shows how to use Daily to stream multiple simultaneous translations using a single transport. Daily provides custom tracks and in this example we will simultaneously translate incoming audio in English to Spanish, French and German, each of them being sent to a custom track.
|
||||
|
||||
## Get started
|
||||
|
||||
```python
|
||||
python3 -m venv venv
|
||||
source venv/bin/activate
|
||||
pip install -r requirements.txt
|
||||
|
||||
cp env.example .env # and add your credentials
|
||||
|
||||
```
|
||||
|
||||
## Run the server
|
||||
|
||||
```bash
|
||||
python server.py
|
||||
```
|
||||
|
||||
Then, visit `http://localhost:7860/` in your browser. This will open a Daily Prebuilt room where you will speak in English (make sure you are not muted).
|
||||
|
||||
## Open client
|
||||
|
||||
Next, you need to open the client that will listen to the translations.
|
||||
|
||||
```bash
|
||||
open index.html
|
||||
```
|
||||
|
||||
Once the client is opened, copy the URL of the Daily room created above and join it. You should be able to select which translation you want to hear.
|
||||
|
||||
## Build and test the Docker image
|
||||
|
||||
```
|
||||
docker build -t daily-multi-translation .
|
||||
docker run --env-file .env -p 7860:7860 daily-multi-translation
|
||||
```
|
||||
165
examples/daily-multi-translation/bot.py
Normal file
165
examples/daily-multi-translation/bot.py
Normal file
@@ -0,0 +1,165 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from runner import configure
|
||||
|
||||
from pipecat.audio.mixers.soundfile_mixer import SoundfileMixer
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.observers.loggers.transcription_log_observer import TranscriptionLogObserver
|
||||
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
BACKGROUND_SOUND_FILE = "office-ambience-mono-16000.mp3"
|
||||
|
||||
|
||||
async def main():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
(room_url, token) = await configure(session)
|
||||
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
token,
|
||||
"Multi translation bot",
|
||||
DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
audio_out_mixer={
|
||||
"spanish": SoundfileMixer(
|
||||
sound_files={"office": BACKGROUND_SOUND_FILE}, default_sound="office"
|
||||
),
|
||||
"french": SoundfileMixer(
|
||||
sound_files={"office": BACKGROUND_SOUND_FILE}, default_sound="office"
|
||||
),
|
||||
"german": SoundfileMixer(
|
||||
sound_files={"office": BACKGROUND_SOUND_FILE}, default_sound="office"
|
||||
),
|
||||
},
|
||||
audio_out_destinations=["spanish", "french", "german"],
|
||||
microphone_out_enabled=False, # Disable since we just use custom tracks
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
)
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
tts_spanish = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="cefcb124-080b-4655-b31f-932f3ee743de",
|
||||
transport_destination="spanish",
|
||||
)
|
||||
tts_french = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="8832a0b5-47b2-4751-bb22-6a8e2149303d",
|
||||
transport_destination="french",
|
||||
)
|
||||
tts_german = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="38aabb6a-f52b-4fb0-a3d1-988518f4dc06",
|
||||
transport_destination="german",
|
||||
)
|
||||
|
||||
messages_spanish = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You will be provided with a sentence in English, and your task is to only translate it into Spanish.",
|
||||
},
|
||||
]
|
||||
messages_french = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You will be provided with a sentence in English, and your task is to only translate it into French.",
|
||||
},
|
||||
]
|
||||
messages_german = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You will be provided with a sentence in English, and your task is to only translate it into German.",
|
||||
},
|
||||
]
|
||||
|
||||
llm_spanish = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
|
||||
llm_french = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
|
||||
llm_german = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
|
||||
|
||||
context_spanish = OpenAILLMContext(messages_spanish)
|
||||
context_aggregator_spanish = llm_spanish.create_context_aggregator(context_spanish)
|
||||
|
||||
context_french = OpenAILLMContext(messages_french)
|
||||
context_aggregator_french = llm_french.create_context_aggregator(context_french)
|
||||
|
||||
context_german = OpenAILLMContext(messages_german)
|
||||
context_aggregator_german = llm_german.create_context_aggregator(context_german)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
stt,
|
||||
ParallelPipeline(
|
||||
# Spanish pipeline.
|
||||
[
|
||||
context_aggregator_spanish.user(),
|
||||
llm_spanish,
|
||||
tts_spanish,
|
||||
context_aggregator_spanish.assistant(),
|
||||
],
|
||||
# French pipeline.
|
||||
[
|
||||
context_aggregator_french.user(),
|
||||
llm_french,
|
||||
tts_french,
|
||||
context_aggregator_french.assistant(),
|
||||
],
|
||||
# German pipeline.
|
||||
[
|
||||
context_aggregator_german.user(),
|
||||
llm_german,
|
||||
tts_german,
|
||||
context_aggregator_german.assistant(),
|
||||
],
|
||||
),
|
||||
transport.output(), # Transport bot output
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
audio_in_sample_rate=16000,
|
||||
audio_out_sample_rate=16000,
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
report_only_initial_ttfb=True,
|
||||
),
|
||||
observers=[TranscriptionLogObserver()],
|
||||
)
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
5
examples/daily-multi-translation/env.example
Normal file
5
examples/daily-multi-translation/env.example
Normal file
@@ -0,0 +1,5 @@
|
||||
DAILY_SAMPLE_ROOM_URL=https://yourdomain.daily.co/yourroom # (for joining the bot to the same room repeatedly for local dev)
|
||||
DAILY_API_KEY=7df...
|
||||
OPENAI_API_KEY=sk-PL...
|
||||
DEEPGRAM_API_KEY=efb...
|
||||
CARTESIA_API_KEY=aeb...
|
||||
202
examples/daily-multi-translation/index.html
Normal file
202
examples/daily-multi-translation/index.html
Normal file
@@ -0,0 +1,202 @@
|
||||
<html>
|
||||
<head>
|
||||
<title>daily multi translation</title>
|
||||
</head>
|
||||
<script crossorigin src="https://unpkg.com/@daily-co/daily-js"></script>
|
||||
<script
|
||||
src="https://code.jquery.com/jquery-3.1.1.min.js"
|
||||
integrity="sha256-hVVnYaiADRTO2PzUGmuLJr8BLUSjGIZsDYGmIJLv2b8="
|
||||
crossorigin="anonymous"
|
||||
></script>
|
||||
<script src="https://cdnjs.cloudflare.com/ajax/libs/fomantic-ui/2.8.6/semantic.min.js"></script>
|
||||
<link
|
||||
rel="stylesheet"
|
||||
type="text/css"
|
||||
href="https://cdnjs.cloudflare.com/ajax/libs/fomantic-ui/2.8.6/semantic.min.css"
|
||||
/>
|
||||
<script>
|
||||
function enableButton(buttonId, enable) {
|
||||
const button = document.getElementById(buttonId);
|
||||
button.disabled = !enable;
|
||||
}
|
||||
|
||||
function enableJoinButton(enable) {
|
||||
enableButton("join-button", enable);
|
||||
}
|
||||
|
||||
function enableLeaveButton(enable) {
|
||||
enableButton("leave-button", enable);
|
||||
}
|
||||
|
||||
function destroyPlayers(query) {
|
||||
const items = document.querySelectorAll(query);
|
||||
if (items) {
|
||||
for (const item of items) {
|
||||
item.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function destroyParticipantPlayers(participantId) {
|
||||
destroyPlayers(`video[data-participant-id="${participantId}"]`);
|
||||
destroyPlayers(`audio[data-participant-id="${participantId}"]`);
|
||||
destroyPlayers(`button[data-participant-id="${participantId}"]`);
|
||||
}
|
||||
|
||||
async function startPlayer(player, track) {
|
||||
player.muted = false;
|
||||
player.autoplay = true;
|
||||
if (track != null) {
|
||||
player.srcObject = new MediaStream([track]);
|
||||
}
|
||||
}
|
||||
|
||||
async function buildVideoPlayer(track, participantId) {
|
||||
const videoContainer = document.getElementById("video-container");
|
||||
const player = document.createElement("video");
|
||||
player.dataset.participantId = participantId;
|
||||
|
||||
videoContainer.appendChild(player);
|
||||
|
||||
await startPlayer(player, track);
|
||||
await player.play();
|
||||
|
||||
return player;
|
||||
}
|
||||
|
||||
async function buildAudioPlayer(track, participantId) {
|
||||
const audioContainer = document.getElementById("audio-container");
|
||||
const player = document.createElement("audio");
|
||||
player.dataset.participantId = participantId;
|
||||
|
||||
// Create a new button for controlling audio
|
||||
const audioControlButton = document.createElement("button");
|
||||
audioControlButton.className = "ui primary green button"
|
||||
audioControlButton.innerText = track._mediaTag == "cam-audio" ? "english" : track._mediaTag;
|
||||
audioControlButton.dataset.participantId = participantId;
|
||||
audioControlButton.onclick = () => {
|
||||
if (player.paused) {
|
||||
|
||||
player.play();
|
||||
audioControlButton.className = "ui primary red button"
|
||||
} else {
|
||||
player.pause();
|
||||
audioControlButton.className = "ui primary green button"
|
||||
}
|
||||
};
|
||||
|
||||
audioContainer.appendChild(player);
|
||||
audioContainer.appendChild(audioControlButton);
|
||||
|
||||
await startPlayer(player, track);
|
||||
player.pause()
|
||||
|
||||
return player;
|
||||
}
|
||||
|
||||
function subscribeToTracks(participantId) {
|
||||
console.log(`subscribing to track`);
|
||||
|
||||
if (participantId === "local") {
|
||||
return;
|
||||
}
|
||||
|
||||
callObject.updateParticipant(participantId, {
|
||||
setSubscribedTracks: {
|
||||
audio: true,
|
||||
video: true,
|
||||
custom: true,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
function startDaily() {
|
||||
enableJoinButton(true);
|
||||
enableLeaveButton(false);
|
||||
|
||||
window.callObject = window.DailyIframe.createCallObject({});
|
||||
|
||||
callObject.on("participant-joined", (e) => {
|
||||
if (!e.participant.local) {
|
||||
console.log("participant-joined", e.participant);
|
||||
subscribeToTracks(e.participant.session_id);
|
||||
}
|
||||
});
|
||||
|
||||
callObject.on("participant-left", (e) => {
|
||||
console.log("participant-left", e.participant.session_id);
|
||||
destroyParticipantPlayers(e.participant.session_id);
|
||||
});
|
||||
|
||||
callObject.on("track-started", async (e) => {
|
||||
console.log("track-started", e.track);
|
||||
if (e.track.kind === "video") {
|
||||
await buildVideoPlayer(e.track, e.participant.session_id);
|
||||
} else if (e.track.kind === "audio") {
|
||||
await buildAudioPlayer(e.track, e.participant.session_id);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async function joinRoom() {
|
||||
enableJoinButton(false);
|
||||
enableLeaveButton(true);
|
||||
|
||||
const meetingUrl = document.getElementById("meeting-url").value;
|
||||
|
||||
callObject.join({
|
||||
url: meetingUrl,
|
||||
startVideoOff: true,
|
||||
startAudioOff: true,
|
||||
subscribeToTracksAutomatically: false,
|
||||
receiveSettings: {
|
||||
base: { video: { layer: 0 } },
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
async function leaveRoom() {
|
||||
enableJoinButton(true);
|
||||
enableLeaveButton(false);
|
||||
|
||||
callObject.leave();
|
||||
|
||||
const videoContainer = document.getElementById("video-container");
|
||||
videoContainer.replaceChildren();
|
||||
|
||||
const audioContainer = document.getElementById("audio-container");
|
||||
audioContainer.replaceChildren();
|
||||
}
|
||||
</script>
|
||||
|
||||
<body onload="startDaily()">
|
||||
<div class="ui centered page grid" style="margin-top: 30px">
|
||||
<div class="ten wide column">
|
||||
<div class="ui form" style="margin-top: 30px">
|
||||
<div class="field">
|
||||
<label>Meeting URL</label>
|
||||
<input id="meeting-url" value="" />
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
<div class="ui centered aligned header" style="margin-top: 30px">
|
||||
<button id="join-button" class="ui primary button" onclick="joinRoom()">
|
||||
Join
|
||||
</button>
|
||||
<button id="leave-button" class="ui button" onclick="leaveRoom()">
|
||||
Leave
|
||||
</button>
|
||||
</div>
|
||||
<div id="tile" class="ui container" style="margin-top: 30px">
|
||||
<div id="tile" class="ui center aligned grid">
|
||||
<div id="audio-container"></div><br/>
|
||||
</div>
|
||||
</div>
|
||||
<div id="tile" class="ui container" style="margin-top: 30px">
|
||||
<div id="tile" class="ui center aligned grid">
|
||||
<div id="video-container" class="ui segment"></div>
|
||||
</div>
|
||||
</div>
|
||||
</body>
|
||||
</html>
|
||||
BIN
examples/daily-multi-translation/office-ambience-mono-16000.mp3
Normal file
BIN
examples/daily-multi-translation/office-ambience-mono-16000.mp3
Normal file
Binary file not shown.
5
examples/daily-multi-translation/requirements.txt
Normal file
5
examples/daily-multi-translation/requirements.txt
Normal file
@@ -0,0 +1,5 @@
|
||||
aiofiles
|
||||
python-dotenv
|
||||
fastapi[all]
|
||||
uvicorn
|
||||
pipecat-ai[daily,deepgram,openai,silero,cartesia]
|
||||
55
examples/daily-multi-translation/runner.py
Normal file
55
examples/daily-multi-translation/runner.py
Normal file
@@ -0,0 +1,55 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import argparse
|
||||
import os
|
||||
|
||||
import aiohttp
|
||||
|
||||
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper
|
||||
|
||||
|
||||
async def configure(aiohttp_session: aiohttp.ClientSession):
|
||||
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
|
||||
parser.add_argument(
|
||||
"-u", "--url", type=str, required=False, help="URL of the Daily room to join"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-k",
|
||||
"--apikey",
|
||||
type=str,
|
||||
required=False,
|
||||
help="Daily API Key (needed to create an owner token for the room)",
|
||||
)
|
||||
|
||||
args, unknown = parser.parse_known_args()
|
||||
|
||||
url = args.url or os.getenv("DAILY_SAMPLE_ROOM_URL")
|
||||
key = args.apikey or os.getenv("DAILY_API_KEY")
|
||||
|
||||
if not url:
|
||||
raise Exception(
|
||||
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL."
|
||||
)
|
||||
|
||||
if not key:
|
||||
raise Exception(
|
||||
"No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
|
||||
)
|
||||
|
||||
daily_rest_helper = DailyRESTHelper(
|
||||
daily_api_key=key,
|
||||
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
|
||||
aiohttp_session=aiohttp_session,
|
||||
)
|
||||
|
||||
# Create a meeting token for the given room with an expiration 1 hour in
|
||||
# the future.
|
||||
expiry_time: float = 60 * 60
|
||||
|
||||
token = await daily_rest_helper.get_token(url, expiry_time)
|
||||
|
||||
return (url, token)
|
||||
139
examples/daily-multi-translation/server.py
Normal file
139
examples/daily-multi-translation/server.py
Normal file
@@ -0,0 +1,139 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import subprocess
|
||||
from contextlib import asynccontextmanager
|
||||
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from fastapi import FastAPI, HTTPException, Request
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.responses import JSONResponse, RedirectResponse
|
||||
|
||||
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomParams
|
||||
|
||||
MAX_BOTS_PER_ROOM = 1
|
||||
|
||||
# Bot sub-process dict for status reporting and concurrency control
|
||||
bot_procs = {}
|
||||
|
||||
daily_helpers = {}
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
def cleanup():
|
||||
# Clean up function, just to be extra safe
|
||||
for entry in bot_procs.values():
|
||||
proc = entry[0]
|
||||
proc.terminate()
|
||||
proc.wait()
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
aiohttp_session = aiohttp.ClientSession()
|
||||
daily_helpers["rest"] = DailyRESTHelper(
|
||||
daily_api_key=os.getenv("DAILY_API_KEY", ""),
|
||||
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
|
||||
aiohttp_session=aiohttp_session,
|
||||
)
|
||||
yield
|
||||
await aiohttp_session.close()
|
||||
cleanup()
|
||||
|
||||
|
||||
app = FastAPI(lifespan=lifespan)
|
||||
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=["*"],
|
||||
allow_credentials=True,
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
)
|
||||
|
||||
|
||||
@app.get("/")
|
||||
async def start_agent(request: Request):
|
||||
print(f"!!! Creating room")
|
||||
room = await daily_helpers["rest"].create_room(DailyRoomParams())
|
||||
print(f"!!! Room URL: {room.url}")
|
||||
# Ensure the room property is present
|
||||
if not room.url:
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail="Missing 'room' property in request data. Cannot start agent without a target room!",
|
||||
)
|
||||
|
||||
# Check if there is already an existing process running in this room
|
||||
num_bots_in_room = sum(
|
||||
1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None
|
||||
)
|
||||
if num_bots_in_room >= MAX_BOTS_PER_ROOM:
|
||||
raise HTTPException(status_code=500, detail=f"Max bot limited reach for room: {room.url}")
|
||||
|
||||
# Get the token for the room
|
||||
token = await daily_helpers["rest"].get_token(room.url)
|
||||
|
||||
if not token:
|
||||
raise HTTPException(status_code=500, detail=f"Failed to get token for room: {room.url}")
|
||||
|
||||
# Spawn a new agent, and join the user session
|
||||
# Note: this is mostly for demonstration purposes (refer to 'deployment' in README)
|
||||
try:
|
||||
proc = subprocess.Popen(
|
||||
[f"python3 -m bot -u {room.url} -t {token}"],
|
||||
shell=True,
|
||||
bufsize=1,
|
||||
cwd=os.path.dirname(os.path.abspath(__file__)),
|
||||
)
|
||||
bot_procs[proc.pid] = (proc, room.url)
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}")
|
||||
|
||||
return RedirectResponse(room.url)
|
||||
|
||||
|
||||
@app.get("/status/{pid}")
|
||||
def get_status(pid: int):
|
||||
# Look up the subprocess
|
||||
proc = bot_procs.get(pid)
|
||||
|
||||
# If the subprocess doesn't exist, return an error
|
||||
if not proc:
|
||||
raise HTTPException(status_code=404, detail=f"Bot with process id: {pid} not found")
|
||||
|
||||
# Check the status of the subprocess
|
||||
if proc[0].poll() is None:
|
||||
status = "running"
|
||||
else:
|
||||
status = "finished"
|
||||
|
||||
return JSONResponse({"bot_id": pid, "status": status})
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
|
||||
default_host = os.getenv("HOST", "0.0.0.0")
|
||||
default_port = int(os.getenv("FAST_API_PORT", "7860"))
|
||||
|
||||
parser = argparse.ArgumentParser(description="Daily Storyteller FastAPI server")
|
||||
parser.add_argument("--host", type=str, default=default_host, help="Host address")
|
||||
parser.add_argument("--port", type=int, default=default_port, help="Port number")
|
||||
parser.add_argument("--reload", action="store_true", help="Reload code on change")
|
||||
|
||||
config = parser.parse_args()
|
||||
|
||||
uvicorn.run(
|
||||
"server:app",
|
||||
host=config.host,
|
||||
port=config.port,
|
||||
reload=config.reload,
|
||||
)
|
||||
@@ -12,10 +12,12 @@ from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.audio.vad.vad_analyzer import VADParams
|
||||
from pipecat.frames.frames import TranscriptionMessage
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.transcript_processor import TranscriptProcessor
|
||||
from pipecat.services.gemini_multimodal_live.gemini import GeminiMultimodalLiveLLMService
|
||||
from pipecat.transports.base_transport import TransportParams
|
||||
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
|
||||
@@ -69,12 +71,16 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespac
|
||||
)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
transcript = TranscriptProcessor()
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(),
|
||||
context_aggregator.user(),
|
||||
transcript.user(),
|
||||
llm,
|
||||
transport.output(),
|
||||
transcript.assistant(),
|
||||
context_aggregator.assistant(),
|
||||
]
|
||||
)
|
||||
@@ -103,6 +109,15 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespac
|
||||
logger.info(f"Client closed connection")
|
||||
await task.cancel()
|
||||
|
||||
# Register event handler for transcript updates
|
||||
@transcript.event_handler("on_transcript_update")
|
||||
async def on_transcript_update(processor, frame):
|
||||
for msg in frame.messages:
|
||||
if isinstance(msg, TranscriptionMessage):
|
||||
timestamp = f"[{msg.timestamp}] " if msg.timestamp else ""
|
||||
line = f"{timestamp}{msg.role}: {msg.content}"
|
||||
logger.info(f"Transcript: {line}")
|
||||
|
||||
runner = PipelineRunner(handle_sigint=False)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
@@ -53,4 +53,3 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
|
||||
token = await daily_rest_helper.get_token(url, expiry_time)
|
||||
|
||||
return (url, token)
|
||||
return (url, token)
|
||||
|
||||
@@ -47,7 +47,7 @@ canonical = [ "aiofiles~=24.1.0" ]
|
||||
cartesia = [ "cartesia~=1.4.0", "websockets~=13.1" ]
|
||||
cerebras = []
|
||||
deepseek = []
|
||||
daily = [ "daily-python~=0.17.0" ]
|
||||
daily = [ "daily-python~=0.18.0" ]
|
||||
deepgram = [ "deepgram-sdk~=3.8.0" ]
|
||||
elevenlabs = [ "websockets~=13.1" ]
|
||||
fal = [ "fal-client~=0.5.9" ]
|
||||
|
||||
@@ -60,12 +60,16 @@ class Frame:
|
||||
name: str = field(init=False)
|
||||
pts: Optional[int] = field(init=False)
|
||||
metadata: Dict[str, Any] = field(init=False)
|
||||
transport_source: Optional[str] = field(init=False)
|
||||
transport_destination: Optional[str] = field(init=False)
|
||||
|
||||
def __post_init__(self):
|
||||
self.id: int = obj_id()
|
||||
self.name: str = f"{self.__class__.__name__}#{obj_count(self)}"
|
||||
self.pts: Optional[int] = None
|
||||
self.metadata: Dict[str, Any] = {}
|
||||
self.transport_source: Optional[str] = None
|
||||
self.transport_destination: Optional[str] = None
|
||||
|
||||
def __str__(self):
|
||||
return self.name
|
||||
@@ -136,8 +140,9 @@ class ImageRawFrame:
|
||||
|
||||
@dataclass
|
||||
class OutputAudioRawFrame(DataFrame, AudioRawFrame):
|
||||
"""A chunk of audio. Will be played by the output transport if the
|
||||
transport's microphone has been enabled.
|
||||
"""A chunk of audio. Will be played by the output transport. If the
|
||||
transport supports multiple audio destinations (e.g. multiple audio tracks) the
|
||||
destination name can be specified.
|
||||
|
||||
"""
|
||||
|
||||
@@ -147,13 +152,14 @@ class OutputAudioRawFrame(DataFrame, AudioRawFrame):
|
||||
|
||||
def __str__(self):
|
||||
pts = format_pts(self.pts)
|
||||
return f"{self.name}(pts: {pts}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
|
||||
return f"{self.name}(pts: {pts}, destination: {self.transport_destination}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
|
||||
|
||||
|
||||
@dataclass
|
||||
class OutputImageRawFrame(DataFrame, ImageRawFrame):
|
||||
"""An image that will be shown by the transport if the transport's camera is
|
||||
enabled.
|
||||
"""An image that will be shown by the transport. If the transport supports
|
||||
multiple video destinations (e.g. multiple video tracks) the destination
|
||||
name can be specified.
|
||||
|
||||
"""
|
||||
|
||||
@@ -176,7 +182,7 @@ class URLImageRawFrame(OutputImageRawFrame):
|
||||
|
||||
"""
|
||||
|
||||
url: Optional[str]
|
||||
url: Optional[str] = None
|
||||
|
||||
def __str__(self):
|
||||
pts = format_pts(self.pts)
|
||||
@@ -689,6 +695,20 @@ class STTMuteFrame(SystemFrame):
|
||||
mute: bool
|
||||
|
||||
|
||||
@dataclass
|
||||
class RequestSTTMuteFrame(Frame):
|
||||
"""Request to mute the STT service."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class RequestSTTUnmuteFrame(Frame):
|
||||
"""Request to unmute the STT service."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class TransportMessageUrgentFrame(SystemFrame):
|
||||
message: Any
|
||||
@@ -716,7 +736,11 @@ class UserImageRequestFrame(SystemFrame):
|
||||
|
||||
@dataclass
|
||||
class InputAudioRawFrame(SystemFrame, AudioRawFrame):
|
||||
"""A chunk of audio usually coming from an input transport."""
|
||||
"""A chunk of audio usually coming from an input transport. If the transport
|
||||
supports multiple audio sources (e.g. multiple audio tracks) the source name
|
||||
will be specified.
|
||||
|
||||
"""
|
||||
|
||||
def __post_init__(self):
|
||||
super().__post_init__()
|
||||
@@ -724,35 +748,50 @@ class InputAudioRawFrame(SystemFrame, AudioRawFrame):
|
||||
|
||||
def __str__(self):
|
||||
pts = format_pts(self.pts)
|
||||
return f"{self.name}(pts: {pts}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
|
||||
return f"{self.name}(pts: {pts}, source: {self.transport_source}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
|
||||
|
||||
|
||||
@dataclass
|
||||
class InputImageRawFrame(SystemFrame, ImageRawFrame):
|
||||
"""An image usually coming from an input transport."""
|
||||
"""An image usually coming from an input transport. If the transport
|
||||
supports multiple video sources (e.g. multiple video tracks) the source name
|
||||
will be specified.
|
||||
|
||||
"""
|
||||
|
||||
def __str__(self):
|
||||
pts = format_pts(self.pts)
|
||||
return f"{self.name}(pts: {pts}, size: {self.size}, format: {self.format})"
|
||||
return f"{self.name}(pts: {pts}, source: {self.transport_source}, size: {self.size}, format: {self.format})"
|
||||
|
||||
|
||||
@dataclass
|
||||
class UserAudioRawFrame(InputAudioRawFrame):
|
||||
"""A chunk of audio, usually coming from an input transport, associated to a user."""
|
||||
|
||||
user_id: str = ""
|
||||
|
||||
def __str__(self):
|
||||
pts = format_pts(self.pts)
|
||||
return f"{self.name}(pts: {pts}, user: {self.user_id}, source: {self.transport_source}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
|
||||
|
||||
|
||||
@dataclass
|
||||
class UserImageRawFrame(InputImageRawFrame):
|
||||
"""An image associated to a user."""
|
||||
|
||||
user_id: str
|
||||
user_id: str = ""
|
||||
request: Optional[UserImageRequestFrame] = None
|
||||
|
||||
def __str__(self):
|
||||
pts = format_pts(self.pts)
|
||||
return f"{self.name}(pts: {pts}, user: {self.user_id}, size: {self.size}, format: {self.format}, request: {self.request})"
|
||||
return f"{self.name}(pts: {pts}, user: {self.user_id}, source: {self.transport_source}, size: {self.size}, format: {self.format}, request: {self.request})"
|
||||
|
||||
|
||||
@dataclass
|
||||
class VisionImageRawFrame(InputImageRawFrame):
|
||||
"""An image with an associated text to ask for a description of it."""
|
||||
|
||||
text: Optional[str]
|
||||
text: Optional[str] = None
|
||||
|
||||
def __str__(self):
|
||||
pts = format_pts(self.pts)
|
||||
|
||||
@@ -25,6 +25,8 @@ from pipecat.frames.frames import (
|
||||
FunctionCallResultFrame,
|
||||
InputAudioRawFrame,
|
||||
InterimTranscriptionFrame,
|
||||
RequestSTTMuteFrame,
|
||||
RequestSTTUnmuteFrame,
|
||||
StartFrame,
|
||||
StartInterruptionFrame,
|
||||
StopInterruptionFrame,
|
||||
@@ -101,6 +103,7 @@ class STTMuteFilter(FrameProcessor):
|
||||
self._bot_is_speaking = False
|
||||
self._function_call_in_progress = False
|
||||
self._is_muted = False # Initialize as unmuted, will set state on StartFrame if needed
|
||||
self._frame_requested_mute = False
|
||||
|
||||
@property
|
||||
def is_muted(self) -> bool:
|
||||
@@ -116,6 +119,10 @@ class STTMuteFilter(FrameProcessor):
|
||||
|
||||
async def _should_mute(self) -> bool:
|
||||
"""Determines if STT should be muted based on current state and strategy."""
|
||||
# First check if a RequestSTTMuteFrame was received
|
||||
if self._frame_requested_mute:
|
||||
return True
|
||||
|
||||
for strategy in self._config.strategies:
|
||||
match strategy:
|
||||
case STTMuteStrategy.FUNCTION_CALL:
|
||||
@@ -151,7 +158,13 @@ class STTMuteFilter(FrameProcessor):
|
||||
should_mute = None
|
||||
|
||||
# Process frames to determine mute state
|
||||
if isinstance(frame, StartFrame):
|
||||
if isinstance(frame, RequestSTTMuteFrame):
|
||||
self._frame_requested_mute = True
|
||||
should_mute = await self._should_mute()
|
||||
elif isinstance(frame, RequestSTTUnmuteFrame):
|
||||
self._frame_requested_mute = False
|
||||
should_mute = await self._should_mute()
|
||||
elif isinstance(frame, StartFrame):
|
||||
should_mute = await self._should_mute()
|
||||
elif isinstance(frame, FunctionCallInProgressFrame):
|
||||
self._function_call_in_progress = True
|
||||
|
||||
@@ -227,10 +227,8 @@ class GeminiMultimodalLiveAssistantContextAggregator(OpenAIAssistantContextAggre
|
||||
# but the GeminiMultimodalLiveAssistantContextAggregator pushes LLMTextFrames and TTSTextFrames. We
|
||||
# need to override this proces_frame for LLMTextFrame, so that only the TTSTextFrames
|
||||
# are process. This ensures that the context gets only one set of messages.
|
||||
# GeminiMultimodalLiveLLMService also pushes TranscriptionFrames, so we need to
|
||||
# ignore pushing those as well, as they're also TextFrames.
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
if not isinstance(frame, (LLMTextFrame, TranscriptionFrame)):
|
||||
if not isinstance(frame, LLMTextFrame):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
async def handle_user_image_frame(self, frame: UserImageRawFrame):
|
||||
@@ -354,6 +352,7 @@ class GeminiMultimodalLiveLLMService(LLMService):
|
||||
self._bot_is_speaking = False
|
||||
self._user_audio_buffer = bytearray()
|
||||
self._bot_audio_buffer = bytearray()
|
||||
self._bot_text_buffer = ""
|
||||
|
||||
self._sample_rate = 24000
|
||||
|
||||
@@ -464,9 +463,9 @@ class GeminiMultimodalLiveLLMService(LLMService):
|
||||
# Sometimes the transcription contains newlines; we want to remove them.
|
||||
cleaned_text = text.rstrip("\n")
|
||||
logger.debug(f"[Transcription:user] {cleaned_text}")
|
||||
context.add_message({"role": "user", "content": [{"type": "text", "text": cleaned_text}]})
|
||||
await self.push_frame(
|
||||
TranscriptionFrame(text=cleaned_text, user_id="user", timestamp=time_now_iso8601())
|
||||
TranscriptionFrame(text=cleaned_text, user_id="user", timestamp=time_now_iso8601()),
|
||||
FrameDirection.UPSTREAM,
|
||||
)
|
||||
|
||||
async def _transcribe_audio(self, audio, context):
|
||||
@@ -852,6 +851,15 @@ class GeminiMultimodalLiveLLMService(LLMService):
|
||||
if not part:
|
||||
return
|
||||
|
||||
# part.text is added when `modalities` is set to TEXT; otherwise, it's None
|
||||
text = part.text
|
||||
if text:
|
||||
if not self._bot_text_buffer:
|
||||
await self.push_frame(LLMFullResponseStartFrame())
|
||||
|
||||
self._bot_text_buffer += text
|
||||
await self.push_frame(LLMTextFrame(text=text))
|
||||
|
||||
inline_data = part.inlineData
|
||||
if not inline_data:
|
||||
return
|
||||
@@ -892,13 +900,24 @@ class GeminiMultimodalLiveLLMService(LLMService):
|
||||
|
||||
async def _handle_evt_turn_complete(self, evt):
|
||||
self._bot_is_speaking = False
|
||||
await self.push_frame(TTSStoppedFrame())
|
||||
text = self._bot_text_buffer
|
||||
self._bot_text_buffer = ""
|
||||
|
||||
# Only push the TTSStoppedFrame the bot is outputting audio
|
||||
# when text is found, modalities is set to TEXT and no audio
|
||||
# is produced.
|
||||
if not text:
|
||||
await self.push_frame(TTSStoppedFrame())
|
||||
|
||||
await self.push_frame(LLMFullResponseEndFrame())
|
||||
|
||||
async def _handle_evt_output_transcription(self, evt):
|
||||
if not evt.serverContent.outputTranscription:
|
||||
return
|
||||
|
||||
# This is the output transcription text when modalities is set to AUDIO.
|
||||
# In this case, we push LLMTextFrame and TTSTextFrame to be handled by the
|
||||
# downstream assistant context aggregator.
|
||||
text = evt.serverContent.outputTranscription.text
|
||||
|
||||
if not text:
|
||||
|
||||
@@ -12,9 +12,11 @@ from loguru import logger
|
||||
from pipecat.frames.frames import (
|
||||
Frame,
|
||||
FunctionCallResultFrame,
|
||||
InterimTranscriptionFrame,
|
||||
LLMMessagesUpdateFrame,
|
||||
LLMSetToolsFrame,
|
||||
LLMTextFrame,
|
||||
TranscriptionFrame,
|
||||
)
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
@@ -137,15 +139,6 @@ class OpenAIRealtimeLLMContext(OpenAILLMContext):
|
||||
}
|
||||
self.add_message(message)
|
||||
|
||||
def add_assistant_content_item_as_message(self, item):
|
||||
message = {"role": "assistant", "content": []}
|
||||
for content in item.content:
|
||||
if content.type == "audio":
|
||||
message["content"].append({"type": "text", "text": content.transcript})
|
||||
else:
|
||||
logger.error(f"Unhandled content type in assistant item: {content.type} - {item}")
|
||||
self.add_message(message)
|
||||
|
||||
|
||||
class OpenAIRealtimeUserContextAggregator(OpenAIUserContextAggregator):
|
||||
async def process_frame(
|
||||
@@ -175,8 +168,10 @@ class OpenAIRealtimeAssistantContextAggregator(OpenAIAssistantContextAggregator)
|
||||
# but the OpenAIRealtimeLLMService pushes LLMTextFrames and TTSTextFrames. We
|
||||
# need to override this proces_frame for LLMTextFrame, so that only the TTSTextFrames
|
||||
# are process. This ensures that the context gets only one set of messages.
|
||||
# OpenAIRealtimeLLMService also pushes TranscriptionFrames and InterimTranscriptionFrames,
|
||||
# so we need to ignore pushing those as well, as they're also TextFrames.
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
if not isinstance(frame, LLMTextFrame):
|
||||
if not isinstance(frame, (LLMTextFrame, TranscriptionFrame, InterimTranscriptionFrame)):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
async def handle_function_call_result(self, frame: FunctionCallResultFrame):
|
||||
|
||||
@@ -562,13 +562,11 @@ class OpenAIRealtimeBetaLLMService(LLMService):
|
||||
await self.push_error(ErrorFrame(error=f"Error: {evt}", fatal=True))
|
||||
|
||||
async def _handle_assistant_output(self, output):
|
||||
# logger.debug(f"!!! HANDLE Assistant output: {output}")
|
||||
# We haven't seen intermixed audio and function_call items in the same response. But let's
|
||||
# try to write logic that handles that, if it does happen.
|
||||
messages = [item for item in output if item.type == "message"]
|
||||
# Also, the assistant output is pushed as LLMTextFrame and TTSTextFrame to be handled by
|
||||
# the assistant context aggregator.
|
||||
function_calls = [item for item in output if item.type == "function_call"]
|
||||
for item in messages:
|
||||
self._context.add_assistant_content_item_as_message(item)
|
||||
await self._handle_function_call_items(function_calls)
|
||||
|
||||
async def _handle_function_call_items(self, items):
|
||||
|
||||
@@ -79,8 +79,8 @@ class FastPitchTTSService(TTSService):
|
||||
self._voice_id,
|
||||
self._language_code,
|
||||
sample_rate_hz=self.sample_rate,
|
||||
audio_prompt_file=None,
|
||||
quality=self._quality,
|
||||
zero_shot_audio_prompt_file=None,
|
||||
zero_shot_quality=self._quality,
|
||||
custom_dictionary={},
|
||||
)
|
||||
for r in responses:
|
||||
|
||||
@@ -66,6 +66,8 @@ class TTSService(AIService):
|
||||
# Text filter executed after text has been aggregated.
|
||||
text_filters: Sequence[BaseTextFilter] = [],
|
||||
text_filter: Optional[BaseTextFilter] = None,
|
||||
# Audio transport destination of the generated frames.
|
||||
transport_destination: Optional[str] = None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
@@ -82,6 +84,8 @@ class TTSService(AIService):
|
||||
self._settings: Dict[str, Any] = {}
|
||||
self._text_aggregator: BaseTextAggregator = text_aggregator or SimpleTextAggregator()
|
||||
self._text_filters: Sequence[BaseTextFilter] = text_filters
|
||||
self._transport_destination: Optional[str] = transport_destination
|
||||
|
||||
if text_filter:
|
||||
import warnings
|
||||
|
||||
@@ -207,13 +211,16 @@ class TTSService(AIService):
|
||||
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
|
||||
if self._push_silence_after_stop and isinstance(frame, TTSStoppedFrame):
|
||||
silence_num_bytes = int(self._silence_time_s * self.sample_rate * 2) # 16-bit
|
||||
await self.push_frame(
|
||||
TTSAudioRawFrame(
|
||||
audio=b"\x00" * silence_num_bytes,
|
||||
sample_rate=self.sample_rate,
|
||||
num_channels=1,
|
||||
)
|
||||
silence_frame = TTSAudioRawFrame(
|
||||
audio=b"\x00" * silence_num_bytes,
|
||||
sample_rate=self.sample_rate,
|
||||
num_channels=1,
|
||||
)
|
||||
silence_frame.transport_destination = self._transport_destination
|
||||
await self.push_frame(silence_frame)
|
||||
|
||||
if isinstance(frame, (TTSStartedFrame, TTSStoppedFrame, TTSAudioRawFrame, TTSTextFrame)):
|
||||
frame.transport_destination = self._transport_destination
|
||||
|
||||
await super().push_frame(frame, direction)
|
||||
|
||||
|
||||
@@ -79,7 +79,7 @@ class BaseInputTransport(FrameProcessor):
|
||||
)
|
||||
self._params.audio_in_passthrough = True
|
||||
|
||||
if self._params.camera_in_enabled or self._params.camera_out_enabled:
|
||||
if self._params.camera_in_enabled:
|
||||
import warnings
|
||||
|
||||
with warnings.catch_warnings():
|
||||
|
||||
@@ -8,11 +8,12 @@ import asyncio
|
||||
import itertools
|
||||
import sys
|
||||
import time
|
||||
from typing import AsyncGenerator, List
|
||||
from typing import Any, AsyncGenerator, Dict, List, Mapping, Optional
|
||||
|
||||
from loguru import logger
|
||||
from PIL import Image
|
||||
|
||||
from pipecat.audio.mixers.base_audio_mixer import BaseAudioMixer
|
||||
from pipecat.audio.utils import create_default_resampler
|
||||
from pipecat.frames.frames import (
|
||||
BotSpeakingFrame,
|
||||
@@ -46,35 +47,28 @@ class BaseOutputTransport(FrameProcessor):
|
||||
|
||||
self._params = params
|
||||
|
||||
# Task to process incoming frames so we don't block upstream elements.
|
||||
self._sink_task = None
|
||||
|
||||
# Task to process incoming frames using a clock.
|
||||
self._sink_clock_task = None
|
||||
|
||||
# Task to write/send audio and image frames.
|
||||
self._video_out_task = None
|
||||
|
||||
# These are the images that we should send at our desired framerate.
|
||||
self._video_images = None
|
||||
|
||||
# Output sample rate. It will be initialized on StartFrame.
|
||||
self._sample_rate = 0
|
||||
self._resampler = create_default_resampler()
|
||||
|
||||
# Chunk size that will be written. It will be computed on StartFrame
|
||||
# We write 10ms*CHUNKS of audio at a time (where CHUNKS is the
|
||||
# `audio_out_10ms_chunks` parameter). If we receive long audio frames we
|
||||
# will chunk them. This helps with interruption handling. It will be
|
||||
# initialized on StartFrame.
|
||||
self._audio_chunk_size = 0
|
||||
self._audio_buffer = bytearray()
|
||||
|
||||
self._stopped_event = asyncio.Event()
|
||||
|
||||
# Indicates if the bot is currently speaking.
|
||||
self._bot_speaking = False
|
||||
# We will have one media sender per output frame destination. This allow
|
||||
# us to send multiple streams at the same time if the transport allows
|
||||
# it.
|
||||
self._media_senders: Dict[Any, "BaseOutputTransport.MediaSender"] = {}
|
||||
|
||||
@property
|
||||
def sample_rate(self) -> int:
|
||||
return self._sample_rate
|
||||
|
||||
@property
|
||||
def audio_chunk_size(self) -> int:
|
||||
return self._audio_chunk_size
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
self._sample_rate = self._params.audio_out_sample_rate or frame.audio_out_sample_rate
|
||||
|
||||
@@ -84,42 +78,63 @@ class BaseOutputTransport(FrameProcessor):
|
||||
audio_bytes_10ms = int(self._sample_rate / 100) * self._params.audio_out_channels * 2
|
||||
self._audio_chunk_size = audio_bytes_10ms * self._params.audio_out_10ms_chunks
|
||||
|
||||
# Start audio mixer.
|
||||
if self._params.audio_out_mixer:
|
||||
await self._params.audio_out_mixer.start(self._sample_rate)
|
||||
self._create_video_task()
|
||||
self._create_sink_tasks()
|
||||
# Register destinations.
|
||||
for destination in self._params.audio_out_destinations:
|
||||
await self.register_audio_destination(destination)
|
||||
|
||||
for destination in self._params.video_out_destinations:
|
||||
await self.register_video_destination(destination)
|
||||
|
||||
# Start default media sender.
|
||||
self._media_senders[None] = BaseOutputTransport.MediaSender(
|
||||
self,
|
||||
destination=None,
|
||||
sample_rate=self.sample_rate,
|
||||
audio_chunk_size=self.audio_chunk_size,
|
||||
params=self._params,
|
||||
)
|
||||
await self._media_senders[None].start(frame)
|
||||
|
||||
# Media senders already send both audio and video, so make sure we only
|
||||
# have one media server per shared name.
|
||||
destinations = list(
|
||||
set(self._params.audio_out_destinations + self._params.video_out_destinations)
|
||||
)
|
||||
|
||||
# Start media senders.
|
||||
for destination in destinations:
|
||||
self._media_senders[destination] = BaseOutputTransport.MediaSender(
|
||||
self,
|
||||
destination=destination,
|
||||
sample_rate=self.sample_rate,
|
||||
audio_chunk_size=self.audio_chunk_size,
|
||||
params=self._params,
|
||||
)
|
||||
await self._media_senders[destination].start(frame)
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
# Let the sink tasks process the queue until they reach this EndFrame.
|
||||
await self._sink_clock_queue.put((sys.maxsize, frame.id, frame))
|
||||
await self._sink_queue.put(frame)
|
||||
|
||||
# At this point we have enqueued an EndFrame and we need to wait for
|
||||
# that EndFrame to be processed by the sink tasks. We also need to wait
|
||||
# for these tasks before cancelling the video and audio tasks below
|
||||
# because they might be still rendering.
|
||||
if self._sink_task:
|
||||
await self.wait_for_task(self._sink_task)
|
||||
if self._sink_clock_task:
|
||||
await self.wait_for_task(self._sink_clock_task)
|
||||
|
||||
# We can now cancel the video task.
|
||||
await self._cancel_video_task()
|
||||
for _, sender in self._media_senders.items():
|
||||
await sender.stop(frame)
|
||||
|
||||
async def cancel(self, frame: CancelFrame):
|
||||
# Since we are cancelling everything it doesn't matter if we cancel sink
|
||||
# tasks first or not.
|
||||
await self._cancel_sink_tasks()
|
||||
await self._cancel_video_task()
|
||||
for _, sender in self._media_senders.items():
|
||||
await sender.cancel(frame)
|
||||
|
||||
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
|
||||
pass
|
||||
|
||||
async def write_raw_video_frame(self, frame: OutputImageRawFrame):
|
||||
async def register_video_destination(self, destination: str):
|
||||
pass
|
||||
|
||||
async def write_raw_audio_frames(self, frames: bytes):
|
||||
async def register_audio_destination(self, destination: str):
|
||||
pass
|
||||
|
||||
async def write_raw_video_frame(
|
||||
self, frame: OutputImageRawFrame, destination: Optional[str] = None
|
||||
):
|
||||
pass
|
||||
|
||||
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
|
||||
pass
|
||||
|
||||
async def send_audio(self, frame: OutputAudioRawFrame):
|
||||
@@ -150,7 +165,7 @@ class BaseOutputTransport(FrameProcessor):
|
||||
await self.push_frame(frame, direction)
|
||||
elif isinstance(frame, (StartInterruptionFrame, StopInterruptionFrame)):
|
||||
await self.push_frame(frame, direction)
|
||||
await self._handle_interruptions(frame)
|
||||
await self._handle_frame(frame)
|
||||
elif isinstance(frame, TransportMessageUrgentFrame):
|
||||
await self.send_message(frame)
|
||||
elif isinstance(frame, SystemFrame):
|
||||
@@ -160,117 +175,416 @@ class BaseOutputTransport(FrameProcessor):
|
||||
await self.stop(frame)
|
||||
# Keep pushing EndFrame down so all the pipeline stops nicely.
|
||||
await self.push_frame(frame, direction)
|
||||
elif isinstance(frame, MixerControlFrame) and self._params.audio_out_mixer:
|
||||
await self._params.audio_out_mixer.process_frame(frame)
|
||||
elif isinstance(frame, MixerControlFrame):
|
||||
await self._handle_frame(frame)
|
||||
# Other frames.
|
||||
elif isinstance(frame, OutputAudioRawFrame):
|
||||
await self._handle_audio(frame)
|
||||
await self._handle_frame(frame)
|
||||
elif isinstance(frame, (OutputImageRawFrame, SpriteFrame)):
|
||||
await self._handle_image(frame)
|
||||
await self._handle_frame(frame)
|
||||
# TODO(aleix): Images and audio should support presentation timestamps.
|
||||
elif frame.pts:
|
||||
await self._sink_clock_queue.put((frame.pts, frame.id, frame))
|
||||
await self._handle_frame(frame)
|
||||
elif direction == FrameDirection.UPSTREAM:
|
||||
await self.push_frame(frame, direction)
|
||||
else:
|
||||
await self._sink_queue.put(frame)
|
||||
await self._handle_frame(frame)
|
||||
|
||||
async def _handle_interruptions(self, frame: Frame):
|
||||
if not self.interruptions_allowed:
|
||||
async def _handle_frame(self, frame: Frame):
|
||||
if frame.transport_destination not in self._media_senders:
|
||||
logger.warning(
|
||||
f"{self} destination [{frame.transport_destination}] not registered for frame {frame}"
|
||||
)
|
||||
return
|
||||
|
||||
sender = self._media_senders[frame.transport_destination]
|
||||
|
||||
if isinstance(frame, StartInterruptionFrame):
|
||||
# Cancel sink and video tasks.
|
||||
await self._cancel_sink_tasks()
|
||||
await self._cancel_video_task()
|
||||
# Create sink and video tasks.
|
||||
await sender.handle_interruptions(frame)
|
||||
elif isinstance(frame, OutputAudioRawFrame):
|
||||
await sender.handle_audio_frame(frame)
|
||||
elif isinstance(frame, (OutputImageRawFrame, SpriteFrame)):
|
||||
await sender.handle_image_frame(frame)
|
||||
elif isinstance(frame, MixerControlFrame):
|
||||
await sender.handle_mixer_control_frame(frame)
|
||||
elif frame.pts:
|
||||
await sender.handle_timed_frame(frame)
|
||||
else:
|
||||
await sender.handle_sync_frame(frame)
|
||||
|
||||
#
|
||||
# Media Sender
|
||||
#
|
||||
|
||||
class MediaSender:
|
||||
def __init__(
|
||||
self,
|
||||
transport: "BaseOutputTransport",
|
||||
*,
|
||||
destination: Optional[str],
|
||||
sample_rate: int,
|
||||
audio_chunk_size: int,
|
||||
params: TransportParams,
|
||||
):
|
||||
self._transport = transport
|
||||
self._destination = destination
|
||||
self._sample_rate = sample_rate
|
||||
self._audio_chunk_size = audio_chunk_size
|
||||
self._params = params
|
||||
|
||||
# Buffer to keep track of incoming audio.
|
||||
self._audio_buffer = bytearray()
|
||||
|
||||
# This will be used to resample incoming audio to the output sample rate.
|
||||
self._resampler = create_default_resampler()
|
||||
|
||||
# The user can provide a single mixer, to be used by the default
|
||||
# destination, or a destination/mixer mapping.
|
||||
self._mixer: Optional[BaseAudioMixer] = None
|
||||
|
||||
# These are the images that we should send at our desired framerate.
|
||||
self._video_images = None
|
||||
|
||||
# Indicates if the bot is currently speaking.
|
||||
self._bot_speaking = False
|
||||
|
||||
self._audio_task: Optional[asyncio.Task] = None
|
||||
self._video_task: Optional[asyncio.Task] = None
|
||||
self._clock_task: Optional[asyncio.Task] = None
|
||||
|
||||
@property
|
||||
def sample_rate(self) -> int:
|
||||
return self._sample_rate
|
||||
|
||||
@property
|
||||
def audio_chunk_size(self) -> int:
|
||||
return self._audio_chunk_size
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
self._audio_buffer = bytearray()
|
||||
|
||||
# Create all tasks.
|
||||
self._create_video_task()
|
||||
self._create_sink_tasks()
|
||||
self._create_clock_task()
|
||||
self._create_audio_task()
|
||||
|
||||
# Check if we have an audio mixer for our destination.
|
||||
if self._params.audio_out_mixer:
|
||||
if isinstance(self._params.audio_out_mixer, Mapping):
|
||||
self._mixer = self._params.audio_out_mixer.get(self._destination, None)
|
||||
elif not self._destination:
|
||||
# Only use the default mixer if we are the default destination.
|
||||
self._mixer = self._params.audio_out_mixer
|
||||
|
||||
# Start audio mixer.
|
||||
if self._mixer:
|
||||
await self._mixer.start(self._sample_rate)
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
# Let the sink tasks process the queue until they reach this EndFrame.
|
||||
await self._clock_queue.put((sys.maxsize, frame.id, frame))
|
||||
await self._audio_queue.put(frame)
|
||||
|
||||
# At this point we have enqueued an EndFrame and we need to wait for
|
||||
# that EndFrame to be processed by the audio and clock tasks. We
|
||||
# also need to wait for these tasks before cancelling the video task
|
||||
# because it might be still rendering.
|
||||
if self._audio_task:
|
||||
await self._transport.wait_for_task(self._audio_task)
|
||||
if self._clock_task:
|
||||
await self._transport.wait_for_task(self._clock_task)
|
||||
|
||||
# Stop audio mixer.
|
||||
if self._mixer:
|
||||
await self._mixer.stop()
|
||||
|
||||
# We can now cancel the video task.
|
||||
await self._cancel_video_task()
|
||||
|
||||
async def cancel(self, frame: CancelFrame):
|
||||
# Since we are cancelling everything it doesn't matter what task we cancel first.
|
||||
await self._cancel_audio_task()
|
||||
await self._cancel_clock_task()
|
||||
await self._cancel_video_task()
|
||||
|
||||
async def handle_interruptions(self, _: StartInterruptionFrame):
|
||||
if not self._transport.interruptions_allowed:
|
||||
return
|
||||
|
||||
# Cancel tasks.
|
||||
await self._cancel_audio_task()
|
||||
await self._cancel_clock_task()
|
||||
await self._cancel_video_task()
|
||||
# Create tasks.
|
||||
self._create_video_task()
|
||||
self._create_clock_task()
|
||||
self._create_audio_task()
|
||||
# Let's send a bot stopped speaking if we have to.
|
||||
await self._bot_stopped_speaking()
|
||||
|
||||
async def _handle_audio(self, frame: OutputAudioRawFrame):
|
||||
if not self._params.audio_out_enabled:
|
||||
return
|
||||
async def handle_audio_frame(self, frame: OutputAudioRawFrame):
|
||||
if not self._params.audio_out_enabled:
|
||||
return
|
||||
|
||||
# We might need to resample if incoming audio doesn't match the
|
||||
# transport sample rate.
|
||||
resampled = await self._resampler.resample(
|
||||
frame.audio, frame.sample_rate, self._sample_rate
|
||||
)
|
||||
|
||||
cls = type(frame)
|
||||
self._audio_buffer.extend(resampled)
|
||||
while len(self._audio_buffer) >= self._audio_chunk_size:
|
||||
chunk = cls(
|
||||
bytes(self._audio_buffer[: self._audio_chunk_size]),
|
||||
sample_rate=self._sample_rate,
|
||||
num_channels=frame.num_channels,
|
||||
# We might need to resample if incoming audio doesn't match the
|
||||
# transport sample rate.
|
||||
resampled = await self._resampler.resample(
|
||||
frame.audio, frame.sample_rate, self._sample_rate
|
||||
)
|
||||
await self._sink_queue.put(chunk)
|
||||
self._audio_buffer = self._audio_buffer[self._audio_chunk_size :]
|
||||
|
||||
async def _handle_image(self, frame: OutputImageRawFrame | SpriteFrame):
|
||||
if not self._params.video_out_enabled:
|
||||
return
|
||||
cls = type(frame)
|
||||
self._audio_buffer.extend(resampled)
|
||||
while len(self._audio_buffer) >= self._audio_chunk_size:
|
||||
chunk = cls(
|
||||
bytes(self._audio_buffer[: self._audio_chunk_size]),
|
||||
sample_rate=self._sample_rate,
|
||||
num_channels=frame.num_channels,
|
||||
)
|
||||
await self._audio_queue.put(chunk)
|
||||
self._audio_buffer = self._audio_buffer[self._audio_chunk_size :]
|
||||
|
||||
if self._params.video_out_is_live:
|
||||
await self._video_out_queue.put(frame)
|
||||
else:
|
||||
await self._sink_queue.put(frame)
|
||||
async def handle_image_frame(self, frame: OutputImageRawFrame | SpriteFrame):
|
||||
if not self._params.video_out_enabled:
|
||||
return
|
||||
|
||||
async def _bot_started_speaking(self):
|
||||
if not self._bot_speaking:
|
||||
logger.debug("Bot started speaking")
|
||||
await self.push_frame(BotStartedSpeakingFrame())
|
||||
await self.push_frame(BotStartedSpeakingFrame(), FrameDirection.UPSTREAM)
|
||||
self._bot_speaking = True
|
||||
if self._params.video_out_is_live and isinstance(frame, OutputImageRawFrame):
|
||||
await self._video_queue.put(frame)
|
||||
elif isinstance(frame, OutputImageRawFrame):
|
||||
await self._set_video_image(frame)
|
||||
else:
|
||||
await self._set_video_images(frame.images)
|
||||
|
||||
async def _bot_stopped_speaking(self):
|
||||
if self._bot_speaking:
|
||||
logger.debug("Bot stopped speaking")
|
||||
await self.push_frame(BotStoppedSpeakingFrame())
|
||||
await self.push_frame(BotStoppedSpeakingFrame(), FrameDirection.UPSTREAM)
|
||||
self._bot_speaking = False
|
||||
# Clean audio buffer (there could be tiny left overs if not multiple
|
||||
# to our output chunk size).
|
||||
self._audio_buffer = bytearray()
|
||||
async def handle_timed_frame(self, frame: Frame):
|
||||
await self._clock_queue.put((frame.pts, frame.id, frame))
|
||||
|
||||
#
|
||||
# Sink tasks
|
||||
#
|
||||
async def handle_sync_frame(self, frame: Frame):
|
||||
await self._audio_queue.put(frame)
|
||||
|
||||
def _create_sink_tasks(self):
|
||||
if not self._sink_task:
|
||||
self._sink_queue = asyncio.Queue()
|
||||
self._sink_task = self.create_task(self._sink_task_handler())
|
||||
if not self._sink_clock_task:
|
||||
self._sink_clock_queue = asyncio.PriorityQueue()
|
||||
self._sink_clock_task = self.create_task(self._sink_clock_task_handler())
|
||||
async def handle_mixer_control_frame(self, frame: MixerControlFrame):
|
||||
if self._mixer:
|
||||
await self._mixer.process_frame(frame)
|
||||
|
||||
async def _cancel_sink_tasks(self):
|
||||
# Stop sink tasks.
|
||||
if self._sink_task:
|
||||
await self.cancel_task(self._sink_task)
|
||||
self._sink_task = None
|
||||
# Stop sink clock tasks.
|
||||
if self._sink_clock_task:
|
||||
await self.cancel_task(self._sink_clock_task)
|
||||
self._sink_clock_task = None
|
||||
#
|
||||
# Audio handling
|
||||
#
|
||||
|
||||
async def _sink_frame_handler(self, frame: Frame):
|
||||
if isinstance(frame, OutputImageRawFrame):
|
||||
await self._set_video_image(frame)
|
||||
elif isinstance(frame, SpriteFrame):
|
||||
await self._set_video_images(frame.images)
|
||||
elif isinstance(frame, TransportMessageFrame):
|
||||
await self.send_message(frame)
|
||||
def _create_audio_task(self):
|
||||
if not self._audio_task and self._params.audio_out_enabled:
|
||||
self._audio_queue = asyncio.Queue()
|
||||
self._audio_task = self._transport.create_task(self._audio_task_handler())
|
||||
|
||||
async def _sink_clock_task_handler(self):
|
||||
running = True
|
||||
while running:
|
||||
try:
|
||||
timestamp, _, frame = await self._sink_clock_queue.get()
|
||||
async def _cancel_audio_task(self):
|
||||
if self._audio_task:
|
||||
await self._transport.cancel_task(self._audio_task)
|
||||
self._audio_task = None
|
||||
|
||||
async def _bot_started_speaking(self):
|
||||
if not self._bot_speaking:
|
||||
logger.debug(f"Bot [{self._destination}] started speaking")
|
||||
|
||||
downstream_frame = BotStartedSpeakingFrame()
|
||||
downstream_frame.transport_destination = self._destination
|
||||
upstream_frame = BotStartedSpeakingFrame()
|
||||
upstream_frame.transport_destination = self._destination
|
||||
await self._transport.push_frame(downstream_frame)
|
||||
await self._transport.push_frame(upstream_frame, FrameDirection.UPSTREAM)
|
||||
|
||||
self._bot_speaking = True
|
||||
|
||||
async def _bot_stopped_speaking(self):
|
||||
if self._bot_speaking:
|
||||
logger.debug(f"Bot [{self._destination}] stopped speaking")
|
||||
|
||||
downstream_frame = BotStoppedSpeakingFrame()
|
||||
downstream_frame.transport_destination = self._destination
|
||||
upstream_frame = BotStoppedSpeakingFrame()
|
||||
upstream_frame.transport_destination = self._destination
|
||||
await self._transport.push_frame(downstream_frame)
|
||||
await self._transport.push_frame(upstream_frame, FrameDirection.UPSTREAM)
|
||||
|
||||
self._bot_speaking = False
|
||||
|
||||
# Clean audio buffer (there could be tiny left overs if not multiple
|
||||
# to our output chunk size).
|
||||
self._audio_buffer = bytearray()
|
||||
|
||||
async def _handle_frame(self, frame: Frame):
|
||||
if isinstance(frame, OutputImageRawFrame):
|
||||
await self._set_video_image(frame)
|
||||
elif isinstance(frame, SpriteFrame):
|
||||
await self._set_video_images(frame.images)
|
||||
elif isinstance(frame, TransportMessageFrame):
|
||||
await self._transport.send_message(frame)
|
||||
|
||||
def _next_frame(self) -> AsyncGenerator[Frame, None]:
|
||||
async def without_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]:
|
||||
while True:
|
||||
try:
|
||||
frame = await asyncio.wait_for(
|
||||
self._audio_queue.get(), timeout=vad_stop_secs
|
||||
)
|
||||
yield frame
|
||||
except asyncio.TimeoutError:
|
||||
# Notify the bot stopped speaking upstream if necessary.
|
||||
await self._bot_stopped_speaking()
|
||||
|
||||
async def with_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]:
|
||||
last_frame_time = 0
|
||||
silence = b"\x00" * self._audio_chunk_size
|
||||
while True:
|
||||
try:
|
||||
frame = self._audio_queue.get_nowait()
|
||||
if isinstance(frame, OutputAudioRawFrame):
|
||||
frame.audio = await self._mixer.mix(frame.audio)
|
||||
last_frame_time = time.time()
|
||||
yield frame
|
||||
except asyncio.QueueEmpty:
|
||||
# Notify the bot stopped speaking upstream if necessary.
|
||||
diff_time = time.time() - last_frame_time
|
||||
if diff_time > vad_stop_secs:
|
||||
await self._bot_stopped_speaking()
|
||||
# Generate an audio frame with only the mixer's part.
|
||||
frame = OutputAudioRawFrame(
|
||||
audio=await self._mixer.mix(silence),
|
||||
sample_rate=self._sample_rate,
|
||||
num_channels=self._params.audio_out_channels,
|
||||
)
|
||||
yield frame
|
||||
|
||||
if self._mixer:
|
||||
return with_mixer(BOT_VAD_STOP_SECS)
|
||||
else:
|
||||
return without_mixer(BOT_VAD_STOP_SECS)
|
||||
|
||||
async def _audio_task_handler(self):
|
||||
# Push a BotSpeakingFrame every 200ms, we don't really need to push it
|
||||
# at every audio chunk. If the audio chunk is bigger than 200ms, push at
|
||||
# every audio chunk.
|
||||
TOTAL_CHUNK_MS = self._params.audio_out_10ms_chunks * 10
|
||||
BOT_SPEAKING_CHUNK_PERIOD = max(int(200 / TOTAL_CHUNK_MS), 1)
|
||||
bot_speaking_counter = 0
|
||||
async for frame in self._next_frame():
|
||||
# Notify the bot started speaking upstream if necessary and that
|
||||
# it's actually speaking.
|
||||
if isinstance(frame, TTSAudioRawFrame):
|
||||
await self._bot_started_speaking()
|
||||
if bot_speaking_counter % BOT_SPEAKING_CHUNK_PERIOD == 0:
|
||||
await self._transport.push_frame(BotSpeakingFrame())
|
||||
await self._transport.push_frame(
|
||||
BotSpeakingFrame(), FrameDirection.UPSTREAM
|
||||
)
|
||||
bot_speaking_counter = 0
|
||||
bot_speaking_counter += 1
|
||||
|
||||
# No need to push EndFrame, it's pushed from process_frame().
|
||||
if isinstance(frame, EndFrame):
|
||||
break
|
||||
|
||||
# Handle frame.
|
||||
await self._handle_frame(frame)
|
||||
|
||||
# Also, push frame downstream in case anyone else needs it.
|
||||
await self._transport.push_frame(frame)
|
||||
|
||||
# Send audio.
|
||||
if isinstance(frame, OutputAudioRawFrame):
|
||||
await self._transport.write_raw_audio_frames(frame.audio, self._destination)
|
||||
|
||||
#
|
||||
# Video handling
|
||||
#
|
||||
|
||||
def _create_video_task(self):
|
||||
if not self._video_task and self._params.video_out_enabled:
|
||||
self._video_queue = asyncio.Queue()
|
||||
self._video_task = self._transport.create_task(self._video_task_handler())
|
||||
|
||||
async def _cancel_video_task(self):
|
||||
# Stop video output task.
|
||||
if self._video_task:
|
||||
await self._transport.cancel_task(self._video_task)
|
||||
self._video_task = None
|
||||
|
||||
async def _set_video_image(self, image: OutputImageRawFrame):
|
||||
self._video_images = itertools.cycle([image])
|
||||
|
||||
async def _set_video_images(self, images: List[OutputImageRawFrame]):
|
||||
self._video_images = itertools.cycle(images)
|
||||
|
||||
async def _video_task_handler(self):
|
||||
self._video_start_time = None
|
||||
self._video_frame_index = 0
|
||||
self._video_frame_duration = 1 / self._params.video_out_framerate
|
||||
self._video_frame_reset = self._video_frame_duration * 5
|
||||
while True:
|
||||
if self._params.video_out_is_live:
|
||||
await self._video_is_live_handler()
|
||||
elif self._video_images:
|
||||
image = next(self._video_images)
|
||||
await self._draw_image(image)
|
||||
await asyncio.sleep(self._video_frame_duration)
|
||||
else:
|
||||
await asyncio.sleep(self._video_frame_duration)
|
||||
|
||||
async def _video_is_live_handler(self):
|
||||
image = await self._video_queue.get()
|
||||
|
||||
# We get the start time as soon as we get the first image.
|
||||
if not self._video_start_time:
|
||||
self._video_start_time = time.time()
|
||||
self._video_frame_index = 0
|
||||
|
||||
# Calculate how much time we need to wait before rendering next image.
|
||||
real_elapsed_time = time.time() - self._video_start_time
|
||||
real_render_time = self._video_frame_index * self._video_frame_duration
|
||||
delay_time = self._video_frame_duration + real_render_time - real_elapsed_time
|
||||
|
||||
if abs(delay_time) > self._video_frame_reset:
|
||||
self._video_start_time = time.time()
|
||||
self._video_frame_index = 0
|
||||
elif delay_time > 0:
|
||||
await asyncio.sleep(delay_time)
|
||||
self._video_frame_index += 1
|
||||
|
||||
# Render image
|
||||
await self._draw_image(image)
|
||||
|
||||
self._video_queue.task_done()
|
||||
|
||||
async def _draw_image(self, frame: OutputImageRawFrame):
|
||||
desired_size = (self._params.video_out_width, self._params.video_out_height)
|
||||
|
||||
# TODO: we should refactor in the future to support dynamic resolutions
|
||||
# which is kind of what happens in P2P connections.
|
||||
# We need to add support for that inside the DailyTransport
|
||||
if frame.size != desired_size:
|
||||
image = Image.frombytes(frame.format, frame.size, frame.image)
|
||||
resized_image = image.resize(desired_size)
|
||||
# logger.warning(f"{frame} does not have the expected size {desired_size}, resizing")
|
||||
frame = OutputImageRawFrame(
|
||||
resized_image.tobytes(), resized_image.size, resized_image.format
|
||||
)
|
||||
|
||||
await self._transport.write_raw_video_frame(frame, self._destination)
|
||||
|
||||
#
|
||||
# Clock handling
|
||||
#
|
||||
|
||||
def _create_clock_task(self):
|
||||
if not self._clock_task:
|
||||
self._clock_queue = asyncio.PriorityQueue()
|
||||
self._clock_task = self._transport.create_task(self._clock_task_handler())
|
||||
|
||||
async def _cancel_clock_task(self):
|
||||
if self._clock_task:
|
||||
await self._transport.cancel_task(self._clock_task)
|
||||
self._clock_task = None
|
||||
|
||||
async def _clock_task_handler(self):
|
||||
running = True
|
||||
while running:
|
||||
timestamp, _, frame = await self._clock_queue.get()
|
||||
|
||||
# If we hit an EndFrame, we can finish right away.
|
||||
running = not isinstance(frame, EndFrame)
|
||||
@@ -279,167 +593,12 @@ class BaseOutputTransport(FrameProcessor):
|
||||
# has already passed we process it, otherwise we wait until it's
|
||||
# time to process it.
|
||||
if running:
|
||||
current_time = self.get_clock().get_time()
|
||||
current_time = self._transport.get_clock().get_time()
|
||||
if timestamp > current_time:
|
||||
wait_time = nanoseconds_to_seconds(timestamp - current_time)
|
||||
await asyncio.sleep(wait_time)
|
||||
|
||||
# Handle frame.
|
||||
await self._sink_frame_handler(frame)
|
||||
# Push frame downstream.
|
||||
await self._transport.push_frame(frame)
|
||||
|
||||
# Also, push frame downstream in case anyone else needs it.
|
||||
await self.push_frame(frame)
|
||||
|
||||
self._sink_clock_queue.task_done()
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} error processing sink clock queue: {e}")
|
||||
|
||||
def _next_frame(self) -> AsyncGenerator[Frame, None]:
|
||||
async def without_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]:
|
||||
while True:
|
||||
try:
|
||||
frame = await asyncio.wait_for(self._sink_queue.get(), timeout=vad_stop_secs)
|
||||
yield frame
|
||||
except asyncio.TimeoutError:
|
||||
# Notify the bot stopped speaking upstream if necessary.
|
||||
await self._bot_stopped_speaking()
|
||||
|
||||
async def with_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]:
|
||||
last_frame_time = 0
|
||||
silence = b"\x00" * self._audio_chunk_size
|
||||
while True:
|
||||
try:
|
||||
frame = self._sink_queue.get_nowait()
|
||||
if isinstance(frame, OutputAudioRawFrame):
|
||||
frame.audio = await self._params.audio_out_mixer.mix(frame.audio)
|
||||
last_frame_time = time.time()
|
||||
yield frame
|
||||
except asyncio.QueueEmpty:
|
||||
# Notify the bot stopped speaking upstream if necessary.
|
||||
diff_time = time.time() - last_frame_time
|
||||
if diff_time > vad_stop_secs:
|
||||
await self._bot_stopped_speaking()
|
||||
# Generate an audio frame with only the mixer's part.
|
||||
frame = OutputAudioRawFrame(
|
||||
audio=await self._params.audio_out_mixer.mix(silence),
|
||||
sample_rate=self._sample_rate,
|
||||
num_channels=self._params.audio_out_channels,
|
||||
)
|
||||
yield frame
|
||||
|
||||
if self._params.audio_out_mixer:
|
||||
return with_mixer(BOT_VAD_STOP_SECS)
|
||||
else:
|
||||
return without_mixer(BOT_VAD_STOP_SECS)
|
||||
|
||||
async def _sink_task_handler(self):
|
||||
# Push a BotSpeakingFrame every 200ms, we don't really need to push it
|
||||
# at every audio chunk. If the audio chunk is bigger than 200ms, push at
|
||||
# every audio chunk.
|
||||
TOTAL_CHUNK_MS = self._params.audio_out_10ms_chunks * 10
|
||||
BOT_SPEAKING_CHUNK_PERIOD = max(int(200 / TOTAL_CHUNK_MS), 1)
|
||||
bot_speaking_counter = 0
|
||||
async for frame in self._next_frame():
|
||||
# Notify the bot started speaking upstream if necessary and that
|
||||
# it's actually speaking.
|
||||
if isinstance(frame, TTSAudioRawFrame):
|
||||
await self._bot_started_speaking()
|
||||
if bot_speaking_counter % BOT_SPEAKING_CHUNK_PERIOD == 0:
|
||||
await self.push_frame(BotSpeakingFrame())
|
||||
await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM)
|
||||
bot_speaking_counter = 0
|
||||
bot_speaking_counter += 1
|
||||
|
||||
# No need to push EndFrame, it's pushed from process_frame().
|
||||
if isinstance(frame, EndFrame):
|
||||
break
|
||||
|
||||
# Handle frame.
|
||||
await self._sink_frame_handler(frame)
|
||||
|
||||
# Also, push frame downstream in case anyone else needs it.
|
||||
await self.push_frame(frame)
|
||||
|
||||
# Send audio.
|
||||
if isinstance(frame, OutputAudioRawFrame):
|
||||
await self.write_raw_audio_frames(frame.audio)
|
||||
|
||||
#
|
||||
# Video task
|
||||
#
|
||||
|
||||
def _create_video_task(self):
|
||||
# Create video output queue and task if needed.
|
||||
if not self._video_out_task and self._params.video_out_enabled:
|
||||
self._video_out_queue = asyncio.Queue()
|
||||
self._video_out_task = self.create_task(self._video_out_task_handler())
|
||||
|
||||
async def _cancel_video_task(self):
|
||||
# Stop video output task.
|
||||
if self._video_out_task and self._params.video_out_enabled:
|
||||
await self.cancel_task(self._video_out_task)
|
||||
self._video_out_task = None
|
||||
|
||||
async def _draw_image(self, frame: OutputImageRawFrame):
|
||||
desired_size = (self._params.video_out_width, self._params.video_out_height)
|
||||
|
||||
# TODO: we should refactor in the future to support dynamic resolutions
|
||||
# which is kind of what happens in P2P connections.
|
||||
# We need to add support for that inside the DailyTransport
|
||||
if frame.size != desired_size:
|
||||
image = Image.frombytes(frame.format, frame.size, frame.image)
|
||||
resized_image = image.resize(desired_size)
|
||||
# logger.warning(f"{frame} does not have the expected size {desired_size}, resizing")
|
||||
frame = OutputImageRawFrame(
|
||||
resized_image.tobytes(), resized_image.size, resized_image.format
|
||||
)
|
||||
|
||||
await self.write_raw_video_frame(frame)
|
||||
|
||||
async def _set_video_image(self, image: OutputImageRawFrame):
|
||||
self._video_images = itertools.cycle([image])
|
||||
|
||||
async def _set_video_images(self, images: List[OutputImageRawFrame]):
|
||||
self._video_images = itertools.cycle(images)
|
||||
|
||||
async def _video_out_task_handler(self):
|
||||
self._video_out_start_time = None
|
||||
self._video_out_frame_index = 0
|
||||
self._video_out_frame_duration = 1 / self._params.video_out_framerate
|
||||
self._video_out_frame_reset = self._video_out_frame_duration * 5
|
||||
while True:
|
||||
if self._params.video_out_is_live:
|
||||
await self._video_out_is_live_handler()
|
||||
elif self._video_images:
|
||||
image = next(self._video_images)
|
||||
await self._draw_image(image)
|
||||
await asyncio.sleep(self._video_out_frame_duration)
|
||||
else:
|
||||
await asyncio.sleep(self._video_out_frame_duration)
|
||||
|
||||
async def _video_out_is_live_handler(self):
|
||||
image = await self._video_out_queue.get()
|
||||
|
||||
# We get the start time as soon as we get the first image.
|
||||
if not self._video_out_start_time:
|
||||
self._video_out_start_time = time.time()
|
||||
self._video_out_frame_index = 0
|
||||
|
||||
# Calculate how much time we need to wait before rendering next image.
|
||||
real_elapsed_time = time.time() - self._video_out_start_time
|
||||
real_render_time = self._video_out_frame_index * self._video_out_frame_duration
|
||||
delay_time = self._video_out_frame_duration + real_render_time - real_elapsed_time
|
||||
|
||||
if abs(delay_time) > self._video_out_frame_reset:
|
||||
self._video_out_start_time = time.time()
|
||||
self._video_out_frame_index = 0
|
||||
elif delay_time > 0:
|
||||
await asyncio.sleep(delay_time)
|
||||
self._video_out_frame_index += 1
|
||||
|
||||
# Render image
|
||||
await self._draw_image(image)
|
||||
|
||||
self._video_out_queue.task_done()
|
||||
self._clock_queue.task_done()
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
#
|
||||
|
||||
from abc import abstractmethod
|
||||
from typing import Optional
|
||||
from typing import List, Mapping, Optional
|
||||
|
||||
from pydantic import BaseModel, ConfigDict
|
||||
|
||||
@@ -33,7 +33,8 @@ class TransportParams(BaseModel):
|
||||
audio_out_channels: int = 1
|
||||
audio_out_bitrate: int = 96000
|
||||
audio_out_10ms_chunks: int = 4
|
||||
audio_out_mixer: Optional[BaseAudioMixer] = None
|
||||
audio_out_mixer: Optional[BaseAudioMixer | Mapping[Optional[str], BaseAudioMixer]] = None
|
||||
audio_out_destinations: List[str] = []
|
||||
audio_in_enabled: bool = False
|
||||
audio_in_sample_rate: Optional[int] = None
|
||||
audio_in_channels: int = 1
|
||||
@@ -48,6 +49,7 @@ class TransportParams(BaseModel):
|
||||
video_out_bitrate: int = 800000
|
||||
video_out_framerate: int = 30
|
||||
video_out_color_format: str = "RGB"
|
||||
video_out_destinations: List[str] = []
|
||||
vad_enabled: bool = False
|
||||
vad_audio_passthrough: bool = False
|
||||
vad_analyzer: Optional[VADAnalyzer] = None
|
||||
|
||||
@@ -118,7 +118,7 @@ class LocalAudioOutputTransport(BaseOutputTransport):
|
||||
self._out_stream.close()
|
||||
self._out_stream = None
|
||||
|
||||
async def write_raw_audio_frames(self, frames: bytes):
|
||||
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
|
||||
if self._out_stream:
|
||||
await self.get_event_loop().run_in_executor(
|
||||
self._executor, self._out_stream.write, frames
|
||||
|
||||
@@ -131,13 +131,15 @@ class TkOutputTransport(BaseOutputTransport):
|
||||
self._out_stream.close()
|
||||
self._out_stream = None
|
||||
|
||||
async def write_raw_audio_frames(self, frames: bytes):
|
||||
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
|
||||
if self._out_stream:
|
||||
await self.get_event_loop().run_in_executor(
|
||||
self._executor, self._out_stream.write, frames
|
||||
)
|
||||
|
||||
async def write_raw_video_frame(self, frame: OutputImageRawFrame):
|
||||
async def write_raw_video_frame(
|
||||
self, frame: OutputImageRawFrame, destination: Optional[str] = None
|
||||
):
|
||||
self.get_event_loop().call_soon(self._write_frame_to_tk, frame)
|
||||
|
||||
def _write_frame_to_tk(self, frame: OutputImageRawFrame):
|
||||
|
||||
@@ -203,7 +203,7 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport):
|
||||
await super().start(frame)
|
||||
await self._client.setup(frame)
|
||||
await self._params.serializer.setup(frame)
|
||||
self._send_interval = (self._audio_chunk_size / self.sample_rate) / 2
|
||||
self._send_interval = (self.audio_chunk_size / self.sample_rate) / 2
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
await super().stop(frame)
|
||||
@@ -229,7 +229,7 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport):
|
||||
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
|
||||
await self._write_frame(frame)
|
||||
|
||||
async def write_raw_audio_frames(self, frames: bytes):
|
||||
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
|
||||
if self._client.is_closing:
|
||||
return
|
||||
|
||||
|
||||
@@ -284,11 +284,13 @@ class SmallWebRTCClient:
|
||||
)
|
||||
yield audio_frame
|
||||
|
||||
async def write_raw_audio_frames(self, data: bytes):
|
||||
async def write_raw_audio_frames(self, data: bytes, destination: Optional[str] = None):
|
||||
if self._can_send() and self._audio_output_track:
|
||||
await self._audio_output_track.add_audio_bytes(data)
|
||||
|
||||
async def write_raw_video_frame(self, frame: OutputImageRawFrame):
|
||||
async def write_raw_video_frame(
|
||||
self, frame: OutputImageRawFrame, destination: Optional[str] = None
|
||||
):
|
||||
if self._can_send() and self._video_output_track:
|
||||
self._video_output_track.add_video_frame(frame)
|
||||
|
||||
@@ -497,10 +499,12 @@ class SmallWebRTCOutputTransport(BaseOutputTransport):
|
||||
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
|
||||
await self._client.send_message(frame)
|
||||
|
||||
async def write_raw_audio_frames(self, frames: bytes):
|
||||
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
|
||||
await self._client.write_raw_audio_frames(frames)
|
||||
|
||||
async def write_raw_video_frame(self, frame: OutputImageRawFrame):
|
||||
async def write_raw_video_frame(
|
||||
self, frame: OutputImageRawFrame, destination: Optional[str] = None
|
||||
):
|
||||
await self._client.write_raw_video_frame(frame)
|
||||
|
||||
|
||||
|
||||
@@ -182,7 +182,7 @@ class WebsocketClientOutputTransport(BaseOutputTransport):
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
self._send_interval = (self._audio_chunk_size / self.sample_rate) / 2
|
||||
self._send_interval = (self.audio_chunk_size / self.sample_rate) / 2
|
||||
await self._params.serializer.setup(frame)
|
||||
await self._session.setup(frame)
|
||||
await self._session.connect()
|
||||
@@ -202,7 +202,7 @@ class WebsocketClientOutputTransport(BaseOutputTransport):
|
||||
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
|
||||
await self._write_frame(frame)
|
||||
|
||||
async def write_raw_audio_frames(self, frames: bytes):
|
||||
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
|
||||
frame = OutputAudioRawFrame(
|
||||
audio=frames,
|
||||
sample_rate=self.sample_rate,
|
||||
|
||||
@@ -194,7 +194,7 @@ class WebsocketServerOutputTransport(BaseOutputTransport):
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
await self._params.serializer.setup(frame)
|
||||
self._send_interval = (self._audio_chunk_size / self.sample_rate) / 2
|
||||
self._send_interval = (self.audio_chunk_size / self.sample_rate) / 2
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
await super().stop(frame)
|
||||
@@ -218,7 +218,7 @@ class WebsocketServerOutputTransport(BaseOutputTransport):
|
||||
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
|
||||
await self._write_frame(frame)
|
||||
|
||||
async def write_raw_audio_frames(self, frames: bytes):
|
||||
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
|
||||
if not self._websocket:
|
||||
# Simulate audio playback with a sleep.
|
||||
await self._write_audio_sleep()
|
||||
|
||||
@@ -8,10 +8,13 @@ import asyncio
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Awaitable, Callable, Mapping, Optional
|
||||
from typing import Any, Awaitable, Callable, Dict, Mapping, Optional
|
||||
|
||||
import aiohttp
|
||||
from daily import (
|
||||
AudioData,
|
||||
CustomAudioSource,
|
||||
VideoFrame,
|
||||
VirtualCameraDevice,
|
||||
VirtualMicrophoneDevice,
|
||||
VirtualSpeakerDevice,
|
||||
@@ -19,6 +22,7 @@ from daily import (
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel
|
||||
|
||||
from pipecat.audio.utils import create_default_resampler
|
||||
from pipecat.audio.vad.vad_analyzer import VADAnalyzer, VADParams
|
||||
from pipecat.frames.frames import (
|
||||
CancelFrame,
|
||||
@@ -34,6 +38,7 @@ from pipecat.frames.frames import (
|
||||
TranscriptionFrame,
|
||||
TransportMessageFrame,
|
||||
TransportMessageUrgentFrame,
|
||||
UserAudioRawFrame,
|
||||
UserImageRawFrame,
|
||||
UserImageRequestFrame,
|
||||
)
|
||||
@@ -149,6 +154,8 @@ class DailyParams(TransportParams):
|
||||
api_url: Daily API base URL
|
||||
api_key: Daily API authentication key
|
||||
dialin_settings: Optional settings for dial-in functionality
|
||||
camera_out_enabled: Whether to enable the main camera output track. If enabled, it still needs `video_out_enabled=True`
|
||||
microphone_out_enabled: Whether to enable the main microphone track. If enabled, it still needs `audio_out_enabled=True`
|
||||
transcription_enabled: Whether to enable speech transcription
|
||||
transcription_settings: Configuration for transcription service
|
||||
"""
|
||||
@@ -156,6 +163,8 @@ class DailyParams(TransportParams):
|
||||
api_url: str = "https://api.daily.co/v1"
|
||||
api_key: str = ""
|
||||
dialin_settings: Optional[DailyDialinSettings] = None
|
||||
camera_out_enabled: bool = True
|
||||
microphone_out_enabled: bool = True
|
||||
transcription_enabled: bool = False
|
||||
transcription_settings: DailyTranscriptionSettings = DailyTranscriptionSettings()
|
||||
|
||||
@@ -275,6 +284,7 @@ class DailyTransportClient(EventHandler):
|
||||
self._transport_name = transport_name
|
||||
|
||||
self._participant_id: str = ""
|
||||
self._audio_renderers = {}
|
||||
self._video_renderers = {}
|
||||
self._transcription_ids = []
|
||||
self._transcription_status = None
|
||||
@@ -310,6 +320,7 @@ class DailyTransportClient(EventHandler):
|
||||
self._camera: Optional[VirtualCameraDevice] = None
|
||||
self._mic: Optional[VirtualMicrophoneDevice] = None
|
||||
self._speaker: Optional[VirtualSpeakerDevice] = None
|
||||
self._audio_sources: Dict[str, CustomAudioSource] = {}
|
||||
|
||||
def _camera_name(self):
|
||||
return f"camera-{self}"
|
||||
@@ -328,6 +339,14 @@ class DailyTransportClient(EventHandler):
|
||||
def participant_id(self) -> str:
|
||||
return self._participant_id
|
||||
|
||||
@property
|
||||
def in_sample_rate(self) -> int:
|
||||
return self._in_sample_rate
|
||||
|
||||
@property
|
||||
def out_sample_rate(self) -> int:
|
||||
return self._out_sample_rate
|
||||
|
||||
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
|
||||
if not self._joined:
|
||||
return
|
||||
@@ -365,19 +384,27 @@ class DailyTransportClient(EventHandler):
|
||||
await asyncio.sleep(0.01)
|
||||
return None
|
||||
|
||||
async def write_raw_audio_frames(self, frames: bytes):
|
||||
if not self._mic:
|
||||
return None
|
||||
async def register_audio_destination(self, destination: str):
|
||||
self._audio_sources[destination] = await self.add_custom_audio_track(destination)
|
||||
self._client.update_publishing({"customAudio": {destination: True}})
|
||||
|
||||
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
|
||||
future = self._get_event_loop().create_future()
|
||||
self._mic.write_frames(frames, completion=completion_callback(future))
|
||||
if not destination and self._mic:
|
||||
self._mic.write_frames(frames, completion=completion_callback(future))
|
||||
elif destination and destination in self._audio_sources:
|
||||
source = self._audio_sources[destination]
|
||||
source.write_frames(frames, completion=completion_callback(future))
|
||||
else:
|
||||
logger.warning(f"{self} unable to write audio frames to destination [{destination}]")
|
||||
future.set_result(None)
|
||||
await future
|
||||
|
||||
async def write_raw_video_frame(self, frame: OutputImageRawFrame):
|
||||
if not self._camera:
|
||||
return None
|
||||
|
||||
self._camera.write_frame(frame.image)
|
||||
async def write_raw_video_frame(
|
||||
self, frame: OutputImageRawFrame, destination: Optional[str] = None
|
||||
):
|
||||
if not destination and self._camera:
|
||||
self._camera.write_frame(frame.image)
|
||||
|
||||
async def setup(self, frame: StartFrame):
|
||||
self._in_sample_rate = self._params.audio_in_sample_rate or frame.audio_in_sample_rate
|
||||
@@ -480,6 +507,9 @@ class DailyTransportClient(EventHandler):
|
||||
async def _join(self):
|
||||
future = self._get_event_loop().create_future()
|
||||
|
||||
camera_enabled = self._params.video_out_enabled and self._params.camera_out_enabled
|
||||
microphone_enabled = self._params.audio_out_enabled and self._params.microphone_out_enabled
|
||||
|
||||
self._client.join(
|
||||
self._room_url,
|
||||
self._token,
|
||||
@@ -487,13 +517,13 @@ class DailyTransportClient(EventHandler):
|
||||
client_settings={
|
||||
"inputs": {
|
||||
"camera": {
|
||||
"isEnabled": self._params.video_out_enabled,
|
||||
"isEnabled": camera_enabled,
|
||||
"settings": {
|
||||
"deviceId": self._camera_name(),
|
||||
},
|
||||
},
|
||||
"microphone": {
|
||||
"isEnabled": self._params.audio_out_enabled,
|
||||
"isEnabled": microphone_enabled,
|
||||
"settings": {
|
||||
"deviceId": self._mic_name(),
|
||||
"customConstraints": {
|
||||
@@ -648,6 +678,28 @@ class DailyTransportClient(EventHandler):
|
||||
if self._joined and self._transcription_status:
|
||||
await self.update_transcription(self._transcription_ids)
|
||||
|
||||
async def capture_participant_audio(
|
||||
self,
|
||||
participant_id: str,
|
||||
callback: Callable,
|
||||
audio_source: str = "microphone",
|
||||
):
|
||||
# Only enable the desired audio source subscription on this participant.
|
||||
if audio_source in ("microphone", "screenAudio"):
|
||||
media = {"media": {audio_source: "subscribed"}}
|
||||
else:
|
||||
media = {"media": {"customAudio": {audio_source: "subscribed"}}}
|
||||
|
||||
await self.update_subscriptions(participant_settings={participant_id: media})
|
||||
|
||||
self._audio_renderers[participant_id] = {audio_source: callback}
|
||||
|
||||
self._client.set_audio_renderer(
|
||||
participant_id,
|
||||
self._audio_data_received,
|
||||
audio_source=audio_source,
|
||||
)
|
||||
|
||||
async def capture_participant_video(
|
||||
self,
|
||||
participant_id: str,
|
||||
@@ -656,12 +708,15 @@ class DailyTransportClient(EventHandler):
|
||||
video_source: str = "camera",
|
||||
color_format: str = "RGB",
|
||||
):
|
||||
# Only enable the desired video source subscription on this participant.
|
||||
await self.update_subscriptions(
|
||||
participant_settings={participant_id: {"media": {video_source: "subscribed"}}}
|
||||
)
|
||||
# Only enable the desired audio source subscription on this participant.
|
||||
if video_source in ("camera", "screenVideo"):
|
||||
media = {"media": {video_source: "subscribed"}}
|
||||
else:
|
||||
media = {"media": {"customVideo": {video_source: "subscribed"}}}
|
||||
|
||||
self._video_renderers[participant_id] = callback
|
||||
await self.update_subscriptions(participant_settings={participant_id: media})
|
||||
|
||||
self._video_renderers[participant_id] = {video_source: callback}
|
||||
|
||||
self._client.set_video_renderer(
|
||||
participant_id,
|
||||
@@ -670,6 +725,20 @@ class DailyTransportClient(EventHandler):
|
||||
color_format=color_format,
|
||||
)
|
||||
|
||||
async def add_custom_audio_track(self, track_name: str) -> CustomAudioSource:
|
||||
future = self._get_event_loop().create_future()
|
||||
|
||||
audio_source = CustomAudioSource(self._out_sample_rate, 1)
|
||||
self._client.add_custom_audio_track(
|
||||
track_name=track_name,
|
||||
audio_source=audio_source,
|
||||
completion=completion_callback(future),
|
||||
)
|
||||
|
||||
await future
|
||||
|
||||
return audio_source
|
||||
|
||||
async def update_transcription(self, participants=None, instance_id=None):
|
||||
future = self._get_event_loop().create_future()
|
||||
self._client.update_transcription(
|
||||
@@ -686,7 +755,15 @@ class DailyTransportClient(EventHandler):
|
||||
)
|
||||
await future
|
||||
|
||||
async def update_remote_participants(self, remote_participants: Mapping[str, Any] = None):
|
||||
async def update_publishing(self, publishing_settings: Mapping[str, Any]):
|
||||
future = self._get_event_loop().create_future()
|
||||
self._client.update_publishing(
|
||||
publishing_settings=publishing_settings,
|
||||
completion=completion_callback(future),
|
||||
)
|
||||
await future
|
||||
|
||||
async def update_remote_participants(self, remote_participants: Mapping[str, Any]):
|
||||
future = self._get_event_loop().create_future()
|
||||
self._client.update_remote_participants(
|
||||
remote_participants=remote_participants, completion=completion_callback(future)
|
||||
@@ -773,15 +850,15 @@ class DailyTransportClient(EventHandler):
|
||||
# Daily (CallClient callbacks)
|
||||
#
|
||||
|
||||
def _video_frame_received(self, participant_id, video_frame):
|
||||
callback = self._video_renderers[participant_id]
|
||||
self._call_async_callback(
|
||||
callback,
|
||||
participant_id,
|
||||
video_frame.buffer,
|
||||
(video_frame.width, video_frame.height),
|
||||
video_frame.color_format,
|
||||
)
|
||||
def _audio_data_received(self, participant_id: str, audio_data: AudioData, audio_source: str):
|
||||
callback = self._audio_renderers[participant_id][audio_source]
|
||||
self._call_async_callback(callback, participant_id, audio_data, audio_source)
|
||||
|
||||
def _video_frame_received(
|
||||
self, participant_id: str, video_frame: VideoFrame, video_source: str
|
||||
):
|
||||
callback = self._video_renderers[participant_id][video_source]
|
||||
self._call_async_callback(callback, participant_id, video_frame, video_source)
|
||||
|
||||
def _call_async_callback(self, callback, *args):
|
||||
future = asyncio.run_coroutine_threadsafe(
|
||||
@@ -837,6 +914,8 @@ class DailyInputTransport(BaseInputTransport):
|
||||
# internally to be processed.
|
||||
self._audio_in_task = None
|
||||
|
||||
self._resampler = create_default_resampler()
|
||||
|
||||
self._vad_analyzer: Optional[VADAnalyzer] = params.vad_analyzer
|
||||
|
||||
@property
|
||||
@@ -851,6 +930,9 @@ class DailyInputTransport(BaseInputTransport):
|
||||
self._audio_in_task = self.create_task(self._audio_in_task_handler())
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
# Setup client.
|
||||
await self._client.setup(frame)
|
||||
|
||||
# Parent start.
|
||||
await super().start(frame)
|
||||
|
||||
@@ -859,8 +941,6 @@ class DailyInputTransport(BaseInputTransport):
|
||||
|
||||
self._initialized = True
|
||||
|
||||
# Setup client.
|
||||
await self._client.setup(frame)
|
||||
# Join the room.
|
||||
await self._client.join()
|
||||
if self._params.audio_in_stream_on_start:
|
||||
@@ -916,6 +996,31 @@ class DailyInputTransport(BaseInputTransport):
|
||||
# Audio in
|
||||
#
|
||||
|
||||
async def capture_participant_audio(
|
||||
self,
|
||||
participant_id: str,
|
||||
audio_source: str = "camera",
|
||||
):
|
||||
await self._client.capture_participant_audio(
|
||||
participant_id, self._on_participant_audio_data, audio_source
|
||||
)
|
||||
|
||||
async def _on_participant_audio_data(
|
||||
self, participant_id: str, audio: AudioData, audio_source: str
|
||||
):
|
||||
resampled = await self._resampler.resample(
|
||||
audio.audio_frames, audio.sample_rate, self._client.out_sample_rate
|
||||
)
|
||||
|
||||
frame = UserAudioRawFrame(
|
||||
user_id=participant_id,
|
||||
audio=resampled,
|
||||
sample_rate=self._client.out_sample_rate,
|
||||
num_channels=audio.num_channels,
|
||||
)
|
||||
frame.transport_source = audio_source
|
||||
await self.push_frame(frame)
|
||||
|
||||
async def _audio_in_task_handler(self):
|
||||
while True:
|
||||
frame = await self._client.read_next_audio_frame()
|
||||
@@ -934,9 +1039,11 @@ class DailyInputTransport(BaseInputTransport):
|
||||
color_format: str = "RGB",
|
||||
):
|
||||
self._video_renderers[participant_id] = {
|
||||
"framerate": framerate,
|
||||
"timestamp": 0,
|
||||
"render_next_frame": [],
|
||||
video_source: {
|
||||
"framerate": framerate,
|
||||
"timestamp": 0,
|
||||
"render_next_frame": [],
|
||||
}
|
||||
}
|
||||
|
||||
await self._client.capture_participant_video(
|
||||
@@ -947,12 +1054,14 @@ class DailyInputTransport(BaseInputTransport):
|
||||
if frame.user_id in self._video_renderers:
|
||||
self._video_renderers[frame.user_id]["render_next_frame"].append(frame)
|
||||
|
||||
async def _on_participant_video_frame(self, participant_id: str, buffer, size, format):
|
||||
async def _on_participant_video_frame(
|
||||
self, participant_id: str, video_frame: VideoFrame, video_source: str
|
||||
):
|
||||
render_frame = False
|
||||
|
||||
curr_time = time.time()
|
||||
prev_time = self._video_renderers[participant_id]["timestamp"]
|
||||
framerate = self._video_renderers[participant_id]["framerate"]
|
||||
prev_time = self._video_renderers[participant_id][video_source]["timestamp"]
|
||||
framerate = self._video_renderers[participant_id][video_source]["framerate"]
|
||||
|
||||
# Some times we render frames because of a request.
|
||||
request_frame = None
|
||||
@@ -961,20 +1070,23 @@ class DailyInputTransport(BaseInputTransport):
|
||||
next_time = prev_time + 1 / framerate
|
||||
render_frame = (next_time - curr_time) < 0.1
|
||||
|
||||
elif self._video_renderers[participant_id]["render_next_frame"]:
|
||||
request_frame = self._video_renderers[participant_id]["render_next_frame"].pop(0)
|
||||
elif self._video_renderers[participant_id][video_source]["render_next_frame"]:
|
||||
request_frame = self._video_renderers[participant_id][video_source][
|
||||
"render_next_frame"
|
||||
].pop(0)
|
||||
render_frame = True
|
||||
|
||||
if render_frame:
|
||||
frame = UserImageRawFrame(
|
||||
user_id=participant_id,
|
||||
request=request_frame,
|
||||
image=buffer,
|
||||
size=size,
|
||||
format=format,
|
||||
image=video_frame.buffer,
|
||||
size=(video_frame.width, video_frame.height),
|
||||
format=video_frame.color_format,
|
||||
)
|
||||
frame.transport_source = video_source
|
||||
await self.push_frame(frame)
|
||||
self._video_renderers[participant_id]["timestamp"] = curr_time
|
||||
self._video_renderers[participant_id][video_source]["timestamp"] = curr_time
|
||||
|
||||
|
||||
class DailyOutputTransport(BaseOutputTransport):
|
||||
@@ -999,6 +1111,9 @@ class DailyOutputTransport(BaseOutputTransport):
|
||||
self._initialized = False
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
# Setup client.
|
||||
await self._client.setup(frame)
|
||||
|
||||
# Parent start.
|
||||
await super().start(frame)
|
||||
|
||||
@@ -1007,8 +1122,6 @@ class DailyOutputTransport(BaseOutputTransport):
|
||||
|
||||
self._initialized = True
|
||||
|
||||
# Setup client.
|
||||
await self._client.setup(frame)
|
||||
# Join the room.
|
||||
await self._client.join()
|
||||
|
||||
@@ -1032,11 +1145,19 @@ class DailyOutputTransport(BaseOutputTransport):
|
||||
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
|
||||
await self._client.send_message(frame)
|
||||
|
||||
async def write_raw_audio_frames(self, frames: bytes):
|
||||
await self._client.write_raw_audio_frames(frames)
|
||||
async def register_video_destination(self, destination: str):
|
||||
logger.warning(f"{self} registering video destinations is not supported yet")
|
||||
|
||||
async def write_raw_video_frame(self, frame: OutputImageRawFrame):
|
||||
await self._client.write_raw_video_frame(frame)
|
||||
async def register_audio_destination(self, destination: str):
|
||||
await self._client.register_audio_destination(destination)
|
||||
|
||||
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
|
||||
await self._client.write_raw_audio_frames(frames, destination)
|
||||
|
||||
async def write_raw_video_frame(
|
||||
self, frame: OutputImageRawFrame, destination: Optional[str] = None
|
||||
):
|
||||
await self._client.write_raw_video_frame(frame, destination)
|
||||
|
||||
|
||||
class DailyTransport(BaseTransport):
|
||||
@@ -1204,6 +1325,14 @@ class DailyTransport(BaseTransport):
|
||||
async def capture_participant_transcription(self, participant_id: str):
|
||||
await self._client.capture_participant_transcription(participant_id)
|
||||
|
||||
async def capture_participant_audio(
|
||||
self,
|
||||
participant_id: str,
|
||||
audio_source: str = "microphone",
|
||||
):
|
||||
if self._input:
|
||||
await self._input.capture_participant_audio(participant_id, audio_source)
|
||||
|
||||
async def capture_participant_video(
|
||||
self,
|
||||
participant_id: str,
|
||||
@@ -1216,12 +1345,15 @@ class DailyTransport(BaseTransport):
|
||||
participant_id, framerate, video_source, color_format
|
||||
)
|
||||
|
||||
async def update_publishing(self, publishing_settings: Mapping[str, Any]):
|
||||
await self._client.update_publishing(publishing_settings=publishing_settings)
|
||||
|
||||
async def update_subscriptions(self, participant_settings=None, profile_settings=None):
|
||||
await self._client.update_subscriptions(
|
||||
participant_settings=participant_settings, profile_settings=profile_settings
|
||||
)
|
||||
|
||||
async def update_remote_participants(self, remote_participants: Mapping[str, Any] = None):
|
||||
async def update_remote_participants(self, remote_participants: Mapping[str, Any]):
|
||||
await self._client.update_remote_participants(remote_participants=remote_participants)
|
||||
|
||||
async def _on_joined(self, data):
|
||||
|
||||
@@ -462,7 +462,7 @@ class LiveKitOutputTransport(BaseOutputTransport):
|
||||
else:
|
||||
await self._client.send_data(frame.message.encode())
|
||||
|
||||
async def write_raw_audio_frames(self, frames: bytes):
|
||||
async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
|
||||
livekit_audio = self._convert_pipecat_audio_to_livekit(frames)
|
||||
await self._client.publish_audio(livekit_audio)
|
||||
|
||||
|
||||
@@ -287,3 +287,50 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
|
||||
frames_to_send=frames_to_send,
|
||||
expected_down_frames=expected_returned_frames,
|
||||
)
|
||||
|
||||
async def test_direct_frame_muting(self):
|
||||
"""Test that RequestSTTMuteFrame and RequestSTTUnmuteFrame directly control muting."""
|
||||
from pipecat.frames.frames import RequestSTTMuteFrame, RequestSTTUnmuteFrame
|
||||
|
||||
# Create filter with no strategies to isolate direct frame control
|
||||
filter = STTMuteFilter(config=STTMuteConfig(strategies=set()))
|
||||
|
||||
frames_to_send = [
|
||||
# Initially unmuted - frames should pass through
|
||||
UserStartedSpeakingFrame(),
|
||||
InputAudioRawFrame(audio=b"", sample_rate=16000, num_channels=1),
|
||||
UserStoppedSpeakingFrame(),
|
||||
# Mute via frame - subsequent frames should be suppressed
|
||||
RequestSTTMuteFrame(),
|
||||
SleepFrame(sleep=0.1),
|
||||
UserStartedSpeakingFrame(), # Should be suppressed
|
||||
InputAudioRawFrame(
|
||||
audio=b"", sample_rate=16000, num_channels=1
|
||||
), # Should be suppressed
|
||||
UserStoppedSpeakingFrame(), # Should be suppressed
|
||||
# Unmute via frame - frames should pass through again
|
||||
RequestSTTUnmuteFrame(),
|
||||
SleepFrame(sleep=0.1),
|
||||
UserStartedSpeakingFrame(),
|
||||
InputAudioRawFrame(audio=b"", sample_rate=16000, num_channels=1),
|
||||
UserStoppedSpeakingFrame(),
|
||||
]
|
||||
|
||||
expected_returned_frames = [
|
||||
UserStartedSpeakingFrame,
|
||||
InputAudioRawFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
STTMuteFrame, # mute=True
|
||||
RequestSTTMuteFrame,
|
||||
STTMuteFrame, # mute=False
|
||||
RequestSTTUnmuteFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
InputAudioRawFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
]
|
||||
|
||||
await run_test(
|
||||
filter,
|
||||
frames_to_send=frames_to_send,
|
||||
expected_down_frames=expected_returned_frames,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user