examples: websocket-server updates

This commit is contained in:
Aleix Conchillo Flaqué
2024-05-29 16:21:23 -07:00
parent e31e87aabd
commit 5f45a9d90f
2 changed files with 207 additions and 123 deletions

View File

@@ -1,134 +1,197 @@
<!DOCTYPE html>
<html lang="en">
<head>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<script src="https://cdn.jsdelivr.net/npm/protobufjs@7.X.X/dist/protobuf.min.js"></script>
<title>WebSocket Audio Stream</title>
</head>
<title>Pipecat WebSocket Client Example</title>
</head>
<body>
<h1>WebSocket Audio Stream</h1>
<body>
<h1>Pipecat WebSocket Client Example</h1>
<h3><div id="progressText">Loading, wait...</div></h2>
<button id="startAudioBtn">Start Audio</button>
<button id="stopAudioBtn">Stop Audio</button>
<script>
const SAMPLE_RATE = 16000;
const BUFFER_SIZE = 8192;
const MIN_AUDIO_SIZE = 6400;
const SAMPLE_RATE = 16000;
const NUM_CHANNELS = 1;
let audioContext;
let microphoneStream;
let scriptProcessor;
let source;
let frame;
let audioChunks = [];
let isPlaying = false;
let ws;
// The protobuf type. We will load it later.
let Frame = null;
const proto = protobuf.load("frames.proto", (err, root) => {
if (err) throw err;
frame = root.lookupType("pipecat.Frame");
});
// The websocket connection.
let ws = null;
function initWebSocket() {
ws = new WebSocket('ws://localhost:8765');
// The audio context
let audioContext = null;
ws.addEventListener('open', () => console.log('WebSocket connection established.'));
ws.addEventListener('message', handleWebSocketMessage);
ws.addEventListener('close', (event) => console.log("WebSocket connection closed.", event.code, event.reason));
ws.addEventListener('error', (event) => console.error('WebSocket error:', event));
}
// The audio context media stream source
let source = null;
async function handleWebSocketMessage(event) {
const arrayBuffer = await event.data.arrayBuffer();
enqueueAudioFromProto(arrayBuffer);
}
// The microphone stream from getUserMedia. SHould be sampled to the
// proper sample rate.
let microphoneStream = null;
function enqueueAudioFromProto(arrayBuffer) {
const parsedFrame = frame.decode(new Uint8Array(arrayBuffer));
if (!parsedFrame?.audio) return false;
// Script processor to get data from microphone.
let scriptProcessor = null;
const frameCount = parsedFrame.audio.data.length / 2;
const audioOutBuffer = audioContext.createBuffer(1, frameCount, SAMPLE_RATE);
const nowBuffering = audioOutBuffer.getChannelData(0);
const view = new Int16Array(parsedFrame.audio.data.buffer);
// AudioContext play time.
let playTime = 0;
for (let i = 0; i < frameCount; i++) {
const word = view[i];
nowBuffering[i] = ((word + 32768) % 65536 - 32768) / 32768.0;
}
// Whether we should be playing audio.
let isPlaying = false;
audioChunks.push(audioOutBuffer);
if (!isPlaying) playNextChunk();
}
let startBtn = document.getElementById('startAudioBtn');
let stopBtn = document.getElementById('stopAudioBtn');
function playNextChunk() {
if (audioChunks.length === 0) {
isPlaying = false;
return;
}
const proto = protobuf.load("frames.proto", (err, root) => {
if (err) {
throw err;
}
Frame = root.lookupType("pipecat.Frame");
const progressText = document.getElementById("progressText");
progressText.textContent = "We are ready! Make sure to run the server and then click `Start Audio`.";
isPlaying = true;
const audioOutBuffer = audioChunks.shift();
const source = audioContext.createBufferSource();
source.buffer = audioOutBuffer;
source.connect(audioContext.destination);
source.onended = playNextChunk;
source.start();
}
startBtn.disabled = false;
stopBtn.disabled = true;
});
function startAudio() {
if (!navigator.mediaDevices || !navigator.mediaDevices.getUserMedia) {
alert('getUserMedia is not supported in your browser.');
return;
}
function initWebSocket() {
ws = new WebSocket('ws://localhost:8765');
navigator.mediaDevices.getUserMedia({ audio: true })
.then((stream) => {
microphoneStream = stream;
audioContext = new (window.AudioContext || window.webkitAudioContext)();
scriptProcessor = audioContext.createScriptProcessor(BUFFER_SIZE, 1, 1);
source = audioContext.createMediaStreamSource(stream);
source.connect(scriptProcessor);
scriptProcessor.connect(audioContext.destination);
ws.addEventListener('open', () => console.log('WebSocket connection established.'));
ws.addEventListener('message', handleWebSocketMessage);
ws.addEventListener('close', (event) => {
console.log("WebSocket connection closed.", event.code, event.reason);
stopAudio(false);
});
ws.addEventListener('error', (event) => console.error('WebSocket error:', event));
}
const audioBuffer = [];
const skipRatio = Math.floor(audioContext.sampleRate / (SAMPLE_RATE * 2));
async function handleWebSocketMessage(event) {
const arrayBuffer = await event.data.arrayBuffer();
if (isPlaying) {
enqueueAudioFromProto(arrayBuffer);
}
}
scriptProcessor.onaudioprocess = (event) => {
const rawLeftChannelData = event.inputBuffer.getChannelData(0);
for (let i = 0; i < rawLeftChannelData.length; i += skipRatio) {
const normalized = ((rawLeftChannelData[i] * 32768.0) + 32768) % 65536 - 32768;
const swappedBytes = ((normalized & 0xff) << 8) | ((normalized >> 8) & 0xff);
audioBuffer.push(swappedBytes);
}
function enqueueAudioFromProto(arrayBuffer) {
const parsedFrame = Frame.decode(new Uint8Array(arrayBuffer));
if (!parsedFrame?.audio) {
return false;
}
if (audioBuffer.length >= MIN_AUDIO_SIZE) {
const audioFrame = frame.create({ audio: { audio: audioBuffer.slice(0, MIN_AUDIO_SIZE) } });
const encodedFrame = new Uint8Array(frame.encode(audioFrame).finish());
ws.send(encodedFrame);
audioBuffer.splice(0, MIN_AUDIO_SIZE);
}
};
if (playTime == 0) {
playTime = audioContext.currentTime;
}
initWebSocket();
})
.catch((error) => console.error('Error accessing microphone:', error));
}
// We should be able to use parsedFrame.audio.audio.buffer but for
// some reason that contains all the bytes from the protobuf message.
const audioVector = Array.from(parsedFrame.audio.audio);
const audioArray = new Uint8Array(audioVector);
function stopAudio() {
if (ws) {
ws.close();
scriptProcessor.disconnect();
source.disconnect();
ws = undefined;
}
}
audioContext.decodeAudioData(audioArray.buffer, function(buffer) {
const source = new AudioBufferSourceNode(audioContext);
source.buffer = buffer;
source.start(playTime);
source.connect(audioContext.destination);
playTime = playTime + buffer.duration;
});
}
document.getElementById('startAudioBtn').addEventListener('click', startAudio);
document.getElementById('stopAudioBtn').addEventListener('click', stopAudio);
function convertFloat32ToS16PCM(float32Array) {
let int16Array = new Int16Array(float32Array.length);
for (let i = 0; i < float32Array.length; i++) {
let clampedValue = Math.max(-1, Math.min(1, float32Array[i]));
int16Array[i] = clampedValue < 0 ? clampedValue * 32768 : clampedValue * 32767;
}
return int16Array;
}
function startAudioBtnHandler() {
if (!navigator.mediaDevices || !navigator.mediaDevices.getUserMedia) {
alert('getUserMedia is not supported in your browser.');
return;
}
startBtn.disabled = true;
stopBtn.disabled = false;
audioContext = new (window.AudioContext || window.webkitAudioContext)({
latencyHint: "interactive",
sampleRate: SAMPLE_RATE
});
isPlaying = true;
initWebSocket();
navigator.mediaDevices.getUserMedia({
audio: {
sampleRate: SAMPLE_RATE,
channelCount: NUM_CHANNELS,
autoGainControl: true,
echoCancellation: true,
noiseSuppression: true,
}
}).then((stream) => {
microphoneStream = stream;
// 512 is closest thing to 200ms.
scriptProcessor = audioContext.createScriptProcessor(512, 1, 1);
source = audioContext.createMediaStreamSource(stream);
source.connect(scriptProcessor);
scriptProcessor.connect(audioContext.destination);
scriptProcessor.onaudioprocess = (event) => {
if (!ws) {
return;
}
const audioData = event.inputBuffer.getChannelData(0);
const pcmS16Array = convertFloat32ToS16PCM(audioData);
const pcmByteArray = new Uint8Array(pcmS16Array.buffer);
const frame = Frame.create({
audio: {
audio: Array.from(pcmByteArray),
sampleRate: SAMPLE_RATE,
numChannels: NUM_CHANNELS
}
});
const encodedFrame = new Uint8Array(Frame.encode(frame).finish());
ws.send(encodedFrame);
};
}).catch((error) => console.error('Error accessing microphone:', error));
}
function stopAudio(closeWebsocket) {
isPlaying = false;
startBtn.disabled = false;
stopBtn.disabled = true;
if (ws && closeWebsocket) {
ws.close();
ws = null;
}
if (scriptProcessor) {
scriptProcessor.disconnect();
}
if (source) {
source.disconnect();
}
}
function stopAudioBtnHandler() {
stopAudio(true);
}
startBtn.addEventListener('click', startAudioBtnHandler);
stopBtn.addEventListener('click', stopAudioBtnHandler);
startBtn.disabled = true;
stopBtn.disabled = true;
</script>
</body>
</body>
</html>

View File

@@ -9,32 +9,37 @@ import asyncio
import os
import sys
from loguru import logger
from pipecat.frames.frames import Frame, TextFrame, TranscriptionFrame
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.aggregators.llm_response import LLMAssistantResponseAggregator, LLMUserResponseAggregator
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.whisper import WhisperSTTService
from pipecat.transports.network.websocket_server import WebsocketServerTransport
from pipecat.transports.network.websocket_server import WebsocketServerParams, WebsocketServerTransport
from pipecat.vad.silero import SileroVAD
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
class WhisperTranscriber(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, TranscriptionFrame):
print(f"Transcribed: {frame.text}")
else:
await self.push_frame(frame, direction)
logger.add(sys.stderr, level="TRACE")
async def main():
async with aiohttp.ClientSession() as session:
transport = WebsocketServerTransport()
transport = WebsocketServerTransport(params=WebsocketServerParams(add_wav_header=True))
vad = SileroVAD(audio_passthrough=True)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4-turbo-preview")
stt = WhisperSTTService()
tts = ElevenLabsTTSService(
aiohttp_session=session,
@@ -42,19 +47,35 @@ async def main():
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(),
WhisperSTTService(),
WhisperTranscriber(),
tts,
transport.output(),
transport.input(), # Websocket input from client
vad, # VAD to detect user speech
stt, # Speech-To-Text
tma_in, # User responses
llm, # LLM
tts, # Text-To-Speech
transport.output(), # Websocket output to client
tma_out # LLM responses
])
task = PipelineTask(pipeline)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
await task.queue_frame(TextFrame("Hello there!"))
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()