From 09a3c2a82d9d4f46d5b0cb6f6edba77d67bf101c Mon Sep 17 00:00:00 2001 From: Kwindla Hultman Kramer Date: Fri, 4 Oct 2024 15:01:57 -0700 Subject: [PATCH] major functionality working (not configurable, occasional timing bugs maybe) --- examples/foundational/14-function-calling.py | 14 ++++-- .../foundational/19-openai-realtime-beta.py | 3 +- src/pipecat/services/openai.py | 2 +- src/pipecat/services/openai_realtime_beta.py | 43 +++++++++++++------ 4 files changed, 42 insertions(+), 20 deletions(-) diff --git a/examples/foundational/14-function-calling.py b/examples/foundational/14-function-calling.py index 35a02743b..e1432b6ca 100644 --- a/examples/foundational/14-function-calling.py +++ b/examples/foundational/14-function-calling.py @@ -11,7 +11,7 @@ import sys from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner -from pipecat.pipeline.task import PipelineTask +from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.services.cartesia import CartesiaTTSService from pipecat.services.openai import OpenAILLMContext, OpenAILLMService from pipecat.transports.services.daily import DailyParams, DailyTransport @@ -115,13 +115,21 @@ async def main(): ] ) - task = PipelineTask(pipeline) + task = PipelineTask( + pipeline, + PipelineParams( + allow_interruptions=True, + enable_metrics=True, + enable_usage_metrics=True, + report_only_initial_ttfb=True, + ), + ) @transport.event_handler("on_first_participant_joined") async def on_first_participant_joined(transport, participant): transport.capture_participant_transcription(participant["id"]) # Kick off the conversation. - await tts.say("Hi! Ask me about the weather in San Francisco.") + await task.queue_frames([context_aggregator.user().get_context_frame()]) runner = PipelineRunner() diff --git a/examples/foundational/19-openai-realtime-beta.py b/examples/foundational/19-openai-realtime-beta.py index ae2589995..47efb5887 100644 --- a/examples/foundational/19-openai-realtime-beta.py +++ b/examples/foundational/19-openai-realtime-beta.py @@ -14,7 +14,6 @@ from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.openai_llm_context import ( OpenAILLMContext, - OpenAILLMContextFrame, ) from pipecat.services.openai_realtime_beta import ( OpenAILLMServiceRealtimeBeta, @@ -140,7 +139,7 @@ Remember, your responses should be short. Just one or two sentences, usually. async def on_first_participant_joined(transport, participant): transport.capture_participant_transcription(participant["id"]) # Kick off the conversation. - await task.queue_frames([OpenAILLMContextFrame(context=context)]) + await task.queue_frames([context_aggregator.user().get_context_frame()]) runner = PipelineRunner() diff --git a/src/pipecat/services/openai.py b/src/pipecat/services/openai.py index 22dba36ac..e0e304d30 100644 --- a/src/pipecat/services/openai.py +++ b/src/pipecat/services/openai.py @@ -470,7 +470,7 @@ class OpenAIUserContextAggregator(LLMUserContextAggregator): if frame.user_id in self._context._user_image_request_context: del self._context._user_image_request_context[frame.user_id] elif isinstance(frame, UserImageRawFrame): - # Push a new AnthropicImageMessageFrame with the text context we cached + # Push a new OpenAIImageMessageFrame with the text context we cached # downstream to be handled by our assistant context aggregator. This is # necessary so that we add the message to the context in the right order. text = self._context._user_image_request_context.get(frame.user_id) or "" diff --git a/src/pipecat/services/openai_realtime_beta.py b/src/pipecat/services/openai_realtime_beta.py index 6eee43458..819354ac8 100644 --- a/src/pipecat/services/openai_realtime_beta.py +++ b/src/pipecat/services/openai_realtime_beta.py @@ -21,6 +21,8 @@ from pipecat.frames.frames import ( TextFrame, TranscriptionFrame, TTSAudioRawFrame, + TTSStartedFrame, + TTSStoppedFrame, ) from pipecat.metrics.metrics import LLMTokenUsage from pipecat.processors.frame_processor import FrameDirection @@ -84,9 +86,6 @@ class OpenAIRealtimeUserContextAggregator(OpenAIUserContextAggregator): class OpenAIRealtimeAssistantContextAggregator(OpenAIAssistantContextAggregator): async def _push_aggregation(self): await super()._push_aggregation() - logger.debug( - f"!!! AFTER ASSISTANT PUSH AGGREGATION {self._context.get_messages_for_logging()}" - ) class OpenAIInputTranscription(BaseModel): @@ -147,6 +146,11 @@ class OpenAILLMServiceRealtimeBeta(LLMService): self._session_properties = session_properties self._context = None + self._bot_speaking = False + + def can_generate_metrics(self) -> bool: + return True + async def start(self, frame: StartFrame): await super().start(frame) await self._connect() @@ -161,9 +165,8 @@ class OpenAILLMServiceRealtimeBeta(LLMService): async def _ws_send(self, realtime_message): try: - if realtime_message.get("type") != "input_audio_buffer.append": - logger.debug(f"!!! Sending message to websocket: {realtime_message}") - + # if realtime_message.get("type") != "input_audio_buffer.append": + # logger.debug(f"!!! Sending message to websocket: {realtime_message}") await self._websocket.send(json.dumps(realtime_message)) except Exception as e: logger.error(f"Error sending message to websocket: {e}") @@ -228,7 +231,8 @@ class OpenAILLMServiceRealtimeBeta(LLMService): pass elif msg["type"] == "input_audio_buffer.speech_stopped": # user stopped speaking - pass + await self.start_processing_metrics() + await self.start_ttfb_metrics() elif msg["type"] == "conversation.item.created": # for input, this will get sent from the server whether the # conversation item is created by audio transcription or by @@ -237,7 +241,12 @@ class OpenAILLMServiceRealtimeBeta(LLMService): # logger.debug(f"Received {msg}") pass elif msg["type"] == "response.created": - # could use for processing metrics + # todo: 1. figure out TTS started/stopped frame semantics better + # 2. do not push these frames in text-only mode + logger.debug(f"Received response created: {msg}") + if not self._bot_speaking: + self._bot_speaking = True + await self.push_frame(TTSStartedFrame()) pass elif msg["type"] == "conversation.item.input_audio_transcription.completed": # or here maybe (possible send upstream to user context aggregator) @@ -252,8 +261,11 @@ class OpenAILLMServiceRealtimeBeta(LLMService): pass elif msg["type"] == "response.audio_transcript.delta": # openai playground app uses this, not "text" + if msg["delta"]: + await self.push_frame(TextFrame(msg["delta"])) pass elif msg["type"] == "response.audio.delta": + await self.stop_ttfb_metrics() frame = TTSAudioRawFrame( audio=base64.b64decode(msg["delta"]), sample_rate=24000, @@ -261,7 +273,9 @@ class OpenAILLMServiceRealtimeBeta(LLMService): ) await self.push_frame(frame) elif msg["type"] == "response.audio.done": - # bot stopped speaking - or do that at the end of the response? + if self._bot_speaking: + self._bot_speaking = False + await self.push_frame(TTSStoppedFrame()) pass elif msg["type"] == "response.audio_transcript.done": # probably ignore for now @@ -275,11 +289,11 @@ class OpenAILLMServiceRealtimeBeta(LLMService): for item in item["content"]: # output text if item["type"] == "audio" and item["transcript"] is not None: - logger.debug(f"!!! >{item['transcript']}") - await self.push_frame(TextFrame(item["transcript"])) + # could send full transcript here instead of streaming chunks + # logger.debug(f"!!! >{item['transcript']}") + pass elif msg["type"] == "response.done": # logger.debug(f"Received response done: {msg}") - await self.stop_processing_metrics() # usage metrics # example. # response.usage.total_tokens:592 @@ -290,13 +304,14 @@ class OpenAILLMServiceRealtimeBeta(LLMService): # response.usage.input_token_details.audio_tokens:115 # response.usage.output_token_details.text_tokens:32 # response.usage.output_token_details.audio_tokens:135 - logger.debug("!!! Response done PPUSHING METRICS") tokens = LLMTokenUsage( prompt_tokens=msg["response"]["usage"]["input_tokens"], completion_tokens=msg["response"]["usage"]["output_tokens"], total_tokens=msg["response"]["usage"]["total_tokens"], ) await self.start_llm_usage_metrics(tokens) + # question for mrkb: don't seem to be getting processing time on the console except the first inference + await self.stop_processing_metrics() # function calls items = msg["response"]["output"] function_calls = [item for item in items if item.get("type") == "function_call"] @@ -398,9 +413,9 @@ class OpenAILLMServiceRealtimeBeta(LLMService): ) async def _handle_interruption(self, frame): - logger.debug("!!! Handling interruption") await self.stop_all_metrics() await self.push_frame(LLMFullResponseEndFrame()) + await self.push_frame(TTSStoppedFrame()) # todo: do this but only when there's a response in progress? # await self._ws_send( # {