From 4d81a2ebfe13003e11529bfc4c8dc7ab59fc62ee Mon Sep 17 00:00:00 2001 From: Adrian Cowham Date: Thu, 3 Oct 2024 14:10:03 -0700 Subject: [PATCH] nuked the code that marks user audio in favor of InputAudioRawFrame. also moving to stereo instead of mono with the human and bot on their own channel. --- examples/canonical-metrics/bot.py | 5 +- examples/chatbot-audio-recording/bot.py | 6 +- src/pipecat/frames/frames.py | 9 -- .../audio/audio_buffer_processor.py | 103 ++++++++++++------ .../processors/user_marker_processor.py | 19 ---- src/pipecat/services/canonical.py | 13 +-- 6 files changed, 72 insertions(+), 83 deletions(-) delete mode 100644 src/pipecat/processors/user_marker_processor.py diff --git a/examples/canonical-metrics/bot.py b/examples/canonical-metrics/bot.py index 111c1666e..626af5f62 100644 --- a/examples/canonical-metrics/bot.py +++ b/examples/canonical-metrics/bot.py @@ -22,7 +22,6 @@ from pipecat.processors.aggregators.llm_response import ( LLMAssistantResponseAggregator, LLMUserResponseAggregator) from pipecat.processors.audio.audio_buffer_processor import \ AudioBufferProcessor -from pipecat.processors.user_marker_processor import UserMarkerProcessor from pipecat.services.canonical import CanonicalMetricsService from pipecat.services.elevenlabs import ElevenLabsTTSService from pipecat.services.openai import OpenAILLMService @@ -115,14 +114,12 @@ async def main(): assistant="pipecat-chatbot", assistant_speaks_first=True, ) - usermarker = UserMarkerProcessor() pipeline = Pipeline([ transport.input(), # microphone - usermarker, user_response, llm, tts, - audio_buffer_processor, # captures audio into a buffer + audio_buffer_processor, # captures audio into a buffer canonical, # uploads audio buffer to Canonical AI for metrics transport.output(), assistant_response, diff --git a/examples/chatbot-audio-recording/bot.py b/examples/chatbot-audio-recording/bot.py index b6da9b54d..b1955ddba 100644 --- a/examples/chatbot-audio-recording/bot.py +++ 
b/examples/chatbot-audio-recording/bot.py @@ -19,8 +19,8 @@ from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_response import ( LLMAssistantResponseAggregator, LLMUserResponseAggregator) -from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor -from pipecat.processors.user_marker_processor import UserMarkerProcessor +from pipecat.processors.audio.audio_buffer_processor import \ + AudioBufferProcessor from pipecat.services.elevenlabs import ElevenLabsTTSService from pipecat.services.openai import OpenAILLMService from pipecat.transports.services.daily import DailyParams, DailyTransport @@ -98,10 +98,8 @@ async def main(): assistant_response = LLMAssistantResponseAggregator() audiobuffer = AudioBufferProcessor() - usermarker = UserMarkerProcessor() pipeline = Pipeline([ transport.input(), # microphone - usermarker, # used to mark the user's audio in the pipeline user_response, llm, tts, diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index c39026c31..e2ef78df5 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -285,15 +285,6 @@ class AppFrame(Frame): pass -@dataclass -class UserAudioFrame(AudioRawFrame): - """ - Indicates user audio in the pipeline. 
- """ - - def __init__(self, frame: AudioRawFrame): - super().__init__(frame.audio, frame.sample_rate, frame.num_channels) - # # System frames # diff --git a/src/pipecat/processors/audio/audio_buffer_processor.py b/src/pipecat/processors/audio/audio_buffer_processor.py index 27381dc82..8474466dc 100644 --- a/src/pipecat/processors/audio/audio_buffer_processor.py +++ b/src/pipecat/processors/audio/audio_buffer_processor.py @@ -1,6 +1,13 @@ -from pipecat.frames.frames import (AudioRawFrame, BotStartedSpeakingFrame, +import wave +from io import BytesIO + +from pipecat.frames.frames import (AudioRawFrame, BotInterruptionFrame, + BotStartedSpeakingFrame, BotStoppedSpeakingFrame, Frame, - UserAudioFrame, UserStartedSpeakingFrame, + InputAudioRawFrame, OutputAudioRawFrame, + StartInterruptionFrame, + StopInterruptionFrame, + UserStartedSpeakingFrame, UserStoppedSpeakingFrame) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor @@ -22,61 +29,85 @@ class AudioBufferProcessor(FrameProcessor): populated when the first audio frame is processed. 
""" super().__init__() - self._audio_buffer = bytearray() + self._user_audio_buffer = bytearray() + self._assistant_audio_buffer = bytearray() self._num_channels = None self._sample_rate = None self._assistant_audio = False self._user_audio = False + def _buffer_has_audio(self, buffer: bytearray): + return ( + buffer is not None and + len(buffer) > 0 + ) + def _has_audio(self): return ( - self._audio_buffer is not None and - len(self._audio_buffer) > 0 and - self._num_channels is not None and + self._buffer_has_audio(self._user_audio_buffer) and + self._buffer_has_audio(self._assistant_audio_buffer) and self._sample_rate is not None ) def _reset_audio_buffer(self): - self._audio_buffer = bytearray() + self._user_audio_buffer = bytearray() + self._assistant_audio_buffer = bytearray() + + def _merge_audio_buffers(self): + with BytesIO() as buffer: + with wave.open(buffer, 'wb') as wf: + wf.setnchannels(2) + wf.setsampwidth(self._sample_rate // 8000) + wf.setframerate(self._sample_rate) + # Interleave the two audio streams + max_length = max(len(self._user_audio_buffer), + len(self._assistant_audio_buffer)) + interleaved = bytearray(max_length * 2) + + for i in range(0, max_length, 2): + if i < len(self._user_audio_buffer): + interleaved[i * 2] = self._user_audio_buffer[i] + interleaved[i * 2 + 1] = self._user_audio_buffer[i + 1] + else: + interleaved[i * 2] = 0 + interleaved[i * 2 + 1] = 0 + + if i < len(self._assistant_audio_buffer): + interleaved[i * 2 + 2] = self._assistant_audio_buffer[i] + interleaved[i * 2 + 3] = self._assistant_audio_buffer[i + 1] + else: + interleaved[i * 2 + 2] = 0 + interleaved[i * 2 + 3] = 0 + + wf.writeframes(interleaved) + return buffer.getvalue() async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) - if isinstance(frame, AudioRawFrame): - if self._num_channels is None: - self._num_channels = frame.num_channels - if self._sample_rate is None: - self._sample_rate = 
frame.sample_rate - - if isinstance(frame, UserStoppedSpeakingFrame): - self._user_audio = False + if (isinstance(frame, AudioRawFrame) and self._sample_rate is None): + self._sample_rate = frame.sample_rate if isinstance(frame, BotStartedSpeakingFrame): self._assistant_audio = True - self._user_audio = False # do not capture user audio if assistant is speaking - if isinstance(frame, BotStoppedSpeakingFrame): + # this handles the case where the user starts speaking and interrupts the bot + if (isinstance(frame, BotStoppedSpeakingFrame) or + isinstance(frame, UserStartedSpeakingFrame)): self._assistant_audio = False - # Capture user audio if assistant is not speaking, even if it's silence, the point - # here is to capture so that the conversation is as close to reality as possible. - # This is important for evaluation and metrics capture. - self._user_audio = True - # only include audio from the user if the user is speaking, this is because audio from the user's - # mic is always coming in. if we include all the user's audio there will be a long latency before - # the user starts speaking because all of the user's silence during the assistant's speech will have been - # added to the buffer. 
- # - # and include all audio from the assistant - if isinstance(frame, UserAudioFrame) and self._user_audio: - self._audio_buffer.extend(frame.audio) + # include all audio from the user + if isinstance(frame, InputAudioRawFrame): + self._user_audio_buffer.extend(frame.audio) + # Sync the assistant's buffer to the user's buffer by adding silence if needed + if len(self._user_audio_buffer) > len(self._assistant_audio_buffer): + silence_length = len(self._user_audio_buffer) - len(self._assistant_audio_buffer) + silence = b'\x00' * silence_length + self._assistant_audio_buffer.extend(silence) - # include all audio from the assistant - if ( - isinstance(frame, AudioRawFrame) - and not isinstance(frame, UserAudioFrame) - ): - self._audio_buffer.extend(frame.audio) + # if the assistant is speaking, include all audio from the assistant, + if (isinstance(frame, OutputAudioRawFrame)) and self._assistant_audio: + self._assistant_audio_buffer.extend(frame.audio) # do not push the user's audio frame, doing so will result in echo - if not isinstance(frame, UserAudioFrame): + if not isinstance(frame, InputAudioRawFrame): await self.push_frame(frame, direction) diff --git a/src/pipecat/processors/user_marker_processor.py b/src/pipecat/processors/user_marker_processor.py deleted file mode 100644 index cadbfffeb..000000000 --- a/src/pipecat/processors/user_marker_processor.py +++ /dev/null @@ -1,19 +0,0 @@ -from pipecat.frames.frames import AudioRawFrame, Frame, UserAudioFrame -from pipecat.processors.frame_processor import FrameDirection, FrameProcessor - - -class UserMarkerProcessor(FrameProcessor): - """ - This class extends FrameProcessor, used to mark the user's audio in the pipeline. - This FrameProcessor must be inserted after transport.input() so that the only - AudioRaw it receives are from the user. 
- """ - - def __init__(self): - super().__init__() - - async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - if isinstance(frame, AudioRawFrame): - frame = UserAudioFrame(frame) - await self.push_frame(frame, direction) diff --git a/src/pipecat/services/canonical.py b/src/pipecat/services/canonical.py index a973d5481..00990ad16 100644 --- a/src/pipecat/services/canonical.py +++ b/src/pipecat/services/canonical.py @@ -1,8 +1,6 @@ import os import uuid -import wave from datetime import datetime -from io import BytesIO from typing import Dict, List, Tuple import aiohttp @@ -92,19 +90,13 @@ class CanonicalMetricsService(AIService): if pipeline._has_audio(): os.makedirs(self._output_dir, exist_ok=True) filename = self._get_output_filename() - with BytesIO() as buffer: - with wave.open(buffer, 'wb') as wf: - wf.setnchannels(pipeline._num_channels) - wf.setsampwidth(pipeline._sample_rate // 8000) - wf.setframerate(pipeline._sample_rate) - wf.writeframes(pipeline._audio_buffer) - wave_data = buffer.getvalue() + wave_data = pipeline._merge_audio_buffers() async with aiofiles.open(filename, 'wb') as file: await file.write(wave_data) try: - await self._multipart_upload(filename) + # await self._multipart_upload(filename) pipeline._reset_audio_buffer() # await aiofiles.os.remove(filename) except FileNotFoundError: @@ -209,4 +201,3 @@ class CanonicalMetricsService(AIService): if not response.ok: logger.error(f"Failed to complete upload: {await response.text()}") return -