Compare commits

...

16 Commits

Author SHA1 Message Date
Aleix Conchillo Flaqué
7a6136fa9d make all transports read/write functions async 2024-05-30 23:24:47 -07:00
Aleix Conchillo Flaqué
219af71227 update CHANGELOG and create websocker-server instructions 2024-05-30 15:02:12 -07:00
Aleix Conchillo Flaqué
d2117c6f32 examples: fix websocket-client audio playback 2024-05-30 14:48:22 -07:00
Aleix Conchillo Flaqué
533c09741c examples: use gpt-4o model by default 2024-05-30 14:38:13 -07:00
Aleix Conchillo Flaqué
797f82c836 examples: websocket-server updates 2024-05-30 14:16:55 -07:00
Aleix Conchillo Flaqué
fe8f1c8360 transport(websocket): update audio_frame_size 2024-05-30 14:16:55 -07:00
Aleix Conchillo Flaqué
e38ca3e455 serializers(protobuf): support id and name fields 2024-05-30 14:16:55 -07:00
Aleix Conchillo Flaqué
4bab20643b added sample_rate and num_channels to protobuf AudioRawFrame 2024-05-30 14:16:55 -07:00
Aleix Conchillo Flaqué
8f7b3e1d5d transports: simplify and fix async and nested decorators 2024-05-30 14:16:55 -07:00
Aleix Conchillo Flaqué
4099256c18 use get_event_loop() and move event handlers to BaseTransport 2024-05-30 14:16:55 -07:00
Aleix Conchillo Flaqué
f4ae56d7b6 examples: add and update wbesocket eaxmples 2024-05-30 14:16:54 -07:00
Aleix Conchillo Flaqué
9fd061e33e transports: added new WebsocketServerTransport 2024-05-30 14:16:54 -07:00
Aleix Conchillo Flaqué
5d2f062afa serializers: added BaseSerializer 2024-05-30 14:16:54 -07:00
Aleix Conchillo Flaqué
d8b483dc1e frames: generate protobuf pb2 file for pipecat package 2024-05-30 14:16:54 -07:00
Aleix Conchillo Flaqué
02191b7d58 pyproject: add protobuf dependency 2024-05-30 14:14:27 -07:00
Aleix Conchillo Flaqué
291c39efc4 dev-requirements: add grpcio-tools 2024-05-30 13:04:47 -07:00
44 changed files with 963 additions and 587 deletions

View File

@@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Added
- Added WebsocketServerTransport. This will create a websocket server and will
read messages coming from a client. The messages are serialized/deserialized
with protobufs. See `examples/websocket-server` for a detailed example.
### Fixed
- Fixed SileroVAD frame processor.

View File

@@ -1,6 +1,6 @@
BSD 2-Clause License
Copyright (c) 2024, Kwindla Hultman Kramer
Copyright (c) 2024, Daily
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

View File

@@ -1,5 +1,6 @@
autopep8~=2.1.0
build~=1.2.1
grpcio-tools~=1.62.2
pip-tools~=7.4.1
pytest~=8.2.0
setuptools~=69.5.1

View File

@@ -44,7 +44,7 @@ async def main(room_url):
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4-turbo-preview")
model="gpt-4o")
messages = [
{

View File

@@ -93,7 +93,7 @@ async def main(room_url):
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4-turbo-preview")
model="gpt-4o")
imagegen = FalImageGenService(
params=FalImageGenService.InputParams(

View File

@@ -76,7 +76,7 @@ async def main():
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4-turbo-preview")
model="gpt-4o")
tts = ElevenLabsTTSService(
aiohttp_session=session,

View File

@@ -81,7 +81,7 @@ async def main(room_url: str, token):
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4-turbo-preview")
model="gpt-4o")
messages = [
{

View File

@@ -53,7 +53,7 @@ async def main(room_url: str, token):
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4-turbo-preview")
model="gpt-4o")
messages = [
{

View File

@@ -53,7 +53,7 @@ async def main(room_url: str, token):
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4-turbo-preview")
model="gpt-4o")
messages = [
{

View File

@@ -95,7 +95,7 @@ async def main(room_url: str, token):
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4-turbo-preview")
model="gpt-4o")
tts = ElevenLabsTTSService(
aiohttp_session=session,

View File

@@ -71,7 +71,7 @@ async def main(room_url: str, token):
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4-turbo-preview")
model="gpt-4o")
llm.register_function(
"get_current_weather",
fetch_weather_from_api,

View File

@@ -0,0 +1,39 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import sys
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.transports.network.websocket_server import WebsocketServerTransport
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
transport = WebsocketServerTransport()
pipeline = Pipeline([transport.input(), transport.output()])
task = PipelineTask(pipeline)
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,25 +0,0 @@
syntax = "proto3";
package pipecat_proto;
message TextFrame {
string text = 1;
}
message AudioFrame {
bytes audio = 1;
}
message TranscriptionFrame {
string text = 1;
string participant_id = 2;
string timestamp = 3;
}
message Frame {
oneof frame {
TextFrame text = 1;
AudioFrame audio = 2;
TranscriptionFrame transcription = 3;
}
}

View File

@@ -1,134 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<script src="//cdn.jsdelivr.net/npm/protobufjs@7.X.X/dist/protobuf.min.js"></script>
<title>WebSocket Audio Stream</title>
</head>
<body>
<h1>WebSocket Audio Stream</h1>
<button id="startAudioBtn">Start Audio</button>
<button id="stopAudioBtn">Stop Audio</button>
<script>
const SAMPLE_RATE = 16000;
const BUFFER_SIZE = 8192;
const MIN_AUDIO_SIZE = 6400;
let audioContext;
let microphoneStream;
let scriptProcessor;
let source;
let frame;
let audioChunks = [];
let isPlaying = false;
let ws;
const proto = protobuf.load("frames.proto", (err, root) => {
if (err) throw err;
frame = root.lookupType("pipecat_proto.Frame");
});
function initWebSocket() {
ws = new WebSocket('ws://localhost:8765');
ws.addEventListener('open', () => console.log('WebSocket connection established.'));
ws.addEventListener('message', handleWebSocketMessage);
ws.addEventListener('close', (event) => console.log("WebSocket connection closed.", event.code, event.reason));
ws.addEventListener('error', (event) => console.error('WebSocket error:', event));
}
async function handleWebSocketMessage(event) {
const arrayBuffer = await event.data.arrayBuffer();
enqueueAudioFromProto(arrayBuffer);
}
function enqueueAudioFromProto(arrayBuffer) {
const parsedFrame = frame.decode(new Uint8Array(arrayBuffer));
if (!parsedFrame?.audio) return false;
const frameCount = parsedFrame.audio.data.length / 2;
const audioOutBuffer = audioContext.createBuffer(1, frameCount, SAMPLE_RATE);
const nowBuffering = audioOutBuffer.getChannelData(0);
const view = new Int16Array(parsedFrame.audio.data.buffer);
for (let i = 0; i < frameCount; i++) {
const word = view[i];
nowBuffering[i] = ((word + 32768) % 65536 - 32768) / 32768.0;
}
audioChunks.push(audioOutBuffer);
if (!isPlaying) playNextChunk();
}
function playNextChunk() {
if (audioChunks.length === 0) {
isPlaying = false;
return;
}
isPlaying = true;
const audioOutBuffer = audioChunks.shift();
const source = audioContext.createBufferSource();
source.buffer = audioOutBuffer;
source.connect(audioContext.destination);
source.onended = playNextChunk;
source.start();
}
function startAudio() {
if (!navigator.mediaDevices || !navigator.mediaDevices.getUserMedia) {
alert('getUserMedia is not supported in your browser.');
return;
}
navigator.mediaDevices.getUserMedia({ audio: true })
.then((stream) => {
microphoneStream = stream;
audioContext = new (window.AudioContext || window.webkitAudioContext)();
scriptProcessor = audioContext.createScriptProcessor(BUFFER_SIZE, 1, 1);
source = audioContext.createMediaStreamSource(stream);
source.connect(scriptProcessor);
scriptProcessor.connect(audioContext.destination);
const audioBuffer = [];
const skipRatio = Math.floor(audioContext.sampleRate / (SAMPLE_RATE * 2));
scriptProcessor.onaudioprocess = (event) => {
const rawLeftChannelData = event.inputBuffer.getChannelData(0);
for (let i = 0; i < rawLeftChannelData.length; i += skipRatio) {
const normalized = ((rawLeftChannelData[i] * 32768.0) + 32768) % 65536 - 32768;
const swappedBytes = ((normalized & 0xff) << 8) | ((normalized >> 8) & 0xff);
audioBuffer.push(swappedBytes);
}
if (audioBuffer.length >= MIN_AUDIO_SIZE) {
const audioFrame = frame.create({ audio: { audio: audioBuffer.slice(0, MIN_AUDIO_SIZE) } });
const encodedFrame = new Uint8Array(frame.encode(audioFrame).finish());
ws.send(encodedFrame);
audioBuffer.splice(0, MIN_AUDIO_SIZE);
}
};
initWebSocket();
})
.catch((error) => console.error('Error accessing microphone:', error));
}
function stopAudio() {
if (ws) {
ws.close();
scriptProcessor.disconnect();
source.disconnect();
ws = undefined;
}
}
document.getElementById('startAudioBtn').addEventListener('click', startAudio);
document.getElementById('stopAudioBtn').addEventListener('click', stopAudio);
</script>
</body>
</html>

View File

@@ -1,50 +0,0 @@
import asyncio
import aiohttp
import logging
import os
from pipecat.pipeline.frame_processor import FrameProcessor
from pipecat.pipeline.frames import TextFrame, TranscriptionFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.services.elevenlabs_ai_services import ElevenLabsTTSService
from pipecat.transports.websocket_transport import WebsocketTransport
from pipecat.services.whisper_ai_services import WhisperSTTService
logging.basicConfig(format="%(levelno)s %(asctime)s %(message)s")
logger = logging.getLogger("pipecat")
logger.setLevel(logging.DEBUG)
class WhisperTranscriber(FrameProcessor):
async def process_frame(self, frame):
if isinstance(frame, TranscriptionFrame):
print(f"Transcribed: {frame.text}")
else:
yield frame
async def main():
async with aiohttp.ClientSession() as session:
transport = WebsocketTransport(
mic_enabled=True,
speaker_enabled=True,
)
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
pipeline = Pipeline([
WhisperSTTService(),
WhisperTranscriber(),
tts,
])
@transport.on_connection
async def queue_frame():
await pipeline.queue_frames([TextFrame("Hello there!")])
await transport.run(pipeline)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -145,7 +145,7 @@ async def main(room_url: str, token):
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4-turbo-preview")
model="gpt-4o")
ta = TalkingAnimation()

View File

@@ -117,7 +117,7 @@ async def main(room_url: str, token):
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4-turbo-preview")
model="gpt-4o")
messages = [
{

View File

@@ -56,7 +56,7 @@ async def main(room_url, token=None):
llm_service = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4-turbo"
model="gpt-4o"
)
tts_service = ElevenLabsTTSService(

View File

@@ -97,7 +97,8 @@ async def main(room_url: str, token):
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4-turbo-preview"
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o"
)
sa = SentenceAggregator()

View File

@@ -0,0 +1,27 @@
# Websocket Server
This is an example that shows how to use `WebsocketServerTransport` to communicate with a web client.
## Get started
```bash
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
```
## Run the server
```bash
python server.py
```
## Run the HTTP server
This will host the static web client:
```bash
python -m http.server
```
Then, visit `http://localhost:8000` in your browser to start a session.

View File

@@ -0,0 +1,43 @@
//
// Copyright (c) 2024, Daily
//
// SPDX-License-Identifier: BSD 2-Clause License
//
// Generate frames_pb2.py with:
//
// python -m grpc_tools.protoc --proto_path=./ --python_out=./protobufs frames.proto
syntax = "proto3";
package pipecat;
message TextFrame {
uint64 id = 1;
string name = 2;
string text = 3;
}
message AudioRawFrame {
uint64 id = 1;
string name = 2;
bytes audio = 3;
uint32 sample_rate = 4;
uint32 num_channels = 5;
}
message TranscriptionFrame {
uint64 id = 1;
string name = 2;
string text = 3;
string user_id = 4;
string timestamp = 5;
}
message Frame {
oneof frame {
TextFrame text = 1;
AudioRawFrame audio = 2;
TranscriptionFrame transcription = 3;
}
}

View File

@@ -0,0 +1,204 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<script src="https://cdn.jsdelivr.net/npm/protobufjs@7.X.X/dist/protobuf.min.js"></script>
<title>Pipecat WebSocket Client Example</title>
</head>
<body>
<h1>Pipecat WebSocket Client Example</h1>
<h3><div id="progressText">Loading, wait...</div></h3>
<button id="startAudioBtn">Start Audio</button>
<button id="stopAudioBtn">Stop Audio</button>
<script>
const SAMPLE_RATE = 16000;
const NUM_CHANNELS = 1;
const PLAY_TIME_RESET_THRESHOLD_MS = 1.0;
// The protobuf type. We will load it later.
let Frame = null;
// The websocket connection.
let ws = null;
// The audio context
let audioContext = null;
// The audio context media stream source
let source = null;
// The microphone stream from getUserMedia. It should be sampled at the
// proper sample rate.
let microphoneStream = null;
// Script processor to get data from microphone.
let scriptProcessor = null;
// AudioContext play time.
let playTime = 0;
// Last time we received a websocket message.
let lastMessageTime = 0;
// Whether we should be playing audio.
let isPlaying = false;
let startBtn = document.getElementById('startAudioBtn');
let stopBtn = document.getElementById('stopAudioBtn');
const proto = protobuf.load("frames.proto", (err, root) => {
if (err) {
throw err;
}
Frame = root.lookupType("pipecat.Frame");
const progressText = document.getElementById("progressText");
progressText.textContent = "We are ready! Make sure to run the server and then click `Start Audio`.";
startBtn.disabled = false;
stopBtn.disabled = true;
});
function initWebSocket() {
ws = new WebSocket('ws://localhost:8765');
ws.addEventListener('open', () => console.log('WebSocket connection established.'));
ws.addEventListener('message', handleWebSocketMessage);
ws.addEventListener('close', (event) => {
console.log("WebSocket connection closed.", event.code, event.reason);
stopAudio(false);
});
ws.addEventListener('error', (event) => console.error('WebSocket error:', event));
}
async function handleWebSocketMessage(event) {
const arrayBuffer = await event.data.arrayBuffer();
if (isPlaying) {
enqueueAudioFromProto(arrayBuffer);
}
}
function enqueueAudioFromProto(arrayBuffer) {
const parsedFrame = Frame.decode(new Uint8Array(arrayBuffer));
if (!parsedFrame?.audio) {
return false;
}
// Reset play time if it has been a while since we last played anything.
// Note that audioContext.currentTime is measured in seconds.
const diffTime = audioContext.currentTime - lastMessageTime;
if ((playTime == 0) || (diffTime > PLAY_TIME_RESET_THRESHOLD_MS)) {
playTime = audioContext.currentTime;
}
lastMessageTime = audioContext.currentTime;
// We should be able to use parsedFrame.audio.audio.buffer but for
// some reason that contains all the bytes from the protobuf message.
const audioVector = Array.from(parsedFrame.audio.audio);
const audioArray = new Uint8Array(audioVector);
audioContext.decodeAudioData(audioArray.buffer, function(buffer) {
const source = new AudioBufferSourceNode(audioContext);
source.buffer = buffer;
source.start(playTime);
source.connect(audioContext.destination);
playTime = playTime + buffer.duration;
});
}
// Converts floating point audio samples into signed 16-bit PCM samples.
//
// Each input sample is first clamped to the [-1, 1] range. Negative samples
// are scaled by 32768 and non-negative samples by 32767 so that both ends of
// the range map onto the limits of a signed 16-bit integer. Returns a new
// Int16Array with the same number of samples as the input.
function convertFloat32ToS16PCM(float32Array) {
    return Int16Array.from(float32Array, (sample) => {
        const bounded = Math.min(1, Math.max(-1, sample));
        const scale = bounded < 0 ? 32768 : 32767;
        // Storing into an Int16Array drops the fractional part.
        return bounded * scale;
    });
}
function startAudioBtnHandler() {
if (!navigator.mediaDevices || !navigator.mediaDevices.getUserMedia) {
alert('getUserMedia is not supported in your browser.');
return;
}
startBtn.disabled = true;
stopBtn.disabled = false;
audioContext = new (window.AudioContext || window.webkitAudioContext)({
latencyHint: "interactive",
sampleRate: SAMPLE_RATE
});
isPlaying = true;
initWebSocket();
navigator.mediaDevices.getUserMedia({
audio: {
sampleRate: SAMPLE_RATE,
channelCount: NUM_CHANNELS,
autoGainControl: true,
echoCancellation: true,
noiseSuppression: true,
}
}).then((stream) => {
microphoneStream = stream;
// 512 samples at 16kHz is 32ms of audio per callback.
scriptProcessor = audioContext.createScriptProcessor(512, 1, 1);
source = audioContext.createMediaStreamSource(stream);
source.connect(scriptProcessor);
scriptProcessor.connect(audioContext.destination);
scriptProcessor.onaudioprocess = (event) => {
if (!ws) {
return;
}
const audioData = event.inputBuffer.getChannelData(0);
const pcmS16Array = convertFloat32ToS16PCM(audioData);
const pcmByteArray = new Uint8Array(pcmS16Array.buffer);
const frame = Frame.create({
audio: {
audio: Array.from(pcmByteArray),
sampleRate: SAMPLE_RATE,
numChannels: NUM_CHANNELS
}
});
const encodedFrame = new Uint8Array(Frame.encode(frame).finish());
ws.send(encodedFrame);
};
}).catch((error) => console.error('Error accessing microphone:', error));
}
// Stops the current audio session and puts the UI back in its idle state.
//
// closeWebsocket: when true, the websocket connection is also closed. The
// websocket 'close' handler passes false because the connection is already
// gone at that point.
//
// This function may run twice for a single session (once from the stop
// button and once more from the resulting websocket 'close' event), so every
// step below is safe to repeat.
function stopAudio(closeWebsocket) {
    isPlaying = false;
    startBtn.disabled = false;
    stopBtn.disabled = true;

    if (ws && closeWebsocket) {
        ws.close();
        ws = null;
    }

    if (scriptProcessor) {
        scriptProcessor.disconnect();
    }
    if (source) {
        source.disconnect();
    }

    // Stop the microphone tracks, otherwise the browser keeps capturing
    // audio after the session has been stopped.
    if (microphoneStream) {
        microphoneStream.getTracks().forEach((track) => track.stop());
        microphoneStream = null;
    }

    // Every session creates a new AudioContext whose clock starts again at
    // zero. Reset the playback bookkeeping so that a later session does not
    // schedule audio using timestamps from the previous context.
    playTime = 0;
    lastMessageTime = 0;
}
function stopAudioBtnHandler() {
stopAudio(true);
}
startBtn.addEventListener('click', startAudioBtnHandler);
stopBtn.addEventListener('click', stopAudioBtnHandler);
startBtn.disabled = true;
stopBtn.disabled = true;
</script>
</body>
</html>

View File

@@ -0,0 +1,2 @@
python-dotenv
pipecat-ai[openai,silero,websocket,whisper]

View File

@@ -0,0 +1,90 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import asyncio
import os
import sys
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.llm_response import LLMAssistantResponseAggregator, LLMUserResponseAggregator
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.whisper import WhisperSTTService
from pipecat.transports.network.websocket_server import WebsocketServerParams, WebsocketServerTransport
from pipecat.vad.silero import SileroVAD
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
transport = WebsocketServerTransport(
params=WebsocketServerParams(
audio_out_enabled=True,
add_wav_header=True
)
)
vad = SileroVAD(audio_passthrough=True)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
stt = WhisperSTTService()
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Websocket input from client
vad, # VAD to detect user speech
stt, # Speech-To-Text
tma_in, # User responses
llm, # LLM
tts, # Text-To-Speech
transport.output(), # Websocket output to client
tma_out # LLM responses
])
task = PipelineTask(pipeline)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -42,7 +42,7 @@ coloredlogs==15.0.1
# via onnxruntime
ctranslate2==4.2.1
# via faster-whisper
daily-python==0.9.0
daily-python==0.9.1
# via pipecat-ai (pyproject.toml)
distro==1.9.0
# via
@@ -226,6 +226,7 @@ protobuf==4.25.3
# googleapis-common-protos
# grpcio-status
# onnxruntime
# pipecat-ai (pyproject.toml)
# proto-plus
# pyht
pyasn1==0.6.0
@@ -259,7 +260,7 @@ pyyaml==6.0.1
# transformers
regex==2024.5.15
# via transformers
requests==2.32.2
requests==2.32.3
# via
# google-api-core
# huggingface-hub

View File

@@ -208,6 +208,7 @@ protobuf==4.25.3
# googleapis-common-protos
# grpcio-status
# onnxruntime
# pipecat-ai (pyproject.toml)
# proto-plus
# pyht
pyasn1==0.6.0

View File

@@ -24,6 +24,7 @@ dependencies = [
"numpy~=1.26.4",
"loguru~=0.7.0",
"Pillow~=10.3.0",
"protobuf~=4.25.3",
"pyloudnorm~=0.1.1",
"typing-extensions~=4.11.0",
]

View File

@@ -4,28 +4,40 @@
// SPDX-License-Identifier: BSD 2-Clause License
//
// Generate frames_pb2.py with:
//
// python -m grpc_tools.protoc --proto_path=./ --python_out=./protobufs frames.proto
syntax = "proto3";
package pipecat_proto;
package pipecat;
message TextFrame {
string text = 1;
uint64 id = 1;
string name = 2;
string text = 3;
}
message AudioFrame {
bytes data = 1;
message AudioRawFrame {
uint64 id = 1;
string name = 2;
bytes audio = 3;
uint32 sample_rate = 4;
uint32 num_channels = 5;
}
message TranscriptionFrame {
string text = 1;
string participantId = 2;
string timestamp = 3;
uint64 id = 1;
string name = 2;
string text = 3;
string user_id = 4;
string timestamp = 5;
}
message Frame {
oneof frame {
TextFrame text = 1;
AudioFrame audio = 2;
TranscriptionFrame transcription = 3;
}
oneof frame {
TextFrame text = 1;
AudioRawFrame audio = 2;
TranscriptionFrame transcription = 3;
}
}

View File

@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: frames.proto
# Protobuf Python Version: 4.25.3
# Protobuf Python Version: 4.25.1
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
@@ -14,19 +14,19 @@ _sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0c\x66rames.proto\x12\rpipecat_proto\"\x19\n\tTextFrame\x12\x0c\n\x04text\x18\x01 \x01(\t\"\x1a\n\nAudioFrame\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\x0c\"L\n\x12TranscriptionFrame\x12\x0c\n\x04text\x18\x01 \x01(\t\x12\x15\n\rparticipantId\x18\x02 \x01(\t\x12\x11\n\ttimestamp\x18\x03 \x01(\t\"\xa2\x01\n\x05\x46rame\x12(\n\x04text\x18\x01 \x01(\x0b\x32\x18.pipecat_proto.TextFrameH\x00\x12*\n\x05\x61udio\x18\x02 \x01(\x0b\x32\x19.pipecat_proto.AudioFrameH\x00\x12:\n\rtranscription\x18\x03 \x01(\x0b\x32!.pipecat_proto.TranscriptionFrameH\x00\x42\x07\n\x05\x66rameb\x06proto3')
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0c\x66rames.proto\x12\x07pipecat\"3\n\tTextFrame\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x0c\n\x04text\x18\x03 \x01(\t\"c\n\rAudioRawFrame\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\r\n\x05\x61udio\x18\x03 \x01(\x0c\x12\x13\n\x0bsample_rate\x18\x04 \x01(\r\x12\x14\n\x0cnum_channels\x18\x05 \x01(\r\"`\n\x12TranscriptionFrame\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x0c\n\x04text\x18\x03 \x01(\t\x12\x0f\n\x07user_id\x18\x04 \x01(\t\x12\x11\n\ttimestamp\x18\x05 \x01(\t\"\x93\x01\n\x05\x46rame\x12\"\n\x04text\x18\x01 \x01(\x0b\x32\x12.pipecat.TextFrameH\x00\x12\'\n\x05\x61udio\x18\x02 \x01(\x0b\x32\x16.pipecat.AudioRawFrameH\x00\x12\x34\n\rtranscription\x18\x03 \x01(\x0b\x32\x1b.pipecat.TranscriptionFrameH\x00\x42\x07\n\x05\x66rameb\x06proto3')
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'frames_pb2', _globals)
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
_globals['_TEXTFRAME']._serialized_start=31
_globals['_TEXTFRAME']._serialized_end=56
_globals['_AUDIOFRAME']._serialized_start=58
_globals['_AUDIOFRAME']._serialized_end=84
_globals['_TRANSCRIPTIONFRAME']._serialized_start=86
_globals['_TRANSCRIPTIONFRAME']._serialized_end=162
_globals['_FRAME']._serialized_start=165
_globals['_FRAME']._serialized_end=327
_globals['_TEXTFRAME']._serialized_start=25
_globals['_TEXTFRAME']._serialized_end=76
_globals['_AUDIORAWFRAME']._serialized_start=78
_globals['_AUDIORAWFRAME']._serialized_end=177
_globals['_TRANSCRIPTIONFRAME']._serialized_start=179
_globals['_TRANSCRIPTIONFRAME']._serialized_end=275
_globals['_FRAME']._serialized_start=278
_globals['_FRAME']._serialized_end=425
# @@protoc_insertion_point(module_scope)

View File

@@ -4,8 +4,6 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
from typing import Callable, Coroutine, List
from pipecat.frames.frames import Frame
@@ -67,7 +65,8 @@ class Pipeline(FrameProcessor):
await self._sink.process_frame(frame, FrameDirection.UPSTREAM)
async def _cleanup_processors(self):
await asyncio.gather(*[p.cleanup() for p in self._processors])
for p in self._processors:
await p.cleanup()
def _link_processors(self):
prev = self._processors[0]

View File

@@ -5,7 +5,7 @@
#
import asyncio
from asyncio import AbstractEventLoop
from enum import Enum
from pipecat.frames.frames import ErrorFrame, Frame
@@ -21,12 +21,12 @@ class FrameDirection(Enum):
class FrameProcessor:
def __init__(self):
def __init__(self, loop: asyncio.AbstractEventLoop | None = None):
self.id: int = obj_id()
self.name = f"{self.__class__.__name__}#{obj_count(self)}"
self._prev: "FrameProcessor" | None = None
self._next: "FrameProcessor" | None = None
self._loop: AbstractEventLoop = asyncio.get_running_loop()
self._loop: asyncio.AbstractEventLoop = loop or asyncio.get_running_loop()
async def cleanup(self):
pass
@@ -36,7 +36,7 @@ class FrameProcessor:
processor._prev = self
logger.debug(f"Linking {self} -> {self._next}")
def get_event_loop(self) -> AbstractEventLoop:
def get_event_loop(self) -> asyncio.AbstractEventLoop:
return self._loop
async def process_frame(self, frame: Frame, direction: FrameDirection):

View File

@@ -1,16 +0,0 @@
from abc import abstractmethod
from pipecat.pipeline.frames import Frame
class FrameSerializer:
def __init__(self):
pass
@abstractmethod
def serialize(self, frame: Frame) -> bytes:
raise NotImplementedError
@abstractmethod
def deserialize(self, data: bytes) -> Frame:
raise NotImplementedError

View File

@@ -0,0 +1,20 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from abc import ABC, abstractmethod
from pipecat.frames.frames import Frame
class FrameSerializer(ABC):
    """Abstract interface for converting frames to and from bytes.

    Subclasses must implement both `serialize` and `deserialize`.
    """

    @abstractmethod
    def serialize(self, frame: Frame) -> bytes:
        """Return the serialized bytes for `frame`."""
        ...

    @abstractmethod
    def deserialize(self, data: bytes) -> Frame:
        """Return the frame encoded in `data`."""
        ...

View File

@@ -1,14 +1,21 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import dataclasses
from typing import Text
from pipecat.pipeline.frames import AudioFrame, Frame, TextFrame, TranscriptionFrame
import pipecat.pipeline.protobufs.frames_pb2 as frame_protos
from pipecat.serializers.abstract_frame_serializer import FrameSerializer
import pipecat.frames.protobufs.frames_pb2 as frame_protos
from pipecat.frames.frames import AudioRawFrame, Frame, TextFrame, TranscriptionFrame
from pipecat.serializers.base_serializer import FrameSerializer
class ProtobufFrameSerializer(FrameSerializer):
SERIALIZABLE_TYPES = {
TextFrame: "text",
AudioFrame: "audio",
AudioRawFrame: "audio",
TranscriptionFrame: "transcription"
}
@@ -29,7 +36,8 @@ class ProtobufFrameSerializer(FrameSerializer):
setattr(getattr(proto_frame, proto_optional_name), field.name,
getattr(frame, field.name))
return proto_frame.SerializeToString()
result = proto_frame.SerializeToString()
return result
def deserialize(self, data: bytes) -> Frame:
"""Returns a Frame object from a Frame protobuf. Used to convert frames
@@ -61,4 +69,22 @@ class ProtobufFrameSerializer(FrameSerializer):
args_dict = {}
for field in proto.DESCRIPTOR.fields_by_name[which].message_type.fields:
args_dict[field.name] = getattr(args, field.name)
return class_name(**args_dict)
# Remove special fields if needed
id = getattr(args, "id")
name = getattr(args, "name")
if not id:
del args_dict["id"]
if not name:
del args_dict["name"]
# Create the instance
instance = class_name(**args_dict)
# Set special fields
if id:
setattr(instance, "id", getattr(args, "id"))
if name:
setattr(instance, "name", getattr(args, "name"))
return instance

View File

@@ -171,7 +171,7 @@ class ImageGenService(AIService):
super().__init__()
# Renders the image. Returns an Image object.
@ abstractmethod
@abstractmethod
async def run_image_gen(self, prompt: str) -> AsyncGenerator[Frame, None]:
pass
@@ -190,7 +190,7 @@ class VisionService(AIService):
super().__init__()
self._describe_text = None
@ abstractmethod
@abstractmethod
async def run_vision(self, frame: VisionImageRawFrame) -> AsyncGenerator[Frame, None]:
pass

View File

@@ -249,7 +249,7 @@ class BaseOpenAILLMService(LLMService):
class OpenAILLMService(BaseOpenAILLMService):
def __init__(self, model="gpt-4", **kwargs):
def __init__(self, model="gpt-4o", **kwargs):
super().__init__(model, **kwargs)

View File

@@ -5,9 +5,6 @@
#
import asyncio
import queue
from concurrent.futures import ThreadPoolExecutor
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.frames.frames import (
@@ -33,14 +30,11 @@ class BaseInputTransport(FrameProcessor):
self._params = params
self._running = False
self._allow_interruptions = False
self._in_executor = ThreadPoolExecutor(max_workers=5)
# Create audio input queue if needed.
if self._params.audio_in_enabled or self._params.vad_enabled:
self._audio_in_queue = queue.Queue()
self._audio_in_queue = asyncio.Queue()
# Create push frame task. This is the task that will push frames in
# order. We also guarantee that all frames are pushed in the same task.
@@ -52,45 +46,27 @@ class BaseInputTransport(FrameProcessor):
# for example.
self._allow_interruptions = frame.allow_interruptions
if self._running:
return
self._running = True
if self._params.audio_in_enabled or self._params.vad_enabled:
loop = self.get_event_loop()
self._audio_in_thread = loop.run_in_executor(
self._in_executor, self._audio_in_thread_handler)
self._audio_out_thread = loop.run_in_executor(
self._in_executor, self._audio_out_thread_handler)
self._audio_task = loop.create_task(self._audio_task_handler())
async def stop(self):
if not self._running:
return
# This will exit all threads.
self._running = False
# Wait for the threads to finish.
# Wait for the tasks to finish.
if self._params.audio_in_enabled or self._params.vad_enabled:
await self._audio_in_thread
await self._audio_out_thread
self._audio_task.cancel()
self._push_frame_task.cancel()
def vad_analyze(self, audio_frames: bytes) -> VADState:
async def vad_analyze(self, audio_frames: bytes) -> VADState:
pass
def read_raw_audio_frames(self, frame_count: int) -> bytes:
async def read_raw_audio_frames(self, frame_count: int) -> bytes:
pass
#
# Frame processor
#
async def cleanup(self):
pass
async def process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, CancelFrame):
await self.stop()
@@ -150,8 +126,8 @@ class BaseInputTransport(FrameProcessor):
# Audio input
#
def _handle_vad(self, audio_frames: bytes, vad_state: VADState):
new_vad_state = self.vad_analyze(audio_frames)
async def _handle_vad(self, audio_frames: bytes, vad_state: VADState):
new_vad_state = await self.vad_analyze(audio_frames)
if new_vad_state != vad_state and new_vad_state != VADState.STARTING and new_vad_state != VADState.STOPPING:
frame = None
if new_vad_state == VADState.SPEAKING:
@@ -160,51 +136,39 @@ class BaseInputTransport(FrameProcessor):
frame = UserStoppedSpeakingFrame()
if frame:
future = asyncio.run_coroutine_threadsafe(
self._handle_interruptions(frame), self.get_event_loop())
future.result()
await self._handle_interruptions(frame)
vad_state = new_vad_state
return vad_state
def _audio_in_thread_handler(self):
async def _audio_task_handler(self):
vad_state: VADState = VADState.QUIET
sample_rate = self._params.audio_in_sample_rate
num_channels = self._params.audio_in_channels
num_frames = int(sample_rate / 100) # 10ms of audio
while self._running:
while True:
try:
audio_frames = self.read_raw_audio_frames(num_frames)
audio_frames = await self.read_raw_audio_frames(num_frames)
if len(audio_frames) > 0:
frame = AudioRawFrame(
audio=audio_frames,
sample_rate=sample_rate,
num_channels=num_channels)
self._audio_in_queue.put(frame)
audio_passthrough = True
# Check VAD and push event if necessary. We just care about
# changes from QUIET to SPEAKING and vice versa.
if self._params.vad_enabled:
vad_state = await self._handle_vad(frame.audio, vad_state)
audio_passthrough = self._params.vad_audio_passthrough
# Push audio downstream if passthrough.
if audio_passthrough:
await self._internal_push_frame(frame)
except asyncio.CancelledError:
break
except BaseException as e:
logger.error(f"Error reading audio frames: {e}")
def _audio_out_thread_handler(self):
vad_state: VADState = VADState.QUIET
while self._running:
try:
frame = self._audio_in_queue.get(timeout=1)
audio_passthrough = True
# Check VAD and push event if necessary. We just care about changes
# from QUIET to SPEAKING and vice versa.
if self._params.vad_enabled:
vad_state = self._handle_vad(frame.audio, vad_state)
audio_passthrough = self._params.vad_audio_passthrough
# Push audio downstream if passthrough.
if audio_passthrough:
future = asyncio.run_coroutine_threadsafe(
self._internal_push_frame(frame), self.get_event_loop())
future.result()
self._audio_in_queue.task_done()
except queue.Empty:
pass
except BaseException as e:
logger.error(f"Error pushing audio frames: {e}")

View File

@@ -1,17 +1,11 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import itertools
import queue
import time
import threading
from concurrent.futures import ThreadPoolExecutor
from PIL import Image
from typing import List
@@ -40,22 +34,19 @@ class BaseOutputTransport(FrameProcessor):
self._params = params
self._running = False
self._allow_interruptions = False
self._out_executor = ThreadPoolExecutor(max_workers=5)
# These are the images that we should send to the camera at our desired
# framerate.
self._camera_images = None
# Create media threads queues.
# Create media task queues.
if self._params.camera_out_enabled:
self._camera_out_queue = queue.Queue()
self._sink_queue = queue.Queue()
self._camera_out_queue = asyncio.Queue()
self._sink_queue = asyncio.Queue()
self._stopped_event = asyncio.Event()
self._is_interrupted = threading.Event()
self._is_interrupted = asyncio.Event()
async def start(self, frame: StartFrame):
# Make sure we have the latest params. Note that this transport might
@@ -63,52 +54,40 @@ class BaseOutputTransport(FrameProcessor):
# for example.
self._allow_interruptions = frame.allow_interruptions
if self._running:
return
self._running = True
loop = self.get_event_loop()
if self._params.camera_out_enabled:
self._camera_out_thread = loop.run_in_executor(
self._out_executor, self._camera_out_thread_handler)
self._camera_out_task = loop.create_task(self._camera_out_task_handler())
self._sink_thread = loop.run_in_executor(self._out_executor, self._sink_thread_handler)
self._sink_task = loop.create_task(self._sink_task_handler())
# Create push frame task. This is the task that will push frames in
# order. We also guarantee that all frames are pushed in the same task.
self._create_push_task()
async def stop(self):
if not self._running:
return
# This will exit all threads.
self._running = False
self._stopped_event.set()
def send_message(self, frame: TransportMessageFrame):
async def cleanup(self):
if self._params.camera_out_enabled:
self._camera_out_task.cancel()
self._sink_task.cancel()
self._push_frame_task.cancel()
async def send_message(self, frame: TransportMessageFrame):
pass
def write_frame_to_camera(self, frame: ImageRawFrame):
async def write_frame_to_camera(self, frame: ImageRawFrame):
pass
def write_raw_audio_frames(self, frames: bytes):
async def write_raw_audio_frames(self, frames: bytes):
pass
#
# Frame processor
#
async def cleanup(self):
# Wait on the threads to finish.
if self._params.camera_out_enabled:
await self._camera_out_thread
await self._sink_thread
async def process_frame(self, frame: Frame, direction: FrameDirection):
#
# Out-of-band frames like (CancelFrame or StartInterruptionFrame) are
@@ -117,7 +96,7 @@ class BaseOutputTransport(FrameProcessor):
#
if isinstance(frame, StartFrame):
await self.start(frame)
self._sink_queue.put(frame)
await self._sink_queue.put(frame)
# EndFrame is managed in the queue handler.
elif isinstance(frame, CancelFrame):
await self.stop()
@@ -126,11 +105,11 @@ class BaseOutputTransport(FrameProcessor):
await self._handle_interruptions(frame)
await self.push_frame(frame, direction)
else:
self._sink_queue.put(frame)
await self._sink_queue.put(frame)
# If we are finishing, wait here until we have stopped, otherwise we might
# close things too early upstream. We need this event because we don't
# know when the internal threads will finish.
# know when the internal tasks will finish.
if isinstance(frame, CancelFrame) or isinstance(frame, EndFrame):
await self._stopped_event.wait()
@@ -145,7 +124,7 @@ class BaseOutputTransport(FrameProcessor):
elif isinstance(frame, StopInterruptionFrame):
self._is_interrupted.clear()
def _sink_thread_handler(self):
async def _sink_task_handler(self):
# 10ms bytes
bytes_size_10ms = int(self._params.audio_out_sample_rate / 100) * \
self._params.audio_out_channels * 2
@@ -155,37 +134,34 @@ class BaseOutputTransport(FrameProcessor):
# Audio accumulation buffer
buffer = bytearray()
while self._running:
while True:
try:
frame = self._sink_queue.get(timeout=1)
frame = await self._sink_queue.get()
if not self._is_interrupted.is_set():
if isinstance(frame, AudioRawFrame):
if self._params.audio_out_enabled:
buffer.extend(frame.audio)
buffer = self._send_audio_truncated(buffer, smallest_write_size)
buffer = await self._send_audio_truncated(buffer, smallest_write_size)
elif isinstance(frame, ImageRawFrame) and self._params.camera_out_enabled:
self._set_camera_image(frame)
await self._set_camera_image(frame)
elif isinstance(frame, SpriteFrame) and self._params.camera_out_enabled:
self._set_camera_images(frame.images)
await self._set_camera_images(frame.images)
elif isinstance(frame, TransportMessageFrame):
self.send_message(frame)
await self.send_message(frame)
else:
future = asyncio.run_coroutine_threadsafe(
self._internal_push_frame(frame), self.get_event_loop())
future.result()
await self._internal_push_frame(frame)
else:
# If we get interrupted just clear the output buffer.
buffer = bytearray()
if isinstance(frame, EndFrame):
# Send all remaining audio before stopping (multiple of 10ms of audio).
self._send_audio_truncated(buffer, bytes_size_10ms)
future = asyncio.run_coroutine_threadsafe(self.stop(), self.get_event_loop())
future.result()
await self._send_audio_truncated(buffer, bytes_size_10ms)
await self.stop()
self._sink_queue.task_done()
except queue.Empty:
pass
except asyncio.CancelledError:
break
except BaseException as e:
logger.error(f"Error processing sink queue: {e}")
@@ -219,7 +195,7 @@ class BaseOutputTransport(FrameProcessor):
async def send_image(self, frame: ImageRawFrame | SpriteFrame):
await self.process_frame(frame, FrameDirection.DOWNSTREAM)
def _draw_image(self, frame: ImageRawFrame):
async def _draw_image(self, frame: ImageRawFrame):
desired_size = (self._params.camera_out_width, self._params.camera_out_height)
if frame.size != desired_size:
@@ -229,32 +205,32 @@ class BaseOutputTransport(FrameProcessor):
f"{frame} does not have the expected size {desired_size}, resizing")
frame = ImageRawFrame(resized_image.tobytes(), resized_image.size, resized_image.format)
self.write_frame_to_camera(frame)
await self.write_frame_to_camera(frame)
def _set_camera_image(self, image: ImageRawFrame):
async def _set_camera_image(self, image: ImageRawFrame):
if self._params.camera_out_is_live:
self._camera_out_queue.put(image)
await self._camera_out_queue.put(image)
else:
self._camera_images = itertools.cycle([image])
def _set_camera_images(self, images: List[ImageRawFrame]):
async def _set_camera_images(self, images: List[ImageRawFrame]):
self._camera_images = itertools.cycle(images)
def _camera_out_thread_handler(self):
while self._running:
async def _camera_out_task_handler(self):
while True:
try:
if self._params.camera_out_is_live:
image = self._camera_out_queue.get(timeout=1)
self._draw_image(image)
image = await self._camera_out_queue.get()
await self._draw_image(image)
self._camera_out_queue.task_done()
elif self._camera_images:
image = next(self._camera_images)
self._draw_image(image)
time.sleep(1.0 / self._params.camera_out_framerate)
await self._draw_image(image)
await asyncio.sleep(1.0 / self._params.camera_out_framerate)
else:
time.sleep(1.0 / self._params.camera_out_framerate)
except queue.Empty:
pass
await asyncio.sleep(1.0 / self._params.camera_out_framerate)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error writing to camera: {e}")
@@ -265,13 +241,9 @@ class BaseOutputTransport(FrameProcessor):
async def send_audio(self, frame: AudioRawFrame):
await self.process_frame(frame, FrameDirection.DOWNSTREAM)
def _send_audio_truncated(self, buffer: bytearray, smallest_write_size: int) -> bytearray:
try:
truncated_length: int = len(buffer) - (len(buffer) % smallest_write_size)
if truncated_length:
self.write_raw_audio_frames(bytes(buffer[:truncated_length]))
buffer = buffer[truncated_length:]
return buffer
except BaseException as e:
logger.error(f"Error writing audio frames: {e}")
return buffer
async def _send_audio_truncated(self, buffer: bytearray, smallest_write_size: int) -> bytearray:
truncated_length: int = len(buffer) - (len(buffer) % smallest_write_size)
if truncated_length:
await self.write_raw_audio_frames(bytes(buffer[:truncated_length]))
buffer = buffer[truncated_length:]
return buffer

View File

@@ -4,6 +4,9 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import inspect
from abc import ABC, abstractmethod
from pydantic import ConfigDict
@@ -12,6 +15,8 @@ from pydantic.main import BaseModel
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.vad.vad_analyzer import VADAnalyzer
from loguru import logger
class TransportParams(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)
@@ -36,6 +41,10 @@ class TransportParams(BaseModel):
class BaseTransport(ABC):
def __init__(self, loop: asyncio.AbstractEventLoop | None):
self._loop = loop or asyncio.get_running_loop()
self._event_handlers: dict = {}
@abstractmethod
def input(self) -> FrameProcessor:
    """Return the frame processor for this transport's input side.

    Abstract: concrete transports must override this method.
    """
    raise NotImplementedError
@@ -43,3 +52,30 @@ class BaseTransport(ABC):
@abstractmethod
def output(self) -> FrameProcessor:
    """Return the frame processor for this transport's output side.

    Abstract: concrete transports must override this method.
    """
    raise NotImplementedError
def event_handler(self, event_name: str):
    """Build a decorator that adds the decorated callable as an event handler.

    Args:
        event_name: Name of the event the handler is added for. The name is
            checked by ``_add_event_handler`` when the decorator is applied.

    Returns:
        A decorator that registers its argument and returns it unchanged.
    """
    def register(handler):
        self._add_event_handler(event_name, handler)
        # Hand the callable back untouched so the decorated name still
        # refers to the user's original function.
        return handler

    return register
def _register_event_handler(self, event_name: str):
    """Declare ``event_name`` as a supported event with no handlers yet.

    Args:
        event_name: Name of the event to declare.

    Raises:
        Exception: If the event name has already been registered.
    """
    registry = self._event_handlers
    if event_name in registry:
        raise Exception(f"Event handler {event_name} already registered")
    registry[event_name] = []
def _add_event_handler(self, event_name: str, handler):
    """Append ``handler`` to the handlers of a previously registered event.

    Args:
        event_name: Name of an event declared with ``_register_event_handler``.
        handler: Callable to add for that event.

    Raises:
        Exception: If the event name has not been registered.
    """
    registry = self._event_handlers
    known = event_name in registry
    if not known:
        raise Exception(f"Event handler {event_name} not registered")
    registry[event_name].append(handler)
async def _call_event_handler(self, event_name: str, *args, **kwargs):
    """Invoke every handler of ``event_name`` in the order they were added.

    Each handler is called with this object as its first argument, followed
    by ``args`` and ``kwargs``. Handlers defined with ``async def`` are
    awaited; all others are called synchronously.

    Raises:
        Exception: Any exception raised while looking up or running the
            handlers is logged and re-raised; later handlers do not run.
    """
    try:
        for callback in self._event_handlers[event_name]:
            outcome = callback(self, *args, **kwargs)
            if inspect.iscoroutinefunction(callback):
                await outcome
    except Exception as e:
        logger.error(f"Exception in event handler {event_name}: {e}")
        raise e

View File

@@ -6,6 +6,8 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor
import numpy as np
import tkinter as tk
@@ -38,6 +40,8 @@ class TkInputTransport(BaseInputTransport):
def __init__(self, py_audio: pyaudio.PyAudio, params: TransportParams):
super().__init__(params)
self._executor = ThreadPoolExecutor(max_workers=5)
self._in_stream = py_audio.open(
format=py_audio.get_format_from_width(2),
channels=params.audio_in_channels,
@@ -45,8 +49,11 @@ class TkInputTransport(BaseInputTransport):
frames_per_buffer=params.audio_in_sample_rate,
input=True)
def read_raw_audio_frames(self, frame_count: int) -> bytes:
return self._in_stream.read(frame_count, exception_on_overflow=False)
async def read_raw_audio_frames(self, frame_count: int) -> bytes:
def read_audio(frame_count: int) -> bytes:
return self._in_stream.read(frame_count, exception_on_overflow=False)
return await self.get_event_loop().run_in_executor(self._executor, read_audio, frame_count)
async def start(self, frame: StartFrame):
await super().start(frame)
@@ -70,6 +77,8 @@ class TkOutputTransport(BaseOutputTransport):
def __init__(self, tk_root: tk.Tk, py_audio: pyaudio.PyAudio, params: TransportParams):
super().__init__(params)
self._executor = ThreadPoolExecutor(max_workers=5)
self._out_stream = py_audio.open(
format=py_audio.get_format_from_width(2),
channels=params.audio_out_channels,
@@ -83,10 +92,13 @@ class TkOutputTransport(BaseOutputTransport):
self._image_label = tk.Label(tk_root, image=photo)
self._image_label.pack()
def write_raw_audio_frames(self, frames: bytes):
self._out_stream.write(frames)
async def write_raw_audio_frames(self, frames: bytes):
def write_audio(frames: bytes):
self._out_stream.write(frames)
def write_frame_to_camera(self, frame: ImageRawFrame):
await self.get_event_loop().run_in_executor(self._executor, write_audio, frames)
async def write_frame_to_camera(self, frame: ImageRawFrame):
self.get_event_loop().call_soon(self._write_frame_to_tk, frame)
async def start(self, frame: StartFrame):

View File

@@ -0,0 +1,221 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import io
import wave
import websockets
from typing import Awaitable, Callable
from pydantic.main import BaseModel
from pipecat.frames.frames import (
AudioRawFrame,
CancelFrame,
EndFrame,
Frame,
StartFrame)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.serializers.base_serializer import FrameSerializer
from pipecat.serializers.protobuf import ProtobufFrameSerializer
from pipecat.transports.base_output import BaseOutputTransport
from pipecat.transports.base_transport import BaseTransport, TransportParams
from loguru import logger
class WebsocketServerParams(TransportParams):
    """Transport parameters specific to the websocket server transport."""

    # When true, every outgoing audio chunk is wrapped in a WAV container
    # before it is serialized.
    add_wav_header: bool = False

    # Number of buffered audio bytes sent per websocket message.
    # 200ms (presumably for 16 kHz, 16-bit mono audio; confirm against the
    # configured output sample rate and channel count).
    audio_frame_size: int = 6400

    # Serializer used to encode outgoing frames and decode incoming messages.
    serializer: FrameSerializer = ProtobufFrameSerializer()
class WebsocketServerCallbacks(BaseModel):
    """Callbacks the websocket input transport invokes on connection events."""

    # Awaited with the client's websocket each time a client connects.
    on_connection: Callable[
        [websockets.WebSocketServerProtocol],
        Awaitable[None],
    ]
class WebsocketServerInputTransport(FrameProcessor):
    """Frame processor that runs a websocket server and pushes received frames.

    The server is started on ``StartFrame`` and shut down on ``EndFrame`` or
    ``CancelFrame``. Messages received from the connected client are
    deserialized with ``params.serializer`` and queued so that a single task
    pushes all frames in order.
    """

    def __init__(
            self,
            host: str,
            port: int,
            params: WebsocketServerParams,
            callbacks: WebsocketServerCallbacks):
        """Store the configuration and create the frame-pushing task.

        Args:
            host: Interface the websocket server listens on.
            port: Port the websocket server listens on.
            params: Transport parameters; ``params.serializer`` decodes
                incoming messages.
            callbacks: Callbacks; ``on_connection`` is awaited for every new
                client.
        """
        super().__init__()

        self._host = host
        self._port = port
        self._params = params
        self._callbacks = callbacks

        self._websocket: websockets.WebSocketServerProtocol | None = None

        # None until the server is started, so that a stop request arriving
        # before any StartFrame is a harmless no-op instead of an
        # AttributeError.
        self._server_task: asyncio.Task | None = None

        self._stop_server_event = asyncio.Event()

        # Create push frame task. This is the task that will push frames in
        # order. We also guarantee that all frames are pushed in the same task.
        self._create_push_task()

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Handle lifecycle frames and queue everything else for pushing."""
        if isinstance(frame, CancelFrame):
            await self._stop()
            # We don't queue a CancelFrame since we want to stop ASAP.
            await self.push_frame(frame, direction)
        elif isinstance(frame, StartFrame):
            await self._start()
            await self._internal_push_frame(frame, direction)
        elif isinstance(frame, EndFrame):
            # Only the server is stopped here. The push task must stay alive
            # so it can deliver the queued EndFrame downstream; it exits on
            # its own right after pushing it.
            await self._stop_server()
            await self._internal_push_frame(frame, direction)
        else:
            await self._internal_push_frame(frame, direction)

    async def _server_task_handler(self):
        """Serve websocket clients until the stop event is set."""
        logger.info(f"Starting websocket server on {self._host}:{self._port}")
        async with websockets.serve(self._client_handler, self._host, self._port):
            await self._stop_server_event.wait()

    async def _client_handler(self, websocket: websockets.WebSocketServerProtocol, path):
        """Handle one client connection: notify, then push received frames."""
        logger.info(f"New client connection from {websocket.remote_address}")

        if self._websocket:
            await self._websocket.close()
            logger.warning("Only one client connected, using new connection")

        self._websocket = websocket

        # Notify
        await self._callbacks.on_connection(websocket)

        # Handle incoming messages
        try:
            async for message in websocket:
                frame = self._params.serializer.deserialize(message)
                # NOTE(review): assumes the serializer returns None for
                # messages it cannot decode -- confirm. Such results are
                # skipped rather than pushed downstream as frames.
                if frame is None:
                    continue
                await self._internal_push_frame(frame)
        finally:
            # Forget this connection once it is gone, unless a newer client
            # has already replaced it.
            if self._websocket is websocket:
                self._websocket = None

        logger.info(f"Client {websocket.remote_address} disconnected")

    async def _start(self):
        """Start the websocket server task (no-op if already started)."""
        if self._server_task is not None:
            return
        loop = self.get_event_loop()
        self._server_task = loop.create_task(self._server_task_handler())

    async def _stop_server(self):
        """Ask the server to shut down and wait until it has done so."""
        self._stop_server_event.set()
        if self._server_task is not None:
            await self._server_task

    async def _stop(self):
        """Stop immediately: cancel the push task and shut the server down."""
        self._stop_server_event.set()
        self._push_frame_task.cancel()
        await self._stop_server()

    #
    # Push frames task
    #

    def _create_push_task(self):
        """Create the queue and the task that pushes queued frames in order."""
        loop = self.get_event_loop()
        self._push_queue = asyncio.Queue()
        self._push_frame_task = loop.create_task(self._push_frame_task_handler())

    async def _internal_push_frame(
            self,
            frame: Frame | None,
            direction: FrameDirection | None = FrameDirection.DOWNSTREAM):
        """Queue a frame so the push task delivers it in order."""
        await self._push_queue.put((frame, direction))

    async def _push_frame_task_handler(self):
        """Push queued frames until an EndFrame is pushed or the task is cancelled."""
        running = True
        while running:
            try:
                (frame, direction) = await self._push_queue.get()
                await self.push_frame(frame, direction)
                running = not isinstance(frame, EndFrame)
            except asyncio.CancelledError:
                break
class WebsocketServerOutputTransport(BaseOutputTransport):
    """Output transport that sends audio to the connected websocket client.

    Audio is accumulated and sent in chunks of ``params.audio_frame_size``
    bytes, each serialized with ``params.serializer``.
    """

    def __init__(self, params: WebsocketServerParams):
        """Store the parameters and start with no client and an empty buffer."""
        super().__init__(params)

        self._params = params

        self._websocket: websockets.WebSocketServerProtocol | None = None

        self._audio_buffer = bytes()

    async def set_client_connection(self, websocket: websockets.WebSocketServerProtocol):
        """Use ``websocket`` as the client, closing any previous connection."""
        if self._websocket:
            await self._websocket.close()
            logger.warning("Only one client allowed, using new connection")
        self._websocket = websocket

    @staticmethod
    def _to_wav(audio: bytes, sample_rate: int, num_channels: int) -> bytes:
        """Return ``audio`` (16-bit samples) wrapped in a WAV container."""
        content = io.BytesIO()
        # The context manager finalizes the WAV header even if a write fails.
        with wave.open(content, "wb") as ww:
            ww.setsampwidth(2)
            ww.setnchannels(num_channels)
            ww.setframerate(sample_rate)
            ww.writeframes(audio)
        return content.getvalue()

    async def write_raw_audio_frames(self, frames: bytes):
        """Buffer ``frames`` and send every complete chunk to the client.

        Audio written while no client is connected is dropped. Any exception
        raised by the websocket ``send`` call propagates to the caller.

        Args:
            frames: Raw audio bytes to send.
        """
        if not self._websocket:
            # Nobody to send to: drop the audio instead of letting the buffer
            # grow without bound and replaying stale audio to a later client.
            self._audio_buffer = bytes()
            return

        self._audio_buffer += frames
        frame_size = self._params.audio_frame_size
        while len(self._audio_buffer) >= frame_size:
            # Detach the chunk before sending so a failed send cannot leave
            # the same chunk at the head of the buffer to be retried forever.
            chunk = self._audio_buffer[:frame_size]
            self._audio_buffer = self._audio_buffer[frame_size:]

            frame = AudioRawFrame(
                audio=chunk,
                sample_rate=self._params.audio_out_sample_rate,
                num_channels=self._params.audio_out_channels
            )

            if self._params.add_wav_header:
                frame = AudioRawFrame(
                    self._to_wav(frame.audio, frame.sample_rate, frame.num_channels),
                    sample_rate=frame.sample_rate,
                    num_channels=frame.num_channels)

            proto = self._params.serializer.serialize(frame)
            await self._websocket.send(proto)
class WebsocketServerTransport(BaseTransport):
    """Transport exposing websocket server input and output processors.

    Supported event handlers: ``on_client_connected``.
    """

    def __init__(
            self,
            host: str = "localhost",
            port: int = 8765,
            params: WebsocketServerParams | None = None,
            loop: asyncio.AbstractEventLoop | None = None):
        """Create the transport; input/output processors are built lazily.

        Args:
            host: Interface the websocket server listens on.
            port: Port the websocket server listens on.
            params: Transport parameters. When omitted, a fresh
                ``WebsocketServerParams()`` is created for this transport.
            loop: Event loop passed on to ``BaseTransport``.
        """
        super().__init__(loop)
        self._host = host
        self._port = port
        # Build the default per instance: a default evaluated in the
        # signature would be one mutable object shared by every transport
        # created without explicit params.
        self._params = params if params is not None else WebsocketServerParams()

        self._callbacks = WebsocketServerCallbacks(
            on_connection=self._on_connection
        )
        self._input: WebsocketServerInputTransport | None = None
        self._output: WebsocketServerOutputTransport | None = None
        self._websocket: websockets.WebSocketServerProtocol | None = None

        # Register supported handlers. The user will only be able to register
        # these handlers.
        self._register_event_handler("on_client_connected")

    def input(self) -> FrameProcessor:
        """Return the input processor, creating it on first use."""
        if not self._input:
            self._input = WebsocketServerInputTransport(
                self._host, self._port, self._params, self._callbacks)
        return self._input

    def output(self) -> FrameProcessor:
        """Return the output processor, creating it on first use."""
        if not self._output:
            self._output = WebsocketServerOutputTransport(self._params)
        return self._output

    async def _on_connection(self, websocket):
        """Hand a new client to the output transport and notify handlers.

        If no output processor has been created yet, an error is logged and
        the ``on_client_connected`` handlers are not called.
        """
        if self._output:
            await self._output.set_client_connection(websocket)
            await self._call_event_handler("on_client_connected", websocket)
        else:
            logger.error("A WebsocketServerTransport output is missing in the pipeline")

View File

@@ -6,15 +6,12 @@
import aiohttp
import asyncio
from concurrent.futures import ThreadPoolExecutor
import inspect
import queue
import time
import types
from dataclasses import dataclass
from functools import partial
from typing import Any, Callable, Mapping
from concurrent.futures import ThreadPoolExecutor
from daily import (
CallClient,
@@ -139,7 +136,8 @@ class DailyTransportClient(EventHandler):
token: str | None,
bot_name: str,
params: DailyParams,
callbacks: DailyCallbacks):
callbacks: DailyCallbacks,
loop: asyncio.AbstractEventLoop):
super().__init__()
if not self._daily_initialized:
@@ -151,6 +149,7 @@ class DailyTransportClient(EventHandler):
self._bot_name: str = bot_name
self._params: DailyParams = params
self._callbacks = callbacks
self._loop = loop
self._participant_id: str = ""
self._video_renderers = {}
@@ -183,9 +182,6 @@ class DailyTransportClient(EventHandler):
def participant_id(self) -> str:
return self._participant_id
def set_callbacks(self, callbacks: DailyCallbacks):
self._callbacks = callbacks
def send_message(self, frame: DailyTransportMessageFrame):
self._client.send_app_message(frame.message, frame.participant_id)
@@ -212,8 +208,7 @@ class DailyTransportClient(EventHandler):
self._joining = True
loop = asyncio.get_running_loop()
await loop.run_in_executor(self._executor, self._join)
await self._loop.run_in_executor(self._executor, self._join)
def _join(self):
logger.info(f"Joining {self._room_url}")
@@ -304,8 +299,7 @@ class DailyTransportClient(EventHandler):
self._joined = False
self._leaving = True
loop = asyncio.get_running_loop()
await loop.run_in_executor(self._executor, self._leave)
await self._loop.run_in_executor(self._executor, self._leave)
def _leave(self):
logger.info(f"Leaving {self._room_url}")
@@ -335,8 +329,7 @@ class DailyTransportClient(EventHandler):
self._callbacks.on_error(error_msg)
async def cleanup(self):
loop = asyncio.get_running_loop()
await loop.run_in_executor(self._executor, self._cleanup)
await self._loop.run_in_executor(self._executor, self._cleanup)
def _cleanup(self):
if self._client:
@@ -468,8 +461,10 @@ class DailyInputTransport(BaseInputTransport):
self._client = client
self._executor = ThreadPoolExecutor(max_workers=5)
self._video_renderers = {}
self._camera_in_queue = queue.Queue()
self._camera_in_queue = asyncio.Queue()
self._vad_analyzer = params.vad_analyzer
if params.vad_enabled and not params.vad_analyzer:
@@ -478,39 +473,34 @@ class DailyInputTransport(BaseInputTransport):
num_channels=self._params.audio_in_channels)
async def start(self, frame: StartFrame):
if self._running:
return
# Join the room.
await self._client.join()
# This will set _running=True
# Create camera in task
self._camera_in_task = self.get_event_loop().create_task(self._camera_in_task_handler())
await super().start(frame)
# Create camera in thread (runs if _running is true).
loop = asyncio.get_running_loop()
self._camera_in_thread = loop.run_in_executor(
self._in_executor, self._camera_in_thread_handler)
async def stop(self):
if not self._running:
return
# Leave the room.
await self._client.leave()
# This will set _running=False
# The task will stop.
self._camera_in_task.cancel()
await super().stop()
# The thread will stop.
await self._camera_in_thread
async def cleanup(self):
await super().cleanup()
await self._client.cleanup()
await super().cleanup()
def vad_analyze(self, audio_frames: bytes) -> VADState:
async def vad_analyze(self, audio_frames: bytes) -> VADState:
state = VADState.QUIET
if self._vad_analyzer:
state = self._vad_analyzer.analyze_audio(audio_frames)
state = await self._vad_analyzer.analyze_audio(audio_frames)
return state
def read_raw_audio_frames(self, frame_count: int) -> bytes:
return self._client.read_raw_audio_frames(frame_count)
async def read_raw_audio_frames(self, frame_count: int) -> bytes:
def read_audio(frame_count: int) -> bytes:
return self._client.read_raw_audio_frames(frame_count)
return await self.get_event_loop().run_in_executor(self._executor, read_audio, frame_count)
#
# FrameProcessor
@@ -585,20 +575,19 @@ class DailyInputTransport(BaseInputTransport):
image=buffer,
size=size,
format=format)
self._camera_in_queue.put(frame)
asyncio.run_coroutine_threadsafe(
self._camera_in_queue.put(frame), self.get_event_loop())
self._video_renderers[participant_id]["timestamp"] = curr_time
def _camera_in_thread_handler(self):
while self._running:
async def _camera_in_task_handler(self):
while True:
try:
frame = self._camera_in_queue.get(timeout=1)
future = asyncio.run_coroutine_threadsafe(
self._internal_push_frame(frame), self.get_event_loop())
future.result()
frame = await self._camera_in_queue.get()
await self._internal_push_frame(frame)
self._camera_in_queue.task_done()
except queue.Empty:
pass
except asyncio.CancelledError:
break
except BaseException as e:
logger.error(f"Error capturing video: {e}")
@@ -610,39 +599,52 @@ class DailyOutputTransport(BaseOutputTransport):
self._client = client
self._executor = ThreadPoolExecutor(max_workers=5)
async def start(self, frame: StartFrame):
if self._running:
return
# This will set _running=True
await super().start(frame)
# Join the room.
await self._client.join()
await super().start(frame)
async def stop(self):
if not self._running:
return
# This will set _running=False
await super().stop()
# Leave the room.
await self._client.leave()
await super().stop()
async def cleanup(self):
await super().cleanup()
await self._client.cleanup()
await super().cleanup()
def send_message(self, frame: DailyTransportMessageFrame):
self._client.send_message(frame)
async def send_message(self, frame: DailyTransportMessageFrame):
def send_message(frame: DailyTransportMessageFrame):
self._client.send_message(frame)
def write_raw_audio_frames(self, frames: bytes):
self._client.write_raw_audio_frames(frames)
await self.get_event_loop().run_in_executor(self._executor, send_message, frame)
def write_frame_to_camera(self, frame: ImageRawFrame):
self._client.write_frame_to_camera(frame)
async def write_raw_audio_frames(self, frames: bytes):
def write_audio(frames: bytes):
self._client.write_raw_audio_frames(frames)
await self.get_event_loop().run_in_executor(self._executor, write_audio, frames)
async def write_frame_to_camera(self, frame: ImageRawFrame):
def write_image(frame: ImageRawFrame):
self._client.write_frame_to_camera(frame)
await self.get_event_loop().run_in_executor(self._executor, write_image, frame)
class DailyTransport(BaseTransport):
def __init__(self, room_url: str, token: str | None, bot_name: str, params: DailyParams):
def __init__(
self,
room_url: str,
token: str | None,
bot_name: str,
params: DailyParams,
loop: asyncio.AbstractEventLoop | None = None):
super().__init__(loop)
callbacks = DailyCallbacks(
on_joined=self._on_joined,
on_left=self._on_left,
@@ -660,12 +662,10 @@ class DailyTransport(BaseTransport):
)
self._params = params
self._client = DailyTransportClient(room_url, token, bot_name, params, callbacks)
self._client = DailyTransportClient(
room_url, token, bot_name, params, callbacks, self._loop)
self._input: DailyInputTransport | None = None
self._output: DailyOutputTransport | None = None
self._loop = asyncio.get_running_loop()
self._event_handlers: dict = {}
# Register supported handlers. The user will only be able to register
# these handlers.
@@ -741,10 +741,10 @@ class DailyTransport(BaseTransport):
participant_id, framerate, video_source, color_format)
def _on_joined(self, participant):
self.on_joined(participant)
self._call_async_event_handler("on_joined", participant)
def _on_left(self):
self.on_left()
self._call_async_event_handler("on_left")
def _on_error(self, error):
# TODO(aleix): Report error to input/output transports. The one managing
@@ -754,10 +754,10 @@ class DailyTransport(BaseTransport):
def _on_app_message(self, message: Any, sender: str):
if self._input:
self._input.push_app_message(message, sender)
self.on_app_message(message, sender)
self._call_async_event_handler("on_app_message", message, sender)
def _on_call_state_updated(self, state: str):
self.on_call_state_updated(state)
self._call_async_event_handler("on_call_state_updated", state)
async def _handle_dialin_ready(self, sip_endpoint: str):
if not self._params.dialin_settings:
@@ -793,28 +793,28 @@ class DailyTransport(BaseTransport):
def _on_dialin_ready(self, sip_endpoint):
if self._params.dialin_settings:
asyncio.run_coroutine_threadsafe(self._handle_dialin_ready(sip_endpoint), self._loop)
self.on_dialin_ready(sip_endpoint)
self._call_async_event_handler("on_dialin_ready", sip_endpoint)
def _on_dialout_connected(self, data):
self.on_dialout_connected(data)
self._call_async_event_handler("on_dialout_connected", data)
def _on_dialout_stopped(self, data):
self.on_dialout_stopped(data)
self._call_async_event_handler("on_dialout_stopped", data)
def _on_dialout_error(self, data):
self.on_dialout_error(data)
self._call_async_event_handler("on_dialout_error", data)
def _on_dialout_warning(self, data):
self.on_dialout_warning(data)
self._call_async_event_handler("on_dialout_warning", data)
def _on_participant_joined(self, participant):
self.on_participant_joined(participant)
self._call_async_event_handler("on_participant_joined", participant)
def _on_participant_left(self, participant, reason):
self.on_participant_left(participant, reason)
self._call_async_event_handler("on_participant_left", participant, reason)
def _on_first_participant_joined(self, participant):
self.on_first_participant_joined(participant)
self._call_async_event_handler("on_first_participant_joined", participant)
def _on_transcription_message(self, participant_id, message):
text = message["text"]
@@ -829,84 +829,7 @@ class DailyTransport(BaseTransport):
if self._input:
self._input.push_transcription_frame(frame)
#
# Decorators (event handlers)
#
def on_joined(self, participant):
pass
def on_left(self):
pass
def on_app_message(self, message, sender):
pass
def on_call_state_updated(self, state):
pass
def on_dialin_ready(self, sip_endpoint):
pass
def on_dialout_connected(self, data):
pass
def on_dialout_stopped(self, data):
pass
def on_dialout_error(self, data):
pass
def on_dialout_warning(self, data):
pass
def on_first_participant_joined(self, participant):
pass
def on_participant_joined(self, participant):
pass
def on_participant_left(self, participant, reason):
pass
def event_handler(self, event_name: str):
def decorator(handler):
self._add_event_handler(event_name, handler)
return handler
return decorator
def _register_event_handler(self, event_name: str):
methods = inspect.getmembers(self, predicate=inspect.ismethod)
if event_name not in [method[0] for method in methods]:
raise Exception(f"Event handler {event_name} not found")
self._event_handlers[event_name] = [getattr(self, event_name)]
patch_method = types.MethodType(partial(self._patch_method, event_name), self)
setattr(self, event_name, patch_method)
def _add_event_handler(self, event_name: str, handler):
if event_name not in self._event_handlers:
raise Exception(f"Event handler {event_name} not registered")
self._event_handlers[event_name].append(types.MethodType(handler, self))
def _patch_method(self, event_name, *args, **kwargs):
try:
for handler in self._event_handlers[event_name]:
if inspect.iscoroutinefunction(handler):
# Beware, if handler() calls another event handler it
# will deadlock. You shouldn't do that anyways.
future = asyncio.run_coroutine_threadsafe(
handler(*args[1:], **kwargs), self._loop)
# wait for the coroutine to finish. This will also
# raise any exceptions raised by the coroutine.
future.result()
else:
handler(*args[1:], **kwargs)
except Exception as e:
logger.error(f"Exception in event handler {event_name}: {e}")
raise e
# def start_recording(self):
# self.client.start_recording()
def _call_async_event_handler(self, event_name: str, *args, **kwargs):
future = asyncio.run_coroutine_threadsafe(
self._call_event_handler(event_name, *args, **kwargs), self._loop)
future.result()

View File

@@ -46,7 +46,7 @@ class SileroVADAnalyzer(VADAnalyzer):
def num_frames_required(self) -> int:
    """Return the frame count covering 40 ms at `self.sample_rate`."""
    # One hundredth of the sample rate, truncated to an int, is 10 ms.
    frames_per_10ms = int(self.sample_rate / 100)
    return frames_per_10ms * 4  # 40ms
def voice_confidence(self, buffer) -> float:
async def voice_confidence(self, buffer) -> float:
try:
audio_int16 = np.frombuffer(buffer, np.int16)
# Divide by 32768 because we have signed 16-bit data.
@@ -88,7 +88,7 @@ class SileroVAD(FrameProcessor):
async def _analyze_audio(self, frame: AudioRawFrame):
# Check VAD and push event if necessary. We just care about changes
# from QUIET to SPEAKING and vice versa.
new_vad_state = self._vad_analyzer.analyze_audio(frame.audio)
new_vad_state = await self._vad_analyzer.analyze_audio(frame.audio)
if new_vad_state != self._processor_vad_state and new_vad_state != VADState.STARTING and new_vad_state != VADState.STOPPING:
new_frame = None

View File

@@ -58,14 +58,14 @@ class VADAnalyzer:
pass
# Abstract hook with an empty body: takes `buffer` and is annotated to return
# a float.
# NOTE(review): this text is a flattened diff whose +/- markers were lost. The
# two `def` lines are presumably the old (sync) and new (async) signatures of
# the same method, not two definitions - confirm against the real file.
@abstractmethod
def voice_confidence(self, buffer) -> float:
async def voice_confidence(self, buffer) -> float:
pass
def _get_smoothed_volume(self, audio: bytes) -> float:
    """Return the volume of `audio` after exponential smoothing.

    The volume comes from `calculate_audio_volume` at `self._sample_rate`
    and is smoothed against `self._prev_volume` using
    `self._smoothing_factor`. This method does not update
    `self._prev_volume`.
    """
    current_volume = calculate_audio_volume(audio, self._sample_rate)
    previous_volume = self._prev_volume
    factor = self._smoothing_factor
    return exp_smoothing(current_volume, previous_volume, factor)
def analyze_audio(self, buffer) -> VADState:
async def analyze_audio(self, buffer) -> VADState:
self._vad_buffer += buffer
num_required_bytes = self._vad_frames_num_bytes
@@ -75,7 +75,7 @@ class VADAnalyzer:
audio_frames = self._vad_buffer[:num_required_bytes]
self._vad_buffer = self._vad_buffer[num_required_bytes:]
confidence = self.voice_confidence(audio_frames)
confidence = await self.voice_confidence(audio_frames)
volume = self._get_smoothed_volume(audio_frames)
self._prev_volume = volume