Compare commits
16 Commits
vs/deepgra
...
aleix/webs
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7a6136fa9d | ||
|
|
219af71227 | ||
|
|
d2117c6f32 | ||
|
|
533c09741c | ||
|
|
797f82c836 | ||
|
|
fe8f1c8360 | ||
|
|
e38ca3e455 | ||
|
|
4bab20643b | ||
|
|
8f7b3e1d5d | ||
|
|
4099256c18 | ||
|
|
f4ae56d7b6 | ||
|
|
9fd061e33e | ||
|
|
5d2f062afa | ||
|
|
d8b483dc1e | ||
|
|
02191b7d58 | ||
|
|
291c39efc4 |
@@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
### Added
|
||||
|
||||
- Added WebsocketServerTransport. This will create a websocket server and will
|
||||
read messages coming from a client. The messages are serialized/deserialized
|
||||
with protobufs. See `examples/websocket-server` for a detailed example.
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed SileroVAD frame processor.
|
||||
|
||||
2
LICENSE
2
LICENSE
@@ -1,6 +1,6 @@
|
||||
BSD 2-Clause License
|
||||
|
||||
Copyright (c) 2024, Kwindla Hultman Kramer
|
||||
Copyright (c) 2024, Daily
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
modification, are permitted provided that the following conditions are met:
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
autopep8~=2.1.0
|
||||
build~=1.2.1
|
||||
grpcio-tools~=1.62.2
|
||||
pip-tools~=7.4.1
|
||||
pytest~=8.2.0
|
||||
setuptools~=69.5.1
|
||||
|
||||
@@ -44,7 +44,7 @@ async def main(room_url):
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
model="gpt-4-turbo-preview")
|
||||
model="gpt-4o")
|
||||
|
||||
messages = [
|
||||
{
|
||||
|
||||
@@ -93,7 +93,7 @@ async def main(room_url):
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
model="gpt-4-turbo-preview")
|
||||
model="gpt-4o")
|
||||
|
||||
imagegen = FalImageGenService(
|
||||
params=FalImageGenService.InputParams(
|
||||
|
||||
@@ -76,7 +76,7 @@ async def main():
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
model="gpt-4-turbo-preview")
|
||||
model="gpt-4o")
|
||||
|
||||
tts = ElevenLabsTTSService(
|
||||
aiohttp_session=session,
|
||||
|
||||
@@ -81,7 +81,7 @@ async def main(room_url: str, token):
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
model="gpt-4-turbo-preview")
|
||||
model="gpt-4o")
|
||||
|
||||
messages = [
|
||||
{
|
||||
|
||||
@@ -53,7 +53,7 @@ async def main(room_url: str, token):
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
model="gpt-4-turbo-preview")
|
||||
model="gpt-4o")
|
||||
|
||||
messages = [
|
||||
{
|
||||
|
||||
@@ -53,7 +53,7 @@ async def main(room_url: str, token):
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
model="gpt-4-turbo-preview")
|
||||
model="gpt-4o")
|
||||
|
||||
messages = [
|
||||
{
|
||||
|
||||
@@ -95,7 +95,7 @@ async def main(room_url: str, token):
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
model="gpt-4-turbo-preview")
|
||||
model="gpt-4o")
|
||||
|
||||
tts = ElevenLabsTTSService(
|
||||
aiohttp_session=session,
|
||||
|
||||
@@ -71,7 +71,7 @@ async def main(room_url: str, token):
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
model="gpt-4-turbo-preview")
|
||||
model="gpt-4o")
|
||||
llm.register_function(
|
||||
"get_current_weather",
|
||||
fetch_weather_from_api,
|
||||
|
||||
39
examples/foundational/15-websocket-server.py
Normal file
39
examples/foundational/15-websocket-server.py
Normal file
@@ -0,0 +1,39 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineTask
|
||||
from pipecat.transports.network.websocket_server import WebsocketServerTransport
|
||||
|
||||
from runner import configure
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def main():
|
||||
transport = WebsocketServerTransport()
|
||||
|
||||
pipeline = Pipeline([transport.input(), transport.output()])
|
||||
|
||||
task = PipelineTask(pipeline)
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
@@ -1,25 +0,0 @@
|
||||
syntax = "proto3";
|
||||
|
||||
package pipecat_proto;
|
||||
|
||||
message TextFrame {
|
||||
string text = 1;
|
||||
}
|
||||
|
||||
message AudioFrame {
|
||||
bytes audio = 1;
|
||||
}
|
||||
|
||||
message TranscriptionFrame {
|
||||
string text = 1;
|
||||
string participant_id = 2;
|
||||
string timestamp = 3;
|
||||
}
|
||||
|
||||
message Frame {
|
||||
oneof frame {
|
||||
TextFrame text = 1;
|
||||
AudioFrame audio = 2;
|
||||
TranscriptionFrame transcription = 3;
|
||||
}
|
||||
}
|
||||
@@ -1,134 +0,0 @@
|
||||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||||
<script src="//cdn.jsdelivr.net/npm/protobufjs@7.X.X/dist/protobuf.min.js"></script>
|
||||
<title>WebSocket Audio Stream</title>
|
||||
</head>
|
||||
|
||||
<body>
|
||||
<h1>WebSocket Audio Stream</h1>
|
||||
<button id="startAudioBtn">Start Audio</button>
|
||||
<button id="stopAudioBtn">Stop Audio</button>
|
||||
<script>
|
||||
const SAMPLE_RATE = 16000;
|
||||
const BUFFER_SIZE = 8192;
|
||||
const MIN_AUDIO_SIZE = 6400;
|
||||
|
||||
let audioContext;
|
||||
let microphoneStream;
|
||||
let scriptProcessor;
|
||||
let source;
|
||||
let frame;
|
||||
let audioChunks = [];
|
||||
let isPlaying = false;
|
||||
let ws;
|
||||
|
||||
const proto = protobuf.load("frames.proto", (err, root) => {
|
||||
if (err) throw err;
|
||||
frame = root.lookupType("pipecat_proto.Frame");
|
||||
});
|
||||
|
||||
function initWebSocket() {
|
||||
ws = new WebSocket('ws://localhost:8765');
|
||||
|
||||
ws.addEventListener('open', () => console.log('WebSocket connection established.'));
|
||||
ws.addEventListener('message', handleWebSocketMessage);
|
||||
ws.addEventListener('close', (event) => console.log("WebSocket connection closed.", event.code, event.reason));
|
||||
ws.addEventListener('error', (event) => console.error('WebSocket error:', event));
|
||||
}
|
||||
|
||||
async function handleWebSocketMessage(event) {
|
||||
const arrayBuffer = await event.data.arrayBuffer();
|
||||
enqueueAudioFromProto(arrayBuffer);
|
||||
}
|
||||
|
||||
function enqueueAudioFromProto(arrayBuffer) {
|
||||
const parsedFrame = frame.decode(new Uint8Array(arrayBuffer));
|
||||
if (!parsedFrame?.audio) return false;
|
||||
|
||||
const frameCount = parsedFrame.audio.data.length / 2;
|
||||
const audioOutBuffer = audioContext.createBuffer(1, frameCount, SAMPLE_RATE);
|
||||
const nowBuffering = audioOutBuffer.getChannelData(0);
|
||||
const view = new Int16Array(parsedFrame.audio.data.buffer);
|
||||
|
||||
for (let i = 0; i < frameCount; i++) {
|
||||
const word = view[i];
|
||||
nowBuffering[i] = ((word + 32768) % 65536 - 32768) / 32768.0;
|
||||
}
|
||||
|
||||
audioChunks.push(audioOutBuffer);
|
||||
if (!isPlaying) playNextChunk();
|
||||
}
|
||||
|
||||
function playNextChunk() {
|
||||
if (audioChunks.length === 0) {
|
||||
isPlaying = false;
|
||||
return;
|
||||
}
|
||||
|
||||
isPlaying = true;
|
||||
const audioOutBuffer = audioChunks.shift();
|
||||
const source = audioContext.createBufferSource();
|
||||
source.buffer = audioOutBuffer;
|
||||
source.connect(audioContext.destination);
|
||||
source.onended = playNextChunk;
|
||||
source.start();
|
||||
}
|
||||
|
||||
function startAudio() {
|
||||
if (!navigator.mediaDevices || !navigator.mediaDevices.getUserMedia) {
|
||||
alert('getUserMedia is not supported in your browser.');
|
||||
return;
|
||||
}
|
||||
|
||||
navigator.mediaDevices.getUserMedia({ audio: true })
|
||||
.then((stream) => {
|
||||
microphoneStream = stream;
|
||||
audioContext = new (window.AudioContext || window.webkitAudioContext)();
|
||||
scriptProcessor = audioContext.createScriptProcessor(BUFFER_SIZE, 1, 1);
|
||||
source = audioContext.createMediaStreamSource(stream);
|
||||
source.connect(scriptProcessor);
|
||||
scriptProcessor.connect(audioContext.destination);
|
||||
|
||||
const audioBuffer = [];
|
||||
const skipRatio = Math.floor(audioContext.sampleRate / (SAMPLE_RATE * 2));
|
||||
|
||||
scriptProcessor.onaudioprocess = (event) => {
|
||||
const rawLeftChannelData = event.inputBuffer.getChannelData(0);
|
||||
for (let i = 0; i < rawLeftChannelData.length; i += skipRatio) {
|
||||
const normalized = ((rawLeftChannelData[i] * 32768.0) + 32768) % 65536 - 32768;
|
||||
const swappedBytes = ((normalized & 0xff) << 8) | ((normalized >> 8) & 0xff);
|
||||
audioBuffer.push(swappedBytes);
|
||||
}
|
||||
|
||||
if (audioBuffer.length >= MIN_AUDIO_SIZE) {
|
||||
const audioFrame = frame.create({ audio: { audio: audioBuffer.slice(0, MIN_AUDIO_SIZE) } });
|
||||
const encodedFrame = new Uint8Array(frame.encode(audioFrame).finish());
|
||||
ws.send(encodedFrame);
|
||||
audioBuffer.splice(0, MIN_AUDIO_SIZE);
|
||||
}
|
||||
};
|
||||
|
||||
initWebSocket();
|
||||
})
|
||||
.catch((error) => console.error('Error accessing microphone:', error));
|
||||
}
|
||||
|
||||
function stopAudio() {
|
||||
if (ws) {
|
||||
ws.close();
|
||||
scriptProcessor.disconnect();
|
||||
source.disconnect();
|
||||
ws = undefined;
|
||||
}
|
||||
}
|
||||
|
||||
document.getElementById('startAudioBtn').addEventListener('click', startAudio);
|
||||
document.getElementById('stopAudioBtn').addEventListener('click', stopAudio);
|
||||
</script>
|
||||
</body>
|
||||
|
||||
</html>
|
||||
@@ -1,50 +0,0 @@
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import logging
|
||||
import os
|
||||
from pipecat.pipeline.frame_processor import FrameProcessor
|
||||
from pipecat.pipeline.frames import TextFrame, TranscriptionFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.services.elevenlabs_ai_services import ElevenLabsTTSService
|
||||
from pipecat.transports.websocket_transport import WebsocketTransport
|
||||
from pipecat.services.whisper_ai_services import WhisperSTTService
|
||||
|
||||
logging.basicConfig(format="%(levelno)s %(asctime)s %(message)s")
|
||||
logger = logging.getLogger("pipecat")
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
|
||||
class WhisperTranscriber(FrameProcessor):
|
||||
async def process_frame(self, frame):
|
||||
if isinstance(frame, TranscriptionFrame):
|
||||
print(f"Transcribed: {frame.text}")
|
||||
else:
|
||||
yield frame
|
||||
|
||||
|
||||
async def main():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
transport = WebsocketTransport(
|
||||
mic_enabled=True,
|
||||
speaker_enabled=True,
|
||||
)
|
||||
tts = ElevenLabsTTSService(
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
|
||||
)
|
||||
|
||||
pipeline = Pipeline([
|
||||
WhisperSTTService(),
|
||||
WhisperTranscriber(),
|
||||
tts,
|
||||
])
|
||||
|
||||
@transport.on_connection
|
||||
async def queue_frame():
|
||||
await pipeline.queue_frames([TextFrame("Hello there!")])
|
||||
|
||||
await transport.run(pipeline)
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
@@ -145,7 +145,7 @@ async def main(room_url: str, token):
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
model="gpt-4-turbo-preview")
|
||||
model="gpt-4o")
|
||||
|
||||
ta = TalkingAnimation()
|
||||
|
||||
|
||||
@@ -117,7 +117,7 @@ async def main(room_url: str, token):
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
model="gpt-4-turbo-preview")
|
||||
model="gpt-4o")
|
||||
|
||||
messages = [
|
||||
{
|
||||
|
||||
@@ -56,7 +56,7 @@ async def main(room_url, token=None):
|
||||
|
||||
llm_service = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
model="gpt-4-turbo"
|
||||
model="gpt-4o"
|
||||
)
|
||||
|
||||
tts_service = ElevenLabsTTSService(
|
||||
|
||||
@@ -97,7 +97,8 @@ async def main(room_url: str, token):
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4-turbo-preview"
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
model="gpt-4o"
|
||||
)
|
||||
|
||||
sa = SentenceAggregator()
|
||||
|
||||
27
examples/websocket-server/README.md
Normal file
27
examples/websocket-server/README.md
Normal file
@@ -0,0 +1,27 @@
|
||||
# Websocket Server
|
||||
|
||||
This is an example that shows how to use `WebsocketServerTransport` to communicate with a web client.
|
||||
|
||||
## Get started
|
||||
|
||||
```bash
|
||||
python3 -m venv venv
|
||||
source venv/bin/activate
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
## Run the server
|
||||
|
||||
```bash
|
||||
python server.py
|
||||
```
|
||||
|
||||
## Run the HTTP server
|
||||
|
||||
This will host the static web client:
|
||||
|
||||
```bash
|
||||
python -m http.server
|
||||
```
|
||||
|
||||
Then, visit `http://localhost:8000` in your browser to start a session.
|
||||
43
examples/websocket-server/frames.proto
Normal file
43
examples/websocket-server/frames.proto
Normal file
@@ -0,0 +1,43 @@
|
||||
//
|
||||
// Copyright (c) 2024, Daily
|
||||
//
|
||||
// SPDX-License-Identifier: BSD 2-Clause License
|
||||
//
|
||||
|
||||
// Generate frames_pb2.py with:
|
||||
//
|
||||
// python -m grpc_tools.protoc --proto_path=./ --python_out=./protobufs frames.proto
|
||||
|
||||
syntax = "proto3";
|
||||
|
||||
package pipecat;
|
||||
|
||||
message TextFrame {
|
||||
uint64 id = 1;
|
||||
string name = 2;
|
||||
string text = 3;
|
||||
}
|
||||
|
||||
message AudioRawFrame {
|
||||
uint64 id = 1;
|
||||
string name = 2;
|
||||
bytes audio = 3;
|
||||
uint32 sample_rate = 4;
|
||||
uint32 num_channels = 5;
|
||||
}
|
||||
|
||||
message TranscriptionFrame {
|
||||
uint64 id = 1;
|
||||
string name = 2;
|
||||
string text = 3;
|
||||
string user_id = 4;
|
||||
string timestamp = 5;
|
||||
}
|
||||
|
||||
message Frame {
|
||||
oneof frame {
|
||||
TextFrame text = 1;
|
||||
AudioRawFrame audio = 2;
|
||||
TranscriptionFrame transcription = 3;
|
||||
}
|
||||
}
|
||||
204
examples/websocket-server/index.html
Normal file
204
examples/websocket-server/index.html
Normal file
@@ -0,0 +1,204 @@
|
||||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||||
<script src="https://cdn.jsdelivr.net/npm/protobufjs@7.X.X/dist/protobuf.min.js"></script>
|
||||
<title>Pipecat WebSocket Client Example</title>
|
||||
</head>
|
||||
|
||||
<body>
|
||||
<h1>Pipecat WebSocket Client Example</h1>
|
||||
<h3><div id="progressText">Loading, wait...</div></h3>
|
||||
<button id="startAudioBtn">Start Audio</button>
|
||||
<button id="stopAudioBtn">Stop Audio</button>
|
||||
<script>
|
||||
const SAMPLE_RATE = 16000;
|
||||
const NUM_CHANNELS = 1;
|
||||
const PLAY_TIME_RESET_THRESHOLD_MS = 1.0;
|
||||
|
||||
// The protobuf type. We will load it later.
|
||||
let Frame = null;
|
||||
|
||||
// The websocket connection.
|
||||
let ws = null;
|
||||
|
||||
// The audio context
|
||||
let audioContext = null;
|
||||
|
||||
// The audio context media stream source
|
||||
let source = null;
|
||||
|
||||
// The microphone stream from getUserMedia. Should be sampled to the
|
||||
// proper sample rate.
|
||||
let microphoneStream = null;
|
||||
|
||||
// Script processor to get data from microphone.
|
||||
let scriptProcessor = null;
|
||||
|
||||
// AudioContext play time.
|
||||
let playTime = 0;
|
||||
|
||||
// Last time we received a websocket message.
|
||||
let lastMessageTime = 0;
|
||||
|
||||
// Whether we should be playing audio.
|
||||
let isPlaying = false;
|
||||
|
||||
let startBtn = document.getElementById('startAudioBtn');
|
||||
let stopBtn = document.getElementById('stopAudioBtn');
|
||||
|
||||
const proto = protobuf.load("frames.proto", (err, root) => {
|
||||
if (err) {
|
||||
throw err;
|
||||
}
|
||||
Frame = root.lookupType("pipecat.Frame");
|
||||
const progressText = document.getElementById("progressText");
|
||||
progressText.textContent = "We are ready! Make sure to run the server and then click `Start Audio`.";
|
||||
|
||||
startBtn.disabled = false;
|
||||
stopBtn.disabled = true;
|
||||
});
|
||||
|
||||
function initWebSocket() {
|
||||
ws = new WebSocket('ws://localhost:8765');
|
||||
|
||||
ws.addEventListener('open', () => console.log('WebSocket connection established.'));
|
||||
ws.addEventListener('message', handleWebSocketMessage);
|
||||
ws.addEventListener('close', (event) => {
|
||||
console.log("WebSocket connection closed.", event.code, event.reason);
|
||||
stopAudio(false);
|
||||
});
|
||||
ws.addEventListener('error', (event) => console.error('WebSocket error:', event));
|
||||
}
|
||||
|
||||
async function handleWebSocketMessage(event) {
|
||||
const arrayBuffer = await event.data.arrayBuffer();
|
||||
if (isPlaying) {
|
||||
enqueueAudioFromProto(arrayBuffer);
|
||||
}
|
||||
}
|
||||
|
||||
function enqueueAudioFromProto(arrayBuffer) {
|
||||
const parsedFrame = Frame.decode(new Uint8Array(arrayBuffer));
|
||||
if (!parsedFrame?.audio) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Reset play time if we haven't played anything for a while.
|
||||
const diffTime = audioContext.currentTime - lastMessageTime;
|
||||
if ((playTime == 0) || (diffTime > PLAY_TIME_RESET_THRESHOLD_MS)) {
|
||||
playTime = audioContext.currentTime;
|
||||
}
|
||||
lastMessageTime = audioContext.currentTime;
|
||||
|
||||
// We should be able to use parsedFrame.audio.audio.buffer but for
|
||||
// some reason that contains all the bytes from the protobuf message.
|
||||
const audioVector = Array.from(parsedFrame.audio.audio);
|
||||
const audioArray = new Uint8Array(audioVector);
|
||||
|
||||
audioContext.decodeAudioData(audioArray.buffer, function(buffer) {
|
||||
const source = new AudioBufferSourceNode(audioContext);
|
||||
source.buffer = buffer;
|
||||
source.start(playTime);
|
||||
source.connect(audioContext.destination);
|
||||
playTime = playTime + buffer.duration;
|
||||
});
|
||||
}
|
||||
|
||||
function convertFloat32ToS16PCM(float32Array) {
|
||||
let int16Array = new Int16Array(float32Array.length);
|
||||
|
||||
for (let i = 0; i < float32Array.length; i++) {
|
||||
let clampedValue = Math.max(-1, Math.min(1, float32Array[i]));
|
||||
int16Array[i] = clampedValue < 0 ? clampedValue * 32768 : clampedValue * 32767;
|
||||
}
|
||||
return int16Array;
|
||||
}
|
||||
|
||||
function startAudioBtnHandler() {
|
||||
if (!navigator.mediaDevices || !navigator.mediaDevices.getUserMedia) {
|
||||
alert('getUserMedia is not supported in your browser.');
|
||||
return;
|
||||
}
|
||||
|
||||
startBtn.disabled = true;
|
||||
stopBtn.disabled = false;
|
||||
|
||||
audioContext = new (window.AudioContext || window.webkitAudioContext)({
|
||||
latencyHint: "interactive",
|
||||
sampleRate: SAMPLE_RATE
|
||||
});
|
||||
|
||||
isPlaying = true;
|
||||
|
||||
initWebSocket();
|
||||
|
||||
navigator.mediaDevices.getUserMedia({
|
||||
audio: {
|
||||
sampleRate: SAMPLE_RATE,
|
||||
channelCount: NUM_CHANNELS,
|
||||
autoGainControl: true,
|
||||
echoCancellation: true,
|
||||
noiseSuppression: true,
|
||||
}
|
||||
}).then((stream) => {
|
||||
microphoneStream = stream;
|
||||
// A 512-sample buffer is 32ms of audio at SAMPLE_RATE (16kHz).
|
||||
scriptProcessor = audioContext.createScriptProcessor(512, 1, 1);
|
||||
source = audioContext.createMediaStreamSource(stream);
|
||||
source.connect(scriptProcessor);
|
||||
scriptProcessor.connect(audioContext.destination);
|
||||
|
||||
scriptProcessor.onaudioprocess = (event) => {
|
||||
if (!ws) {
|
||||
return;
|
||||
}
|
||||
|
||||
const audioData = event.inputBuffer.getChannelData(0);
|
||||
const pcmS16Array = convertFloat32ToS16PCM(audioData);
|
||||
const pcmByteArray = new Uint8Array(pcmS16Array.buffer);
|
||||
const frame = Frame.create({
|
||||
audio: {
|
||||
audio: Array.from(pcmByteArray),
|
||||
sampleRate: SAMPLE_RATE,
|
||||
numChannels: NUM_CHANNELS
|
||||
}
|
||||
});
|
||||
const encodedFrame = new Uint8Array(Frame.encode(frame).finish());
|
||||
ws.send(encodedFrame);
|
||||
};
|
||||
}).catch((error) => console.error('Error accessing microphone:', error));
|
||||
}
|
||||
|
||||
function stopAudio(closeWebsocket) {
|
||||
isPlaying = false;
|
||||
startBtn.disabled = false;
|
||||
stopBtn.disabled = true;
|
||||
|
||||
if (ws && closeWebsocket) {
|
||||
ws.close();
|
||||
ws = null;
|
||||
}
|
||||
|
||||
if (scriptProcessor) {
|
||||
scriptProcessor.disconnect();
|
||||
}
|
||||
if (source) {
|
||||
source.disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
function stopAudioBtnHandler() {
|
||||
stopAudio(true);
|
||||
}
|
||||
|
||||
startBtn.addEventListener('click', startAudioBtnHandler);
|
||||
stopBtn.addEventListener('click', stopAudioBtnHandler);
|
||||
startBtn.disabled = true;
|
||||
stopBtn.disabled = true;
|
||||
</script>
|
||||
</body>
|
||||
|
||||
</html>
|
||||
2
examples/websocket-server/requirements.txt
Normal file
2
examples/websocket-server/requirements.txt
Normal file
@@ -0,0 +1,2 @@
|
||||
python-dotenv
|
||||
pipecat-ai[openai,silero,websocket,whisper]
|
||||
90
examples/websocket-server/server.py
Normal file
90
examples/websocket-server/server.py
Normal file
@@ -0,0 +1,90 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import aiohttp
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
|
||||
from pipecat.frames.frames import LLMMessagesFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineTask
|
||||
from pipecat.processors.aggregators.llm_response import LLMAssistantResponseAggregator, LLMUserResponseAggregator
|
||||
from pipecat.services.elevenlabs import ElevenLabsTTSService
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.services.whisper import WhisperSTTService
|
||||
from pipecat.transports.network.websocket_server import WebsocketServerParams, WebsocketServerTransport
|
||||
from pipecat.vad.silero import SileroVAD
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def main():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
transport = WebsocketServerTransport(
|
||||
params=WebsocketServerParams(
|
||||
audio_out_enabled=True,
|
||||
add_wav_header=True
|
||||
)
|
||||
)
|
||||
|
||||
vad = SileroVAD(audio_passthrough=True)
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
model="gpt-4o")
|
||||
|
||||
stt = WhisperSTTService()
|
||||
|
||||
tts = ElevenLabsTTSService(
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
|
||||
)
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
]
|
||||
|
||||
tma_in = LLMUserResponseAggregator(messages)
|
||||
tma_out = LLMAssistantResponseAggregator(messages)
|
||||
|
||||
pipeline = Pipeline([
|
||||
transport.input(), # Websocket input from client
|
||||
vad, # VAD to detect user speech
|
||||
stt, # Speech-To-Text
|
||||
tma_in, # User responses
|
||||
llm, # LLM
|
||||
tts, # Text-To-Speech
|
||||
transport.output(), # Websocket output to client
|
||||
tma_out # LLM responses
|
||||
])
|
||||
|
||||
task = PipelineTask(pipeline)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
# Kick off the conversation.
|
||||
messages.append(
|
||||
{"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
@@ -42,7 +42,7 @@ coloredlogs==15.0.1
|
||||
# via onnxruntime
|
||||
ctranslate2==4.2.1
|
||||
# via faster-whisper
|
||||
daily-python==0.9.0
|
||||
daily-python==0.9.1
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
distro==1.9.0
|
||||
# via
|
||||
@@ -226,6 +226,7 @@ protobuf==4.25.3
|
||||
# googleapis-common-protos
|
||||
# grpcio-status
|
||||
# onnxruntime
|
||||
# pipecat-ai (pyproject.toml)
|
||||
# proto-plus
|
||||
# pyht
|
||||
pyasn1==0.6.0
|
||||
@@ -259,7 +260,7 @@ pyyaml==6.0.1
|
||||
# transformers
|
||||
regex==2024.5.15
|
||||
# via transformers
|
||||
requests==2.32.2
|
||||
requests==2.32.3
|
||||
# via
|
||||
# google-api-core
|
||||
# huggingface-hub
|
||||
|
||||
@@ -208,6 +208,7 @@ protobuf==4.25.3
|
||||
# googleapis-common-protos
|
||||
# grpcio-status
|
||||
# onnxruntime
|
||||
# pipecat-ai (pyproject.toml)
|
||||
# proto-plus
|
||||
# pyht
|
||||
pyasn1==0.6.0
|
||||
|
||||
@@ -24,6 +24,7 @@ dependencies = [
|
||||
"numpy~=1.26.4",
|
||||
"loguru~=0.7.0",
|
||||
"Pillow~=10.3.0",
|
||||
"protobuf~=4.25.3",
|
||||
"pyloudnorm~=0.1.1",
|
||||
"typing-extensions~=4.11.0",
|
||||
]
|
||||
|
||||
@@ -4,28 +4,40 @@
|
||||
// SPDX-License-Identifier: BSD 2-Clause License
|
||||
//
|
||||
|
||||
// Generate frames_pb2.py with:
|
||||
//
|
||||
// python -m grpc_tools.protoc --proto_path=./ --python_out=./protobufs frames.proto
|
||||
|
||||
syntax = "proto3";
|
||||
|
||||
package pipecat_proto;
|
||||
package pipecat;
|
||||
|
||||
message TextFrame {
|
||||
string text = 1;
|
||||
uint64 id = 1;
|
||||
string name = 2;
|
||||
string text = 3;
|
||||
}
|
||||
|
||||
message AudioFrame {
|
||||
bytes data = 1;
|
||||
message AudioRawFrame {
|
||||
uint64 id = 1;
|
||||
string name = 2;
|
||||
bytes audio = 3;
|
||||
uint32 sample_rate = 4;
|
||||
uint32 num_channels = 5;
|
||||
}
|
||||
|
||||
message TranscriptionFrame {
|
||||
string text = 1;
|
||||
string participantId = 2;
|
||||
string timestamp = 3;
|
||||
uint64 id = 1;
|
||||
string name = 2;
|
||||
string text = 3;
|
||||
string user_id = 4;
|
||||
string timestamp = 5;
|
||||
}
|
||||
|
||||
message Frame {
|
||||
oneof frame {
|
||||
TextFrame text = 1;
|
||||
AudioFrame audio = 2;
|
||||
TranscriptionFrame transcription = 3;
|
||||
}
|
||||
oneof frame {
|
||||
TextFrame text = 1;
|
||||
AudioRawFrame audio = 2;
|
||||
TranscriptionFrame transcription = 3;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Generated by the protocol buffer compiler. DO NOT EDIT!
|
||||
# source: frames.proto
|
||||
# Protobuf Python Version: 4.25.3
|
||||
# Protobuf Python Version: 4.25.1
|
||||
"""Generated protocol buffer code."""
|
||||
from google.protobuf import descriptor as _descriptor
|
||||
from google.protobuf import descriptor_pool as _descriptor_pool
|
||||
@@ -14,19 +14,19 @@ _sym_db = _symbol_database.Default()
|
||||
|
||||
|
||||
|
||||
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0c\x66rames.proto\x12\rpipecat_proto\"\x19\n\tTextFrame\x12\x0c\n\x04text\x18\x01 \x01(\t\"\x1a\n\nAudioFrame\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\x0c\"L\n\x12TranscriptionFrame\x12\x0c\n\x04text\x18\x01 \x01(\t\x12\x15\n\rparticipantId\x18\x02 \x01(\t\x12\x11\n\ttimestamp\x18\x03 \x01(\t\"\xa2\x01\n\x05\x46rame\x12(\n\x04text\x18\x01 \x01(\x0b\x32\x18.pipecat_proto.TextFrameH\x00\x12*\n\x05\x61udio\x18\x02 \x01(\x0b\x32\x19.pipecat_proto.AudioFrameH\x00\x12:\n\rtranscription\x18\x03 \x01(\x0b\x32!.pipecat_proto.TranscriptionFrameH\x00\x42\x07\n\x05\x66rameb\x06proto3')
|
||||
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0c\x66rames.proto\x12\x07pipecat\"3\n\tTextFrame\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x0c\n\x04text\x18\x03 \x01(\t\"c\n\rAudioRawFrame\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\r\n\x05\x61udio\x18\x03 \x01(\x0c\x12\x13\n\x0bsample_rate\x18\x04 \x01(\r\x12\x14\n\x0cnum_channels\x18\x05 \x01(\r\"`\n\x12TranscriptionFrame\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x0c\n\x04text\x18\x03 \x01(\t\x12\x0f\n\x07user_id\x18\x04 \x01(\t\x12\x11\n\ttimestamp\x18\x05 \x01(\t\"\x93\x01\n\x05\x46rame\x12\"\n\x04text\x18\x01 \x01(\x0b\x32\x12.pipecat.TextFrameH\x00\x12\'\n\x05\x61udio\x18\x02 \x01(\x0b\x32\x16.pipecat.AudioRawFrameH\x00\x12\x34\n\rtranscription\x18\x03 \x01(\x0b\x32\x1b.pipecat.TranscriptionFrameH\x00\x42\x07\n\x05\x66rameb\x06proto3')
|
||||
|
||||
_globals = globals()
|
||||
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
|
||||
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'frames_pb2', _globals)
|
||||
if _descriptor._USE_C_DESCRIPTORS == False:
|
||||
DESCRIPTOR._options = None
|
||||
_globals['_TEXTFRAME']._serialized_start=31
|
||||
_globals['_TEXTFRAME']._serialized_end=56
|
||||
_globals['_AUDIOFRAME']._serialized_start=58
|
||||
_globals['_AUDIOFRAME']._serialized_end=84
|
||||
_globals['_TRANSCRIPTIONFRAME']._serialized_start=86
|
||||
_globals['_TRANSCRIPTIONFRAME']._serialized_end=162
|
||||
_globals['_FRAME']._serialized_start=165
|
||||
_globals['_FRAME']._serialized_end=327
|
||||
_globals['_TEXTFRAME']._serialized_start=25
|
||||
_globals['_TEXTFRAME']._serialized_end=76
|
||||
_globals['_AUDIORAWFRAME']._serialized_start=78
|
||||
_globals['_AUDIORAWFRAME']._serialized_end=177
|
||||
_globals['_TRANSCRIPTIONFRAME']._serialized_start=179
|
||||
_globals['_TRANSCRIPTIONFRAME']._serialized_end=275
|
||||
_globals['_FRAME']._serialized_start=278
|
||||
_globals['_FRAME']._serialized_end=425
|
||||
# @@protoc_insertion_point(module_scope)
|
||||
|
||||
@@ -4,8 +4,6 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
|
||||
from typing import Callable, Coroutine, List
|
||||
|
||||
from pipecat.frames.frames import Frame
|
||||
@@ -67,7 +65,8 @@ class Pipeline(FrameProcessor):
|
||||
await self._sink.process_frame(frame, FrameDirection.UPSTREAM)
|
||||
|
||||
async def _cleanup_processors(self):
|
||||
await asyncio.gather(*[p.cleanup() for p in self._processors])
|
||||
for p in self._processors:
|
||||
await p.cleanup()
|
||||
|
||||
def _link_processors(self):
|
||||
prev = self._processors[0]
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
#
|
||||
|
||||
import asyncio
|
||||
from asyncio import AbstractEventLoop
|
||||
|
||||
from enum import Enum
|
||||
|
||||
from pipecat.frames.frames import ErrorFrame, Frame
|
||||
@@ -21,12 +21,12 @@ class FrameDirection(Enum):
|
||||
|
||||
class FrameProcessor:
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self, loop: asyncio.AbstractEventLoop | None = None):
|
||||
self.id: int = obj_id()
|
||||
self.name = f"{self.__class__.__name__}#{obj_count(self)}"
|
||||
self._prev: "FrameProcessor" | None = None
|
||||
self._next: "FrameProcessor" | None = None
|
||||
self._loop: AbstractEventLoop = asyncio.get_running_loop()
|
||||
self._loop: asyncio.AbstractEventLoop = loop or asyncio.get_running_loop()
|
||||
|
||||
async def cleanup(self):
|
||||
pass
|
||||
@@ -36,7 +36,7 @@ class FrameProcessor:
|
||||
processor._prev = self
|
||||
logger.debug(f"Linking {self} -> {self._next}")
|
||||
|
||||
def get_event_loop(self) -> AbstractEventLoop:
|
||||
def get_event_loop(self) -> asyncio.AbstractEventLoop:
|
||||
return self._loop
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
|
||||
@@ -1,16 +0,0 @@
|
||||
from abc import abstractmethod
|
||||
|
||||
from pipecat.pipeline.frames import Frame
|
||||
|
||||
|
||||
class FrameSerializer:
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def serialize(self, frame: Frame) -> bytes:
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def deserialize(self, data: bytes) -> Frame:
|
||||
raise NotImplementedError
|
||||
20
src/pipecat/serializers/base_serializer.py
Normal file
20
src/pipecat/serializers/base_serializer.py
Normal file
@@ -0,0 +1,20 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
from pipecat.frames.frames import Frame
|
||||
|
||||
|
||||
class FrameSerializer(ABC):
    """Interface for turning frames into bytes and bytes back into frames.

    Concrete serializers must implement both directions; the class cannot be
    instantiated until they do.
    """

    @abstractmethod
    def serialize(self, frame: Frame) -> bytes:
        """Encode ``frame`` into its bytes representation."""
        ...

    @abstractmethod
    def deserialize(self, data: bytes) -> Frame:
        """Decode ``data`` back into a frame."""
        ...
|
||||
@@ -1,14 +1,21 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import dataclasses
|
||||
from typing import Text
|
||||
from pipecat.pipeline.frames import AudioFrame, Frame, TextFrame, TranscriptionFrame
|
||||
import pipecat.pipeline.protobufs.frames_pb2 as frame_protos
|
||||
from pipecat.serializers.abstract_frame_serializer import FrameSerializer
|
||||
|
||||
import pipecat.frames.protobufs.frames_pb2 as frame_protos
|
||||
|
||||
from pipecat.frames.frames import AudioRawFrame, Frame, TextFrame, TranscriptionFrame
|
||||
from pipecat.serializers.base_serializer import FrameSerializer
|
||||
|
||||
|
||||
class ProtobufFrameSerializer(FrameSerializer):
|
||||
SERIALIZABLE_TYPES = {
|
||||
TextFrame: "text",
|
||||
AudioFrame: "audio",
|
||||
AudioRawFrame: "audio",
|
||||
TranscriptionFrame: "transcription"
|
||||
}
|
||||
|
||||
@@ -29,7 +36,8 @@ class ProtobufFrameSerializer(FrameSerializer):
|
||||
setattr(getattr(proto_frame, proto_optional_name), field.name,
|
||||
getattr(frame, field.name))
|
||||
|
||||
return proto_frame.SerializeToString()
|
||||
result = proto_frame.SerializeToString()
|
||||
return result
|
||||
|
||||
def deserialize(self, data: bytes) -> Frame:
|
||||
"""Returns a Frame object from a Frame protobuf. Used to convert frames
|
||||
@@ -61,4 +69,22 @@ class ProtobufFrameSerializer(FrameSerializer):
|
||||
args_dict = {}
|
||||
for field in proto.DESCRIPTOR.fields_by_name[which].message_type.fields:
|
||||
args_dict[field.name] = getattr(args, field.name)
|
||||
return class_name(**args_dict)
|
||||
|
||||
# Remove special fields if needed
|
||||
id = getattr(args, "id")
|
||||
name = getattr(args, "name")
|
||||
if not id:
|
||||
del args_dict["id"]
|
||||
if not name:
|
||||
del args_dict["name"]
|
||||
|
||||
# Create the instance
|
||||
instance = class_name(**args_dict)
|
||||
|
||||
# Set special fields
|
||||
if id:
|
||||
setattr(instance, "id", getattr(args, "id"))
|
||||
if name:
|
||||
setattr(instance, "name", getattr(args, "name"))
|
||||
|
||||
return instance
|
||||
@@ -171,7 +171,7 @@ class ImageGenService(AIService):
|
||||
super().__init__()
|
||||
|
||||
# Renders the image. Returns an Image object.
|
||||
@ abstractmethod
|
||||
@abstractmethod
|
||||
async def run_image_gen(self, prompt: str) -> AsyncGenerator[Frame, None]:
|
||||
pass
|
||||
|
||||
@@ -190,7 +190,7 @@ class VisionService(AIService):
|
||||
super().__init__()
|
||||
self._describe_text = None
|
||||
|
||||
@ abstractmethod
|
||||
@abstractmethod
|
||||
async def run_vision(self, frame: VisionImageRawFrame) -> AsyncGenerator[Frame, None]:
|
||||
pass
|
||||
|
||||
|
||||
@@ -249,7 +249,7 @@ class BaseOpenAILLMService(LLMService):
|
||||
|
||||
class OpenAILLMService(BaseOpenAILLMService):
|
||||
|
||||
def __init__(self, model="gpt-4", **kwargs):
|
||||
def __init__(self, model="gpt-4o", **kwargs):
|
||||
super().__init__(model, **kwargs)
|
||||
|
||||
|
||||
|
||||
@@ -5,9 +5,6 @@
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import queue
|
||||
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.frames.frames import (
|
||||
@@ -33,14 +30,11 @@ class BaseInputTransport(FrameProcessor):
|
||||
|
||||
self._params = params
|
||||
|
||||
self._running = False
|
||||
self._allow_interruptions = False
|
||||
|
||||
self._in_executor = ThreadPoolExecutor(max_workers=5)
|
||||
|
||||
# Create audio input queue if needed.
|
||||
if self._params.audio_in_enabled or self._params.vad_enabled:
|
||||
self._audio_in_queue = queue.Queue()
|
||||
self._audio_in_queue = asyncio.Queue()
|
||||
|
||||
# Create push frame task. This is the task that will push frames in
|
||||
# order. We also guarantee that all frames are pushed in the same task.
|
||||
@@ -52,45 +46,27 @@ class BaseInputTransport(FrameProcessor):
|
||||
# for example.
|
||||
self._allow_interruptions = frame.allow_interruptions
|
||||
|
||||
if self._running:
|
||||
return
|
||||
|
||||
self._running = True
|
||||
|
||||
if self._params.audio_in_enabled or self._params.vad_enabled:
|
||||
loop = self.get_event_loop()
|
||||
self._audio_in_thread = loop.run_in_executor(
|
||||
self._in_executor, self._audio_in_thread_handler)
|
||||
self._audio_out_thread = loop.run_in_executor(
|
||||
self._in_executor, self._audio_out_thread_handler)
|
||||
self._audio_task = loop.create_task(self._audio_task_handler())
|
||||
|
||||
async def stop(self):
|
||||
if not self._running:
|
||||
return
|
||||
|
||||
# This will exit all threads.
|
||||
self._running = False
|
||||
|
||||
# Wait for the threads to finish.
|
||||
# Wait for the tasks to finish.
|
||||
if self._params.audio_in_enabled or self._params.vad_enabled:
|
||||
await self._audio_in_thread
|
||||
await self._audio_out_thread
|
||||
self._audio_task.cancel()
|
||||
|
||||
self._push_frame_task.cancel()
|
||||
|
||||
def vad_analyze(self, audio_frames: bytes) -> VADState:
|
||||
async def vad_analyze(self, audio_frames: bytes) -> VADState:
|
||||
pass
|
||||
|
||||
def read_raw_audio_frames(self, frame_count: int) -> bytes:
|
||||
async def read_raw_audio_frames(self, frame_count: int) -> bytes:
|
||||
pass
|
||||
|
||||
#
|
||||
# Frame processor
|
||||
#
|
||||
|
||||
async def cleanup(self):
|
||||
pass
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
if isinstance(frame, CancelFrame):
|
||||
await self.stop()
|
||||
@@ -150,8 +126,8 @@ class BaseInputTransport(FrameProcessor):
|
||||
# Audio input
|
||||
#
|
||||
|
||||
def _handle_vad(self, audio_frames: bytes, vad_state: VADState):
|
||||
new_vad_state = self.vad_analyze(audio_frames)
|
||||
async def _handle_vad(self, audio_frames: bytes, vad_state: VADState):
|
||||
new_vad_state = await self.vad_analyze(audio_frames)
|
||||
if new_vad_state != vad_state and new_vad_state != VADState.STARTING and new_vad_state != VADState.STOPPING:
|
||||
frame = None
|
||||
if new_vad_state == VADState.SPEAKING:
|
||||
@@ -160,51 +136,39 @@ class BaseInputTransport(FrameProcessor):
|
||||
frame = UserStoppedSpeakingFrame()
|
||||
|
||||
if frame:
|
||||
future = asyncio.run_coroutine_threadsafe(
|
||||
self._handle_interruptions(frame), self.get_event_loop())
|
||||
future.result()
|
||||
await self._handle_interruptions(frame)
|
||||
|
||||
vad_state = new_vad_state
|
||||
return vad_state
|
||||
|
||||
def _audio_in_thread_handler(self):
|
||||
async def _audio_task_handler(self):
|
||||
vad_state: VADState = VADState.QUIET
|
||||
|
||||
sample_rate = self._params.audio_in_sample_rate
|
||||
num_channels = self._params.audio_in_channels
|
||||
num_frames = int(sample_rate / 100) # 10ms of audio
|
||||
while self._running:
|
||||
|
||||
while True:
|
||||
try:
|
||||
audio_frames = self.read_raw_audio_frames(num_frames)
|
||||
audio_frames = await self.read_raw_audio_frames(num_frames)
|
||||
if len(audio_frames) > 0:
|
||||
frame = AudioRawFrame(
|
||||
audio=audio_frames,
|
||||
sample_rate=sample_rate,
|
||||
num_channels=num_channels)
|
||||
self._audio_in_queue.put(frame)
|
||||
|
||||
audio_passthrough = True
|
||||
|
||||
# Check VAD and push event if necessary. We just care about
|
||||
# changes from QUIET to SPEAKING and vice versa.
|
||||
if self._params.vad_enabled:
|
||||
vad_state = await self._handle_vad(frame.audio, vad_state)
|
||||
audio_passthrough = self._params.vad_audio_passthrough
|
||||
|
||||
# Push audio downstream if passthrough.
|
||||
if audio_passthrough:
|
||||
await self._internal_push_frame(frame)
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except BaseException as e:
|
||||
logger.error(f"Error reading audio frames: {e}")
|
||||
|
||||
def _audio_out_thread_handler(self):
|
||||
vad_state: VADState = VADState.QUIET
|
||||
while self._running:
|
||||
try:
|
||||
frame = self._audio_in_queue.get(timeout=1)
|
||||
|
||||
audio_passthrough = True
|
||||
|
||||
# Check VAD and push event if necessary. We just care about changes
|
||||
# from QUIET to SPEAKING and vice versa.
|
||||
if self._params.vad_enabled:
|
||||
vad_state = self._handle_vad(frame.audio, vad_state)
|
||||
audio_passthrough = self._params.vad_audio_passthrough
|
||||
|
||||
# Push audio downstream if passthrough.
|
||||
if audio_passthrough:
|
||||
future = asyncio.run_coroutine_threadsafe(
|
||||
self._internal_push_frame(frame), self.get_event_loop())
|
||||
future.result()
|
||||
|
||||
self._audio_in_queue.task_done()
|
||||
except queue.Empty:
|
||||
pass
|
||||
except BaseException as e:
|
||||
logger.error(f"Error pushing audio frames: {e}")
|
||||
|
||||
@@ -1,17 +1,11 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import itertools
|
||||
import queue
|
||||
import time
|
||||
import threading
|
||||
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
from PIL import Image
|
||||
from typing import List
|
||||
@@ -40,22 +34,19 @@ class BaseOutputTransport(FrameProcessor):
|
||||
|
||||
self._params = params
|
||||
|
||||
self._running = False
|
||||
self._allow_interruptions = False
|
||||
|
||||
self._out_executor = ThreadPoolExecutor(max_workers=5)
|
||||
|
||||
# These are the images that we should send to the camera at our desired
|
||||
# framerate.
|
||||
self._camera_images = None
|
||||
|
||||
# Create media threads queues.
|
||||
# Create media task queues.
|
||||
if self._params.camera_out_enabled:
|
||||
self._camera_out_queue = queue.Queue()
|
||||
self._sink_queue = queue.Queue()
|
||||
self._camera_out_queue = asyncio.Queue()
|
||||
self._sink_queue = asyncio.Queue()
|
||||
|
||||
self._stopped_event = asyncio.Event()
|
||||
self._is_interrupted = threading.Event()
|
||||
self._is_interrupted = asyncio.Event()
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
# Make sure we have the latest params. Note that this transport might
|
||||
@@ -63,52 +54,40 @@ class BaseOutputTransport(FrameProcessor):
|
||||
# for example.
|
||||
self._allow_interruptions = frame.allow_interruptions
|
||||
|
||||
if self._running:
|
||||
return
|
||||
|
||||
self._running = True
|
||||
|
||||
loop = self.get_event_loop()
|
||||
|
||||
if self._params.camera_out_enabled:
|
||||
self._camera_out_thread = loop.run_in_executor(
|
||||
self._out_executor, self._camera_out_thread_handler)
|
||||
self._camera_out_task = loop.create_task(self._camera_out_task_handler())
|
||||
|
||||
self._sink_thread = loop.run_in_executor(self._out_executor, self._sink_thread_handler)
|
||||
self._sink_task = loop.create_task(self._sink_task_handler())
|
||||
|
||||
# Create push frame task. This is the task that will push frames in
|
||||
# order. We also guarantee that all frames are pushed in the same task.
|
||||
self._create_push_task()
|
||||
|
||||
async def stop(self):
|
||||
if not self._running:
|
||||
return
|
||||
|
||||
# This will exit all threads.
|
||||
self._running = False
|
||||
|
||||
self._stopped_event.set()
|
||||
|
||||
def send_message(self, frame: TransportMessageFrame):
|
||||
async def cleanup(self):
|
||||
if self._params.camera_out_enabled:
|
||||
self._camera_out_task.cancel()
|
||||
|
||||
self._sink_task.cancel()
|
||||
self._push_frame_task.cancel()
|
||||
|
||||
async def send_message(self, frame: TransportMessageFrame):
|
||||
pass
|
||||
|
||||
def write_frame_to_camera(self, frame: ImageRawFrame):
|
||||
async def write_frame_to_camera(self, frame: ImageRawFrame):
|
||||
pass
|
||||
|
||||
def write_raw_audio_frames(self, frames: bytes):
|
||||
async def write_raw_audio_frames(self, frames: bytes):
|
||||
pass
|
||||
|
||||
#
|
||||
# Frame processor
|
||||
#
|
||||
|
||||
async def cleanup(self):
|
||||
# Wait on the threads to finish.
|
||||
if self._params.camera_out_enabled:
|
||||
await self._camera_out_thread
|
||||
|
||||
await self._sink_thread
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
#
|
||||
# Out-of-band frames like (CancelFrame or StartInterruptionFrame) are
|
||||
@@ -117,7 +96,7 @@ class BaseOutputTransport(FrameProcessor):
|
||||
#
|
||||
if isinstance(frame, StartFrame):
|
||||
await self.start(frame)
|
||||
self._sink_queue.put(frame)
|
||||
await self._sink_queue.put(frame)
|
||||
# EndFrame is managed in the queue handler.
|
||||
elif isinstance(frame, CancelFrame):
|
||||
await self.stop()
|
||||
@@ -126,11 +105,11 @@ class BaseOutputTransport(FrameProcessor):
|
||||
await self._handle_interruptions(frame)
|
||||
await self.push_frame(frame, direction)
|
||||
else:
|
||||
self._sink_queue.put(frame)
|
||||
await self._sink_queue.put(frame)
|
||||
|
||||
# If we are finishing, wait here until we have stopped, otherwise we might
|
||||
# close things too early upstream. We need this event because we don't
|
||||
# know when the internal threads will finish.
|
||||
# know when the internal tasks will finish.
|
||||
if isinstance(frame, CancelFrame) or isinstance(frame, EndFrame):
|
||||
await self._stopped_event.wait()
|
||||
|
||||
@@ -145,7 +124,7 @@ class BaseOutputTransport(FrameProcessor):
|
||||
elif isinstance(frame, StopInterruptionFrame):
|
||||
self._is_interrupted.clear()
|
||||
|
||||
def _sink_thread_handler(self):
|
||||
async def _sink_task_handler(self):
|
||||
# 10ms bytes
|
||||
bytes_size_10ms = int(self._params.audio_out_sample_rate / 100) * \
|
||||
self._params.audio_out_channels * 2
|
||||
@@ -155,37 +134,34 @@ class BaseOutputTransport(FrameProcessor):
|
||||
|
||||
# Audio accumulation buffer
|
||||
buffer = bytearray()
|
||||
while self._running:
|
||||
while True:
|
||||
try:
|
||||
frame = self._sink_queue.get(timeout=1)
|
||||
frame = await self._sink_queue.get()
|
||||
if not self._is_interrupted.is_set():
|
||||
if isinstance(frame, AudioRawFrame):
|
||||
if self._params.audio_out_enabled:
|
||||
buffer.extend(frame.audio)
|
||||
buffer = self._send_audio_truncated(buffer, smallest_write_size)
|
||||
buffer = await self._send_audio_truncated(buffer, smallest_write_size)
|
||||
elif isinstance(frame, ImageRawFrame) and self._params.camera_out_enabled:
|
||||
self._set_camera_image(frame)
|
||||
await self._set_camera_image(frame)
|
||||
elif isinstance(frame, SpriteFrame) and self._params.camera_out_enabled:
|
||||
self._set_camera_images(frame.images)
|
||||
await self._set_camera_images(frame.images)
|
||||
elif isinstance(frame, TransportMessageFrame):
|
||||
self.send_message(frame)
|
||||
await self.send_message(frame)
|
||||
else:
|
||||
future = asyncio.run_coroutine_threadsafe(
|
||||
self._internal_push_frame(frame), self.get_event_loop())
|
||||
future.result()
|
||||
await self._internal_push_frame(frame)
|
||||
else:
|
||||
# If we get interrupted just clear the output buffer.
|
||||
buffer = bytearray()
|
||||
|
||||
if isinstance(frame, EndFrame):
|
||||
# Send all remaining audio before stopping (multiple of 10ms of audio).
|
||||
self._send_audio_truncated(buffer, bytes_size_10ms)
|
||||
future = asyncio.run_coroutine_threadsafe(self.stop(), self.get_event_loop())
|
||||
future.result()
|
||||
await self._send_audio_truncated(buffer, bytes_size_10ms)
|
||||
await self.stop()
|
||||
|
||||
self._sink_queue.task_done()
|
||||
except queue.Empty:
|
||||
pass
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except BaseException as e:
|
||||
logger.error(f"Error processing sink queue: {e}")
|
||||
|
||||
@@ -219,7 +195,7 @@ class BaseOutputTransport(FrameProcessor):
|
||||
async def send_image(self, frame: ImageRawFrame | SpriteFrame):
|
||||
await self.process_frame(frame, FrameDirection.DOWNSTREAM)
|
||||
|
||||
def _draw_image(self, frame: ImageRawFrame):
|
||||
async def _draw_image(self, frame: ImageRawFrame):
|
||||
desired_size = (self._params.camera_out_width, self._params.camera_out_height)
|
||||
|
||||
if frame.size != desired_size:
|
||||
@@ -229,32 +205,32 @@ class BaseOutputTransport(FrameProcessor):
|
||||
f"{frame} does not have the expected size {desired_size}, resizing")
|
||||
frame = ImageRawFrame(resized_image.tobytes(), resized_image.size, resized_image.format)
|
||||
|
||||
self.write_frame_to_camera(frame)
|
||||
await self.write_frame_to_camera(frame)
|
||||
|
||||
def _set_camera_image(self, image: ImageRawFrame):
|
||||
async def _set_camera_image(self, image: ImageRawFrame):
|
||||
if self._params.camera_out_is_live:
|
||||
self._camera_out_queue.put(image)
|
||||
await self._camera_out_queue.put(image)
|
||||
else:
|
||||
self._camera_images = itertools.cycle([image])
|
||||
|
||||
def _set_camera_images(self, images: List[ImageRawFrame]):
|
||||
async def _set_camera_images(self, images: List[ImageRawFrame]):
|
||||
self._camera_images = itertools.cycle(images)
|
||||
|
||||
def _camera_out_thread_handler(self):
|
||||
while self._running:
|
||||
async def _camera_out_task_handler(self):
|
||||
while True:
|
||||
try:
|
||||
if self._params.camera_out_is_live:
|
||||
image = self._camera_out_queue.get(timeout=1)
|
||||
self._draw_image(image)
|
||||
image = await self._camera_out_queue.get()
|
||||
await self._draw_image(image)
|
||||
self._camera_out_queue.task_done()
|
||||
elif self._camera_images:
|
||||
image = next(self._camera_images)
|
||||
self._draw_image(image)
|
||||
time.sleep(1.0 / self._params.camera_out_framerate)
|
||||
await self._draw_image(image)
|
||||
await asyncio.sleep(1.0 / self._params.camera_out_framerate)
|
||||
else:
|
||||
time.sleep(1.0 / self._params.camera_out_framerate)
|
||||
except queue.Empty:
|
||||
pass
|
||||
await asyncio.sleep(1.0 / self._params.camera_out_framerate)
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"Error writing to camera: {e}")
|
||||
|
||||
@@ -265,13 +241,9 @@ class BaseOutputTransport(FrameProcessor):
|
||||
async def send_audio(self, frame: AudioRawFrame):
|
||||
await self.process_frame(frame, FrameDirection.DOWNSTREAM)
|
||||
|
||||
def _send_audio_truncated(self, buffer: bytearray, smallest_write_size: int) -> bytearray:
|
||||
try:
|
||||
truncated_length: int = len(buffer) - (len(buffer) % smallest_write_size)
|
||||
if truncated_length:
|
||||
self.write_raw_audio_frames(bytes(buffer[:truncated_length]))
|
||||
buffer = buffer[truncated_length:]
|
||||
return buffer
|
||||
except BaseException as e:
|
||||
logger.error(f"Error writing audio frames: {e}")
|
||||
return buffer
|
||||
async def _send_audio_truncated(self, buffer: bytearray, smallest_write_size: int) -> bytearray:
|
||||
truncated_length: int = len(buffer) - (len(buffer) % smallest_write_size)
|
||||
if truncated_length:
|
||||
await self.write_raw_audio_frames(bytes(buffer[:truncated_length]))
|
||||
buffer = buffer[truncated_length:]
|
||||
return buffer
|
||||
|
||||
@@ -4,6 +4,9 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import inspect
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
from pydantic import ConfigDict
|
||||
@@ -12,6 +15,8 @@ from pydantic.main import BaseModel
|
||||
from pipecat.processors.frame_processor import FrameProcessor
|
||||
from pipecat.vad.vad_analyzer import VADAnalyzer
|
||||
|
||||
from loguru import logger
|
||||
|
||||
|
||||
class TransportParams(BaseModel):
|
||||
model_config = ConfigDict(arbitrary_types_allowed=True)
|
||||
@@ -36,6 +41,10 @@ class TransportParams(BaseModel):
|
||||
|
||||
class BaseTransport(ABC):
|
||||
|
||||
def __init__(self, loop: asyncio.AbstractEventLoop | None):
|
||||
self._loop = loop or asyncio.get_running_loop()
|
||||
self._event_handlers: dict = {}
|
||||
|
||||
@abstractmethod
|
||||
def input(self) -> FrameProcessor:
|
||||
raise NotImplementedError
|
||||
@@ -43,3 +52,30 @@ class BaseTransport(ABC):
|
||||
@abstractmethod
|
||||
def output(self) -> FrameProcessor:
|
||||
raise NotImplementedError
|
||||
|
||||
def event_handler(self, event_name: str):
    """Return a decorator that registers a function for ``event_name``.

    The decorated function is handed to ``_add_event_handler`` and then
    returned unchanged, so it stays usable under its own name.
    """
    def _register(callback):
        self._add_event_handler(event_name, callback)
        return callback

    return _register
|
||||
|
||||
def _register_event_handler(self, event_name: str):
|
||||
if event_name in self._event_handlers:
|
||||
raise Exception(f"Event handler {event_name} already registered")
|
||||
self._event_handlers[event_name] = []
|
||||
|
||||
def _add_event_handler(self, event_name: str, handler):
|
||||
if event_name not in self._event_handlers:
|
||||
raise Exception(f"Event handler {event_name} not registered")
|
||||
self._event_handlers[event_name].append(handler)
|
||||
|
||||
async def _call_event_handler(self, event_name: str, *args, **kwargs):
|
||||
try:
|
||||
for handler in self._event_handlers[event_name]:
|
||||
if inspect.iscoroutinefunction(handler):
|
||||
await handler(self, *args, **kwargs)
|
||||
else:
|
||||
handler(self, *args, **kwargs)
|
||||
except Exception as e:
|
||||
logger.error(f"Exception in event handler {event_name}: {e}")
|
||||
raise e
|
||||
|
||||
@@ -6,6 +6,8 @@
|
||||
|
||||
import asyncio
|
||||
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
import numpy as np
|
||||
import tkinter as tk
|
||||
|
||||
@@ -38,6 +40,8 @@ class TkInputTransport(BaseInputTransport):
|
||||
def __init__(self, py_audio: pyaudio.PyAudio, params: TransportParams):
|
||||
super().__init__(params)
|
||||
|
||||
self._executor = ThreadPoolExecutor(max_workers=5)
|
||||
|
||||
self._in_stream = py_audio.open(
|
||||
format=py_audio.get_format_from_width(2),
|
||||
channels=params.audio_in_channels,
|
||||
@@ -45,8 +49,11 @@ class TkInputTransport(BaseInputTransport):
|
||||
frames_per_buffer=params.audio_in_sample_rate,
|
||||
input=True)
|
||||
|
||||
def read_raw_audio_frames(self, frame_count: int) -> bytes:
|
||||
return self._in_stream.read(frame_count, exception_on_overflow=False)
|
||||
async def read_raw_audio_frames(self, frame_count: int) -> bytes:
|
||||
def read_audio(frame_count: int) -> bytes:
|
||||
return self._in_stream.read(frame_count, exception_on_overflow=False)
|
||||
|
||||
return await self.get_event_loop().run_in_executor(self._executor, read_audio, frame_count)
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
@@ -70,6 +77,8 @@ class TkOutputTransport(BaseOutputTransport):
|
||||
def __init__(self, tk_root: tk.Tk, py_audio: pyaudio.PyAudio, params: TransportParams):
|
||||
super().__init__(params)
|
||||
|
||||
self._executor = ThreadPoolExecutor(max_workers=5)
|
||||
|
||||
self._out_stream = py_audio.open(
|
||||
format=py_audio.get_format_from_width(2),
|
||||
channels=params.audio_out_channels,
|
||||
@@ -83,10 +92,13 @@ class TkOutputTransport(BaseOutputTransport):
|
||||
self._image_label = tk.Label(tk_root, image=photo)
|
||||
self._image_label.pack()
|
||||
|
||||
def write_raw_audio_frames(self, frames: bytes):
|
||||
self._out_stream.write(frames)
|
||||
async def write_raw_audio_frames(self, frames: bytes):
|
||||
def write_audio(frames: bytes):
|
||||
self._out_stream.write(frames)
|
||||
|
||||
def write_frame_to_camera(self, frame: ImageRawFrame):
|
||||
await self.get_event_loop().run_in_executor(self._executor, write_audio, frames)
|
||||
|
||||
async def write_frame_to_camera(self, frame: ImageRawFrame):
|
||||
self.get_event_loop().call_soon(self._write_frame_to_tk, frame)
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
|
||||
221
src/pipecat/transports/network/websocket_server.py
Normal file
221
src/pipecat/transports/network/websocket_server.py
Normal file
@@ -0,0 +1,221 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import io
|
||||
import wave
|
||||
import websockets
|
||||
|
||||
from typing import Awaitable, Callable
|
||||
from pydantic.main import BaseModel
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
AudioRawFrame,
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
Frame,
|
||||
StartFrame)
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.serializers.base_serializer import FrameSerializer
|
||||
from pipecat.serializers.protobuf import ProtobufFrameSerializer
|
||||
from pipecat.transports.base_output import BaseOutputTransport
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
|
||||
from loguru import logger
|
||||
|
||||
|
||||
class WebsocketServerParams(TransportParams):
    """Websocket-server specific settings layered on top of ``TransportParams``."""

    # When true, every outgoing audio chunk is wrapped in a WAV container
    # before being serialized.
    add_wav_header: bool = False

    # Size in bytes of each outgoing audio chunk. Noted by the author as
    # 200ms -- presumably for 16 kHz, 16-bit mono audio; TODO confirm.
    audio_frame_size: int = 6400

    # Serializer used for incoming client messages and outgoing audio frames.
    serializer: FrameSerializer = ProtobufFrameSerializer()
|
||||
|
||||
|
||||
class WebsocketServerCallbacks(BaseModel):
    """Callbacks the websocket input transport awaits on server events."""

    # Awaited with the websocket of each newly accepted client connection.
    on_connection: Callable[
        [websockets.WebSocketServerProtocol],
        Awaitable[None],
    ]
|
||||
|
||||
|
||||
class WebsocketServerInputTransport(FrameProcessor):
    """Frame processor that runs a websocket server and pushes client frames.

    The server is started when a ``StartFrame`` is processed. Every message
    received from the connected client is deserialized with
    ``params.serializer`` and queued; a single push task drains that queue so
    that all frames are pushed in order and from the same task.

    Only one client is served at a time: a new connection closes the
    previous one.
    """

    def __init__(
            self,
            host: str,
            port: int,
            params: WebsocketServerParams,
            callbacks: WebsocketServerCallbacks):
        """Store the configuration and create the frame push task.

        Args:
            host: Interface the websocket server binds to.
            port: Port the websocket server listens on.
            params: Transport parameters; ``params.serializer`` decodes
                incoming messages.
            callbacks: ``on_connection`` is awaited for each new client.
        """
        super().__init__()

        self._host = host
        self._port = port
        self._params = params
        self._callbacks = callbacks

        self._websocket: websockets.WebSocketServerProtocol | None = None

        # Created by _start(). Kept as None until then so that _stop() is
        # safe even when no StartFrame was ever processed.
        self._server_task: asyncio.Task | None = None
        self._stop_server_event = asyncio.Event()

        # Create push frame task. This is the task that will push frames in
        # order. We also guarantee that all frames are pushed in the same task.
        self._create_push_task()

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Start/stop the server on control frames and forward every frame."""
        if isinstance(frame, CancelFrame):
            await self._stop()
            # We don't queue a CancelFrame since we want to stop ASAP.
            await self.push_frame(frame, direction)
        elif isinstance(frame, StartFrame):
            await self._start()
            await self._internal_push_frame(frame, direction)
        elif isinstance(frame, EndFrame):
            # Stop the server but keep the push task alive: it still has to
            # deliver the queued EndFrame downstream, after which it exits on
            # its own. Cancelling it here would silently drop the EndFrame.
            await self._stop(cancel_push_task=False)
            await self._internal_push_frame(frame, direction)
        else:
            await self._internal_push_frame(frame, direction)

    async def _server_task_handler(self):
        """Serve websocket clients until the stop event is set."""
        logger.info(f"Starting websocket server on {self._host}:{self._port}")
        async with websockets.serve(self._client_handler, self._host, self._port):
            await self._stop_server_event.wait()

    async def _client_handler(self, websocket: websockets.WebSocketServerProtocol, path=None):
        """Handle one client connection for its whole lifetime.

        ``path`` is optional because newer ``websockets`` releases call the
        handler with the connection only; it is not used here.
        """
        logger.info(f"New client connection from {websocket.remote_address}")
        if self._websocket:
            await self._websocket.close()
            logger.warning("Only one client connected, using new connection")

        self._websocket = websocket

        # Notify
        await self._callbacks.on_connection(websocket)

        # Handle incoming messages
        try:
            async for message in websocket:
                frame = self._params.serializer.deserialize(message)
                await self._internal_push_frame(frame)
        finally:
            # Forget the connection once it is gone, unless a newer client
            # has already replaced it, so we never try to close a stale
            # socket on the next connection.
            if self._websocket is websocket:
                self._websocket = None

        logger.info(f"Client {websocket.remote_address} disconnected")

    async def _start(self):
        """Launch the server task (only once, even if started repeatedly)."""
        if self._server_task:
            return
        loop = self.get_event_loop()
        self._server_task = loop.create_task(self._server_task_handler())

    async def _stop(self, cancel_push_task: bool = True):
        """Shut the server down and wait for it to finish.

        Args:
            cancel_push_task: Cancel the push task as well. Pass ``False``
                when frames still need to be delivered from the queue (for
                instance a trailing ``EndFrame``).
        """
        self._stop_server_event.set()
        if cancel_push_task:
            self._push_frame_task.cancel()
        if self._server_task:
            await self._server_task

    #
    # Push frames task
    #

    def _create_push_task(self):
        """Create the queue and the task that drains it."""
        loop = self.get_event_loop()
        # The queue must exist before the task that reads from it.
        self._push_queue = asyncio.Queue()
        self._push_frame_task = loop.create_task(self._push_frame_task_handler())

    async def _internal_push_frame(
            self,
            frame: Frame | None,
            direction: FrameDirection | None = FrameDirection.DOWNSTREAM):
        """Queue ``frame`` so the push task delivers it in order."""
        await self._push_queue.put((frame, direction))

    async def _push_frame_task_handler(self):
        """Push queued frames one by one until an ``EndFrame`` or cancellation."""
        running = True
        while running:
            try:
                (frame, direction) = await self._push_queue.get()
                await self.push_frame(frame, direction)
                running = not isinstance(frame, EndFrame)
            except asyncio.CancelledError:
                break
|
||||
|
||||
|
||||
class WebsocketServerOutputTransport(BaseOutputTransport):
    """Output transport that sends audio to the connected websocket client.

    Raw audio is accumulated and sent in chunks of
    ``params.audio_frame_size`` bytes, each serialized with
    ``params.serializer``.
    """

    def __init__(self, params: WebsocketServerParams):
        """Store the parameters and start with no client and an empty buffer."""
        super().__init__(params)

        self._params = params

        self._websocket: websockets.WebSocketServerProtocol | None = None
        self._audio_buffer = bytes()

    async def set_client_connection(self, websocket: websockets.WebSocketServerProtocol):
        """Use ``websocket`` for output, closing any previous client first."""
        if self._websocket:
            await self._websocket.close()
            logger.warning("Only one client allowed, using new connection")
        self._websocket = websocket

    async def write_raw_audio_frames(self, frames: bytes):
        """Buffer ``frames`` and send every complete chunk to the client.

        Audio written while no client is connected is dropped; otherwise
        the send would fail on a missing connection and the buffer would
        grow without bound.
        """
        if not self._websocket:
            self._audio_buffer = bytes()
            return

        chunk_size = self._params.audio_frame_size
        self._audio_buffer += frames
        while len(self._audio_buffer) >= chunk_size:
            frame = AudioRawFrame(
                audio=self._audio_buffer[:chunk_size],
                sample_rate=self._params.audio_out_sample_rate,
                num_channels=self._params.audio_out_channels
            )

            if self._params.add_wav_header:
                frame = self._wrap_in_wav(frame)

            proto = self._params.serializer.serialize(frame)
            await self._websocket.send(proto)

            # Only discard the chunk once it has actually been sent.
            self._audio_buffer = self._audio_buffer[chunk_size:]

    @staticmethod
    def _wrap_in_wav(frame: AudioRawFrame) -> AudioRawFrame:
        """Return a copy of ``frame`` whose audio is a complete WAV file."""
        content = io.BytesIO()
        # The context manager finalizes the WAV header even if writing fails.
        with wave.open(content, "wb") as ww:
            ww.setsampwidth(2)  # 16-bit samples
            ww.setnchannels(frame.num_channels)
            ww.setframerate(frame.sample_rate)
            ww.writeframes(frame.audio)
        return AudioRawFrame(
            audio=content.getvalue(),
            sample_rate=frame.sample_rate,
            num_channels=frame.num_channels)
|
||||
|
||||
|
||||
class WebsocketServerTransport(BaseTransport):
    """Transport pairing a websocket server input with a websocket output.

    The input processor runs the server and reads client messages; the
    output processor sends audio back to the connected client. The only
    event users can register for is ``on_client_connected``.
    """

    def __init__(
            self,
            host: str = "localhost",
            port: int = 8765,
            params: WebsocketServerParams | None = None,
            loop: asyncio.AbstractEventLoop | None = None):
        """Configure the transport; processors are created lazily.

        Args:
            host: Interface the websocket server binds to.
            port: Port the websocket server listens on.
            params: Transport parameters. A fresh ``WebsocketServerParams``
                is created when omitted, so transports never share one
                default instance.
            loop: Event loop handed to ``BaseTransport``.
        """
        super().__init__(loop)
        self._host = host
        self._port = port
        self._params = WebsocketServerParams() if params is None else params

        self._callbacks = WebsocketServerCallbacks(
            on_connection=self._on_connection
        )
        self._input: WebsocketServerInputTransport | None = None
        self._output: WebsocketServerOutputTransport | None = None
        self._websocket: websockets.WebSocketServerProtocol | None = None

        # Register supported handlers. The user will only be able to register
        # these handlers.
        self._register_event_handler("on_client_connected")

    def input(self) -> FrameProcessor:
        """Return the input processor, creating it on first use."""
        if not self._input:
            self._input = WebsocketServerInputTransport(
                self._host, self._port, self._params, self._callbacks)
        return self._input

    def output(self) -> FrameProcessor:
        """Return the output processor, creating it on first use."""
        if not self._output:
            self._output = WebsocketServerOutputTransport(self._params)
        return self._output

    async def _on_connection(self, websocket):
        """Give a new client to the output processor and notify handlers.

        If no output processor has been created yet, the connection is only
        reported as an error and no event handler is called.
        """
        if self._output:
            await self._output.set_client_connection(websocket)
            await self._call_event_handler("on_client_connected", websocket)
        else:
            logger.error("A WebsocketServerTransport output is missing in the pipeline")
|
||||
@@ -6,15 +6,12 @@
|
||||
|
||||
import aiohttp
|
||||
import asyncio
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
import inspect
|
||||
import queue
|
||||
import time
|
||||
import types
|
||||
|
||||
from dataclasses import dataclass
|
||||
from functools import partial
|
||||
from typing import Any, Callable, Mapping
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
from daily import (
|
||||
CallClient,
|
||||
@@ -139,7 +136,8 @@ class DailyTransportClient(EventHandler):
|
||||
token: str | None,
|
||||
bot_name: str,
|
||||
params: DailyParams,
|
||||
callbacks: DailyCallbacks):
|
||||
callbacks: DailyCallbacks,
|
||||
loop: asyncio.AbstractEventLoop):
|
||||
super().__init__()
|
||||
|
||||
if not self._daily_initialized:
|
||||
@@ -151,6 +149,7 @@ class DailyTransportClient(EventHandler):
|
||||
self._bot_name: str = bot_name
|
||||
self._params: DailyParams = params
|
||||
self._callbacks = callbacks
|
||||
self._loop = loop
|
||||
|
||||
self._participant_id: str = ""
|
||||
self._video_renderers = {}
|
||||
@@ -183,9 +182,6 @@ class DailyTransportClient(EventHandler):
|
||||
def participant_id(self) -> str:
|
||||
return self._participant_id
|
||||
|
||||
def set_callbacks(self, callbacks: DailyCallbacks):
|
||||
self._callbacks = callbacks
|
||||
|
||||
def send_message(self, frame: DailyTransportMessageFrame):
|
||||
self._client.send_app_message(frame.message, frame.participant_id)
|
||||
|
||||
@@ -212,8 +208,7 @@ class DailyTransportClient(EventHandler):
|
||||
|
||||
self._joining = True
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
await loop.run_in_executor(self._executor, self._join)
|
||||
await self._loop.run_in_executor(self._executor, self._join)
|
||||
|
||||
def _join(self):
|
||||
logger.info(f"Joining {self._room_url}")
|
||||
@@ -304,8 +299,7 @@ class DailyTransportClient(EventHandler):
|
||||
self._joined = False
|
||||
self._leaving = True
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
await loop.run_in_executor(self._executor, self._leave)
|
||||
await self._loop.run_in_executor(self._executor, self._leave)
|
||||
|
||||
def _leave(self):
|
||||
logger.info(f"Leaving {self._room_url}")
|
||||
@@ -335,8 +329,7 @@ class DailyTransportClient(EventHandler):
|
||||
self._callbacks.on_error(error_msg)
|
||||
|
||||
async def cleanup(self):
|
||||
loop = asyncio.get_running_loop()
|
||||
await loop.run_in_executor(self._executor, self._cleanup)
|
||||
await self._loop.run_in_executor(self._executor, self._cleanup)
|
||||
|
||||
def _cleanup(self):
|
||||
if self._client:
|
||||
@@ -468,8 +461,10 @@ class DailyInputTransport(BaseInputTransport):
|
||||
|
||||
self._client = client
|
||||
|
||||
self._executor = ThreadPoolExecutor(max_workers=5)
|
||||
|
||||
self._video_renderers = {}
|
||||
self._camera_in_queue = queue.Queue()
|
||||
self._camera_in_queue = asyncio.Queue()
|
||||
|
||||
self._vad_analyzer = params.vad_analyzer
|
||||
if params.vad_enabled and not params.vad_analyzer:
|
||||
@@ -478,39 +473,34 @@ class DailyInputTransport(BaseInputTransport):
|
||||
num_channels=self._params.audio_in_channels)
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
if self._running:
|
||||
return
|
||||
# Join the room.
|
||||
await self._client.join()
|
||||
# This will set _running=True
|
||||
# Create camera in task
|
||||
self._camera_in_task = self.get_event_loop().create_task(self._camera_in_task_handler())
|
||||
await super().start(frame)
|
||||
# Create camera in thread (runs if _running is true).
|
||||
loop = asyncio.get_running_loop()
|
||||
self._camera_in_thread = loop.run_in_executor(
|
||||
self._in_executor, self._camera_in_thread_handler)
|
||||
|
||||
async def stop(self):
|
||||
if not self._running:
|
||||
return
|
||||
# Leave the room.
|
||||
await self._client.leave()
|
||||
# This will set _running=False
|
||||
# The task will stop.
|
||||
self._camera_in_task.cancel()
|
||||
await super().stop()
|
||||
# The thread will stop.
|
||||
await self._camera_in_thread
|
||||
|
||||
async def cleanup(self):
|
||||
await super().cleanup()
|
||||
await self._client.cleanup()
|
||||
await super().cleanup()
|
||||
|
||||
def vad_analyze(self, audio_frames: bytes) -> VADState:
|
||||
async def vad_analyze(self, audio_frames: bytes) -> VADState:
|
||||
state = VADState.QUIET
|
||||
if self._vad_analyzer:
|
||||
state = self._vad_analyzer.analyze_audio(audio_frames)
|
||||
state = await self._vad_analyzer.analyze_audio(audio_frames)
|
||||
return state
|
||||
|
||||
def read_raw_audio_frames(self, frame_count: int) -> bytes:
|
||||
return self._client.read_raw_audio_frames(frame_count)
|
||||
async def read_raw_audio_frames(self, frame_count: int) -> bytes:
|
||||
def read_audio(frame_count: int) -> bytes:
|
||||
return self._client.read_raw_audio_frames(frame_count)
|
||||
|
||||
return await self.get_event_loop().run_in_executor(self._executor, read_audio, frame_count)
|
||||
|
||||
#
|
||||
# FrameProcessor
|
||||
@@ -585,20 +575,19 @@ class DailyInputTransport(BaseInputTransport):
|
||||
image=buffer,
|
||||
size=size,
|
||||
format=format)
|
||||
self._camera_in_queue.put(frame)
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self._camera_in_queue.put(frame), self.get_event_loop())
|
||||
|
||||
self._video_renderers[participant_id]["timestamp"] = curr_time
|
||||
|
||||
def _camera_in_thread_handler(self):
|
||||
while self._running:
|
||||
async def _camera_in_task_handler(self):
|
||||
while True:
|
||||
try:
|
||||
frame = self._camera_in_queue.get(timeout=1)
|
||||
future = asyncio.run_coroutine_threadsafe(
|
||||
self._internal_push_frame(frame), self.get_event_loop())
|
||||
future.result()
|
||||
frame = await self._camera_in_queue.get()
|
||||
await self._internal_push_frame(frame)
|
||||
self._camera_in_queue.task_done()
|
||||
except queue.Empty:
|
||||
pass
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except BaseException as e:
|
||||
logger.error(f"Error capturing video: {e}")
|
||||
|
||||
@@ -610,39 +599,52 @@ class DailyOutputTransport(BaseOutputTransport):
|
||||
|
||||
self._client = client
|
||||
|
||||
self._executor = ThreadPoolExecutor(max_workers=5)
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
if self._running:
|
||||
return
|
||||
# This will set _running=True
|
||||
await super().start(frame)
|
||||
# Join the room.
|
||||
await self._client.join()
|
||||
await super().start(frame)
|
||||
|
||||
async def stop(self):
|
||||
if not self._running:
|
||||
return
|
||||
# This will set _running=False
|
||||
await super().stop()
|
||||
# Leave the room.
|
||||
await self._client.leave()
|
||||
await super().stop()
|
||||
|
||||
async def cleanup(self):
|
||||
await super().cleanup()
|
||||
await self._client.cleanup()
|
||||
await super().cleanup()
|
||||
|
||||
def send_message(self, frame: DailyTransportMessageFrame):
|
||||
self._client.send_message(frame)
|
||||
async def send_message(self, frame: DailyTransportMessageFrame):
|
||||
def send_message(frame: DailyTransportMessageFrame):
|
||||
self._client.send_message(frame)
|
||||
|
||||
def write_raw_audio_frames(self, frames: bytes):
|
||||
self._client.write_raw_audio_frames(frames)
|
||||
await self.get_event_loop().run_in_executor(self._executor, send_message, frame)
|
||||
|
||||
def write_frame_to_camera(self, frame: ImageRawFrame):
|
||||
self._client.write_frame_to_camera(frame)
|
||||
async def write_raw_audio_frames(self, frames: bytes):
|
||||
def write_audio(frames: bytes):
|
||||
self._client.write_raw_audio_frames(frames)
|
||||
|
||||
await self.get_event_loop().run_in_executor(self._executor, write_audio, frames)
|
||||
|
||||
async def write_frame_to_camera(self, frame: ImageRawFrame):
|
||||
def write_image(frame: ImageRawFrame):
|
||||
self._client.write_frame_to_camera(frame)
|
||||
|
||||
await self.get_event_loop().run_in_executor(self._executor, write_image, frame)
|
||||
|
||||
|
||||
class DailyTransport(BaseTransport):
|
||||
|
||||
def __init__(self, room_url: str, token: str | None, bot_name: str, params: DailyParams):
|
||||
def __init__(
|
||||
self,
|
||||
room_url: str,
|
||||
token: str | None,
|
||||
bot_name: str,
|
||||
params: DailyParams,
|
||||
loop: asyncio.AbstractEventLoop | None = None):
|
||||
super().__init__(loop)
|
||||
|
||||
callbacks = DailyCallbacks(
|
||||
on_joined=self._on_joined,
|
||||
on_left=self._on_left,
|
||||
@@ -660,12 +662,10 @@ class DailyTransport(BaseTransport):
|
||||
)
|
||||
self._params = params
|
||||
|
||||
self._client = DailyTransportClient(room_url, token, bot_name, params, callbacks)
|
||||
self._client = DailyTransportClient(
|
||||
room_url, token, bot_name, params, callbacks, self._loop)
|
||||
self._input: DailyInputTransport | None = None
|
||||
self._output: DailyOutputTransport | None = None
|
||||
self._loop = asyncio.get_running_loop()
|
||||
|
||||
self._event_handlers: dict = {}
|
||||
|
||||
# Register supported handlers. The user will only be able to register
|
||||
# these handlers.
|
||||
@@ -741,10 +741,10 @@ class DailyTransport(BaseTransport):
|
||||
participant_id, framerate, video_source, color_format)
|
||||
|
||||
def _on_joined(self, participant):
|
||||
self.on_joined(participant)
|
||||
self._call_async_event_handler("on_joined", participant)
|
||||
|
||||
def _on_left(self):
|
||||
self.on_left()
|
||||
self._call_async_event_handler("on_left")
|
||||
|
||||
def _on_error(self, error):
|
||||
# TODO(aleix): Report error to input/output transports. The one managing
|
||||
@@ -754,10 +754,10 @@ class DailyTransport(BaseTransport):
|
||||
def _on_app_message(self, message: Any, sender: str):
|
||||
if self._input:
|
||||
self._input.push_app_message(message, sender)
|
||||
self.on_app_message(message, sender)
|
||||
self._call_async_event_handler("on_app_message", message, sender)
|
||||
|
||||
def _on_call_state_updated(self, state: str):
|
||||
self.on_call_state_updated(state)
|
||||
self._call_async_event_handler("on_call_state_updated", state)
|
||||
|
||||
async def _handle_dialin_ready(self, sip_endpoint: str):
|
||||
if not self._params.dialin_settings:
|
||||
@@ -793,28 +793,28 @@ class DailyTransport(BaseTransport):
|
||||
def _on_dialin_ready(self, sip_endpoint):
|
||||
if self._params.dialin_settings:
|
||||
asyncio.run_coroutine_threadsafe(self._handle_dialin_ready(sip_endpoint), self._loop)
|
||||
self.on_dialin_ready(sip_endpoint)
|
||||
self._call_async_event_handler("on_dialin_ready", sip_endpoint)
|
||||
|
||||
def _on_dialout_connected(self, data):
|
||||
self.on_dialout_connected(data)
|
||||
self._call_async_event_handler("on_dialout_connected", data)
|
||||
|
||||
def _on_dialout_stopped(self, data):
|
||||
self.on_dialout_stopped(data)
|
||||
self._call_async_event_handler("on_dialout_stopped", data)
|
||||
|
||||
def _on_dialout_error(self, data):
|
||||
self.on_dialout_error(data)
|
||||
self._call_async_event_handler("on_dialout_error", data)
|
||||
|
||||
def _on_dialout_warning(self, data):
|
||||
self.on_dialout_warning(data)
|
||||
self._call_async_event_handler("on_dialout_warning", data)
|
||||
|
||||
def _on_participant_joined(self, participant):
|
||||
self.on_participant_joined(participant)
|
||||
self._call_async_event_handler("on_participant_joined", participant)
|
||||
|
||||
def _on_participant_left(self, participant, reason):
|
||||
self.on_participant_left(participant, reason)
|
||||
self._call_async_event_handler("on_participant_left", participant, reason)
|
||||
|
||||
def _on_first_participant_joined(self, participant):
|
||||
self.on_first_participant_joined(participant)
|
||||
self._call_async_event_handler("on_first_participant_joined", participant)
|
||||
|
||||
def _on_transcription_message(self, participant_id, message):
|
||||
text = message["text"]
|
||||
@@ -829,84 +829,7 @@ class DailyTransport(BaseTransport):
|
||||
if self._input:
|
||||
self._input.push_transcription_frame(frame)
|
||||
|
||||
#
|
||||
# Decorators (event handlers)
|
||||
#
|
||||
|
||||
def on_joined(self, participant):
|
||||
pass
|
||||
|
||||
def on_left(self):
|
||||
pass
|
||||
|
||||
def on_app_message(self, message, sender):
|
||||
pass
|
||||
|
||||
def on_call_state_updated(self, state):
|
||||
pass
|
||||
|
||||
def on_dialin_ready(self, sip_endpoint):
|
||||
pass
|
||||
|
||||
def on_dialout_connected(self, data):
|
||||
pass
|
||||
|
||||
def on_dialout_stopped(self, data):
|
||||
pass
|
||||
|
||||
def on_dialout_error(self, data):
|
||||
pass
|
||||
|
||||
def on_dialout_warning(self, data):
|
||||
pass
|
||||
|
||||
def on_first_participant_joined(self, participant):
|
||||
pass
|
||||
|
||||
def on_participant_joined(self, participant):
|
||||
pass
|
||||
|
||||
def on_participant_left(self, participant, reason):
|
||||
pass
|
||||
|
||||
def event_handler(self, event_name: str):
|
||||
def decorator(handler):
|
||||
self._add_event_handler(event_name, handler)
|
||||
return handler
|
||||
return decorator
|
||||
|
||||
def _register_event_handler(self, event_name: str):
|
||||
methods = inspect.getmembers(self, predicate=inspect.ismethod)
|
||||
if event_name not in [method[0] for method in methods]:
|
||||
raise Exception(f"Event handler {event_name} not found")
|
||||
|
||||
self._event_handlers[event_name] = [getattr(self, event_name)]
|
||||
|
||||
patch_method = types.MethodType(partial(self._patch_method, event_name), self)
|
||||
setattr(self, event_name, patch_method)
|
||||
|
||||
def _add_event_handler(self, event_name: str, handler):
|
||||
if event_name not in self._event_handlers:
|
||||
raise Exception(f"Event handler {event_name} not registered")
|
||||
self._event_handlers[event_name].append(types.MethodType(handler, self))
|
||||
|
||||
def _patch_method(self, event_name, *args, **kwargs):
|
||||
try:
|
||||
for handler in self._event_handlers[event_name]:
|
||||
if inspect.iscoroutinefunction(handler):
|
||||
# Beware, if handler() calls another event handler it
|
||||
# will deadlock. You shouldn't do that anyways.
|
||||
future = asyncio.run_coroutine_threadsafe(
|
||||
handler(*args[1:], **kwargs), self._loop)
|
||||
|
||||
# wait for the coroutine to finish. This will also
|
||||
# raise any exceptions raised by the coroutine.
|
||||
future.result()
|
||||
else:
|
||||
handler(*args[1:], **kwargs)
|
||||
except Exception as e:
|
||||
logger.error(f"Exception in event handler {event_name}: {e}")
|
||||
raise e
|
||||
|
||||
# def start_recording(self):
|
||||
# self.client.start_recording()
|
||||
def _call_async_event_handler(self, event_name: str, *args, **kwargs):
|
||||
future = asyncio.run_coroutine_threadsafe(
|
||||
self._call_event_handler(event_name, *args, **kwargs), self._loop)
|
||||
future.result()
|
||||
|
||||
@@ -46,7 +46,7 @@ class SileroVADAnalyzer(VADAnalyzer):
|
||||
def num_frames_required(self) -> int:
|
||||
return int(self.sample_rate / 100) * 4 # 40ms
|
||||
|
||||
def voice_confidence(self, buffer) -> float:
|
||||
async def voice_confidence(self, buffer) -> float:
|
||||
try:
|
||||
audio_int16 = np.frombuffer(buffer, np.int16)
|
||||
# Divide by 32768 because we have signed 16-bit data.
|
||||
@@ -88,7 +88,7 @@ class SileroVAD(FrameProcessor):
|
||||
async def _analyze_audio(self, frame: AudioRawFrame):
|
||||
# Check VAD and push event if necessary. We just care about changes
|
||||
# from QUIET to SPEAKING and vice versa.
|
||||
new_vad_state = self._vad_analyzer.analyze_audio(frame.audio)
|
||||
new_vad_state = await self._vad_analyzer.analyze_audio(frame.audio)
|
||||
if new_vad_state != self._processor_vad_state and new_vad_state != VADState.STARTING and new_vad_state != VADState.STOPPING:
|
||||
new_frame = None
|
||||
|
||||
|
||||
@@ -58,14 +58,14 @@ class VADAnalyzer:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def voice_confidence(self, buffer) -> float:
|
||||
async def voice_confidence(self, buffer) -> float:
|
||||
pass
|
||||
|
||||
def _get_smoothed_volume(self, audio: bytes) -> float:
|
||||
volume = calculate_audio_volume(audio, self._sample_rate)
|
||||
return exp_smoothing(volume, self._prev_volume, self._smoothing_factor)
|
||||
|
||||
def analyze_audio(self, buffer) -> VADState:
|
||||
async def analyze_audio(self, buffer) -> VADState:
|
||||
self._vad_buffer += buffer
|
||||
|
||||
num_required_bytes = self._vad_frames_num_bytes
|
||||
@@ -75,7 +75,7 @@ class VADAnalyzer:
|
||||
audio_frames = self._vad_buffer[:num_required_bytes]
|
||||
self._vad_buffer = self._vad_buffer[num_required_bytes:]
|
||||
|
||||
confidence = self.voice_confidence(audio_frames)
|
||||
confidence = await self.voice_confidence(audio_frames)
|
||||
|
||||
volume = self._get_smoothed_volume(audio_frames)
|
||||
self._prev_volume = volume
|
||||
|
||||
Reference in New Issue
Block a user