nuked the code that marks user audio in favor for InputAudioRawFrame. also moving to stereo instead of mono with the human and bot on their own channel.

This commit is contained in:
Adrian Cowham
2024-10-03 14:10:03 -07:00
parent 2d82702e04
commit 4d81a2ebfe
6 changed files with 72 additions and 83 deletions

View File

@@ -22,7 +22,6 @@ from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.processors.audio.audio_buffer_processor import \
AudioBufferProcessor
from pipecat.processors.user_marker_processor import UserMarkerProcessor
from pipecat.services.canonical import CanonicalMetricsService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
@@ -115,14 +114,12 @@ async def main():
assistant="pipecat-chatbot",
assistant_speaks_first=True,
)
usermarker = UserMarkerProcessor()
pipeline = Pipeline([
transport.input(), # microphone
usermarker,
user_response,
llm,
tts,
audio_buffer_processor, # captures audio into a buffer
audio_buffer_processor, # captures audio into a buffer
canonical, # uploads audio buffer to Canonical AI for metrics
transport.output(),
assistant_response,

View File

@@ -19,8 +19,8 @@ from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
from pipecat.processors.user_marker_processor import UserMarkerProcessor
from pipecat.processors.audio.audio_buffer_processor import \
AudioBufferProcessor
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -98,10 +98,8 @@ async def main():
assistant_response = LLMAssistantResponseAggregator()
audiobuffer = AudioBufferProcessor()
usermarker = UserMarkerProcessor()
pipeline = Pipeline([
transport.input(), # microphone
usermarker, # used to mark the user's audio in the pipeline
user_response,
llm,
tts,

View File

@@ -285,15 +285,6 @@ class AppFrame(Frame):
pass
@dataclass
class UserAudioFrame(AudioRawFrame):
"""
Indicates user audio in the pipeline.
"""
def __init__(self, frame: AudioRawFrame):
super().__init__(frame.audio, frame.sample_rate, frame.num_channels)
#
# System frames
#

View File

@@ -1,6 +1,13 @@
from pipecat.frames.frames import (AudioRawFrame, BotStartedSpeakingFrame,
import wave
from io import BytesIO
from pipecat.frames.frames import (AudioRawFrame, BotInterruptionFrame,
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame, Frame,
UserAudioFrame, UserStartedSpeakingFrame,
InputAudioRawFrame, OutputAudioRawFrame,
StartInterruptionFrame,
StopInterruptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
@@ -22,61 +29,85 @@ class AudioBufferProcessor(FrameProcessor):
populated when the first audio frame is processed.
"""
super().__init__()
self._audio_buffer = bytearray()
self._user_audio_buffer = bytearray()
self._assistant_audio_buffer = bytearray()
self._num_channels = None
self._sample_rate = None
self._assistant_audio = False
self._user_audio = False
def _buffer_has_audio(self, buffer: bytearray):
return (
buffer is not None and
len(buffer) > 0
)
def _has_audio(self):
return (
self._audio_buffer is not None and
len(self._audio_buffer) > 0 and
self._num_channels is not None and
self._buffer_has_audio(self._user_audio_buffer) and
self._buffer_has_audio(self._assistant_audio_buffer) and
self._sample_rate is not None
)
def _reset_audio_buffer(self):
self._audio_buffer = bytearray()
self._user_audio_buffer = bytearray()
self._assistant_audio_buffer = bytearray()
def _merge_audio_buffers(self):
with BytesIO() as buffer:
with wave.open(buffer, 'wb') as wf:
wf.setnchannels(2)
wf.setsampwidth(self._sample_rate // 8000)
wf.setframerate(self._sample_rate)
# Interleave the two audio streams
max_length = max(len(self._user_audio_buffer),
len(self._assistant_audio_buffer))
interleaved = bytearray(max_length * 2)
for i in range(0, max_length, 2):
if i < len(self._user_audio_buffer):
interleaved[i * 2] = self._user_audio_buffer[i]
interleaved[i * 2 + 1] = self._user_audio_buffer[i + 1]
else:
interleaved[i * 2] = 0
interleaved[i * 2 + 1] = 0
if i < len(self._assistant_audio_buffer):
interleaved[i * 2 + 2] = self._assistant_audio_buffer[i]
interleaved[i * 2 + 3] = self._assistant_audio_buffer[i + 1]
else:
interleaved[i * 2 + 2] = 0
interleaved[i * 2 + 3] = 0
wf.writeframes(interleaved)
return buffer.getvalue()
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, AudioRawFrame):
if self._num_channels is None:
self._num_channels = frame.num_channels
if self._sample_rate is None:
self._sample_rate = frame.sample_rate
if isinstance(frame, UserStoppedSpeakingFrame):
self._user_audio = False
if (isinstance(frame, AudioRawFrame) and self._sample_rate is None):
self._sample_rate = frame.sample_rate
if isinstance(frame, BotStartedSpeakingFrame):
self._assistant_audio = True
self._user_audio = False # do not capture user audio if assistant is speaking
if isinstance(frame, BotStoppedSpeakingFrame):
# this handles the case where the user starts speaking and interrupts the bot
if (isinstance(frame, BotStoppedSpeakingFrame) or
isinstance(frame, UserStartedSpeakingFrame)):
self._assistant_audio = False
# Capture user audio if assistant is not speaking, even if it's silence, the point
# here is to capture so that the conversation is as close to reality as possible.
# This is important for evaluation and metrics capture.
self._user_audio = True
# only include audio from the user if the user is speaking, this is because audio from the user's
# mic is always coming in. if we include all the user's audio there will be a long latency before
# the user starts speaking because all of the user's silence during the assistant's speech will have been
# added to the buffer.
#
# and include all audio from the assistant
if isinstance(frame, UserAudioFrame) and self._user_audio:
self._audio_buffer.extend(frame.audio)
# include all audio from the user
if isinstance(frame, InputAudioRawFrame):
self._user_audio_buffer.extend(frame.audio)
# Sync the assistant's buffer to the user's buffer by adding silence if needed
if len(self._user_audio_buffer) > len(self._assistant_audio_buffer):
silence_length = len(self._user_audio_buffer) - len(self._assistant_audio_buffer)
silence = b'\x00' * silence_length
self._assistant_audio_buffer.extend(silence)
# include all audio from the assistant
if (
isinstance(frame, AudioRawFrame)
and not isinstance(frame, UserAudioFrame)
):
self._audio_buffer.extend(frame.audio)
# if the assistant is speaking, include all audio from the assistant,
if (isinstance(frame, OutputAudioRawFrame)) and self._assistant_audio:
self._assistant_audio_buffer.extend(frame.audio)
# do not push the user's audio frame, doing so will result in echo
if not isinstance(frame, UserAudioFrame):
if not isinstance(frame, InputAudioRawFrame):
await self.push_frame(frame, direction)

View File

@@ -1,19 +0,0 @@
from pipecat.frames.frames import AudioRawFrame, Frame, UserAudioFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
class UserMarkerProcessor(FrameProcessor):
"""
This class extends FrameProcessor, used to mark the user's audio in the pipeline.
This FrameProcessor must be inserted after transport.input() so that the only
AudioRaw it receives are from the user.
"""
def __init__(self):
super().__init__()
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, AudioRawFrame):
frame = UserAudioFrame(frame)
await self.push_frame(frame, direction)

View File

@@ -1,8 +1,6 @@
import os
import uuid
import wave
from datetime import datetime
from io import BytesIO
from typing import Dict, List, Tuple
import aiohttp
@@ -92,19 +90,13 @@ class CanonicalMetricsService(AIService):
if pipeline._has_audio():
os.makedirs(self._output_dir, exist_ok=True)
filename = self._get_output_filename()
with BytesIO() as buffer:
with wave.open(buffer, 'wb') as wf:
wf.setnchannels(pipeline._num_channels)
wf.setsampwidth(pipeline._sample_rate // 8000)
wf.setframerate(pipeline._sample_rate)
wf.writeframes(pipeline._audio_buffer)
wave_data = buffer.getvalue()
wave_data = pipeline._merge_audio_buffers()
async with aiofiles.open(filename, 'wb') as file:
await file.write(wave_data)
try:
await self._multipart_upload(filename)
# await self._multipart_upload(filename)
pipeline._reset_audio_buffer()
# await aiofiles.os.remove(filename)
except FileNotFoundError:
@@ -209,4 +201,3 @@ class CanonicalMetricsService(AIService):
if not response.ok:
logger.error(f"Failed to complete upload: {await response.text()}")
return