From edd44cc18100dfd3ba0419320bba9993ce574ca9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 16 Oct 2024 10:19:00 -0700 Subject: [PATCH] services(openai): rename OpenAILLMServiceRealtimeBeta to OpenAIRealtimeBetaLLMService --- CHANGELOG.md | 7 + .../foundational/19-openai-realtime-beta.py | 8 +- .../20b-persistent-context-openai-realtime.py | 4 +- .../services/openai_realtime_beta/__init__.py | 2 +- .../services/openai_realtime_beta/context.py | 208 ++++++++++++++++ .../services/openai_realtime_beta/frames.py | 19 ++ .../{llm_and_context.py => openai.py} | 232 ++---------------- 7 files changed, 255 insertions(+), 225 deletions(-) create mode 100644 src/pipecat/services/openai_realtime_beta/context.py create mode 100644 src/pipecat/services/openai_realtime_beta/frames.py rename src/pipecat/services/openai_realtime_beta/{llm_and_context.py => openai.py} (71%) diff --git a/CHANGELOG.md b/CHANGELOG.md index 06f49824d..18bf85ffe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,13 @@ All notable changes to **Pipecat** will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Changed + +- Renamed `OpenAILLMServiceRealtimeBeta` to `OpenAIRealtimeBetaLLMService` to + match other services. + ## [0.0.45] - 2024-10-16 ### Changed diff --git a/examples/foundational/19-openai-realtime-beta.py b/examples/foundational/19-openai-realtime-beta.py index 51ca773e1..a361987f3 100644 --- a/examples/foundational/19-openai-realtime-beta.py +++ b/examples/foundational/19-openai-realtime-beta.py @@ -17,12 +17,10 @@ from runner import configure from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask -from pipecat.processors.aggregators.openai_llm_context import ( - OpenAILLMContext, -) +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext from pipecat.services.openai_realtime_beta import ( InputAudioTranscription, - OpenAILLMServiceRealtimeBeta, + OpenAIRealtimeBetaLLMService, SessionProperties, TurnDetection, ) @@ -116,7 +114,7 @@ unless specifically asked to elaborate on a topic. Remember, your responses should be short. Just one or two sentences, usually.""", ) - llm = OpenAILLMServiceRealtimeBeta( + llm = OpenAIRealtimeBetaLLMService( api_key=os.getenv("OPENAI_API_KEY"), session_properties=session_properties, start_audio_paused=False, diff --git a/examples/foundational/20b-persistent-context-openai-realtime.py b/examples/foundational/20b-persistent-context-openai-realtime.py index 27a2b7de6..4935fe281 100644 --- a/examples/foundational/20b-persistent-context-openai-realtime.py +++ b/examples/foundational/20b-persistent-context-openai-realtime.py @@ -24,7 +24,7 @@ from pipecat.processors.aggregators.openai_llm_context import ( ) from pipecat.services.openai_realtime_beta import ( InputAudioTranscription, - OpenAILLMServiceRealtimeBeta, + OpenAIRealtimeBetaLLMService, SessionProperties, TurnDetection, ) @@ -211,7 +211,7 @@ unless specifically asked to elaborate on a topic. Remember, your responses should be short. Just one or two sentences, usually.""", ) - llm = OpenAILLMServiceRealtimeBeta( + llm = OpenAIRealtimeBetaLLMService( api_key=os.getenv("OPENAI_API_KEY"), session_properties=session_properties, start_audio_paused=False, diff --git a/src/pipecat/services/openai_realtime_beta/__init__.py b/src/pipecat/services/openai_realtime_beta/__init__.py index ebd37d148..a1cbafecd 100644 --- a/src/pipecat/services/openai_realtime_beta/__init__.py +++ b/src/pipecat/services/openai_realtime_beta/__init__.py @@ -1,2 +1,2 @@ from .events import InputAudioTranscription, SessionProperties, TurnDetection -from .llm_and_context import OpenAILLMServiceRealtimeBeta +from .openai import OpenAIRealtimeBetaLLMService diff --git a/src/pipecat/services/openai_realtime_beta/context.py b/src/pipecat/services/openai_realtime_beta/context.py new file mode 100644 index 000000000..1c8292085 --- /dev/null +++ b/src/pipecat/services/openai_realtime_beta/context.py @@ -0,0 +1,208 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import copy +import json + +from loguru import logger + +from pipecat.frames.frames import Frame, LLMMessagesUpdateFrame, LLMSetToolsFrame +from pipecat.processors.aggregators.openai_llm_context import ( + OpenAILLMContext, + OpenAILLMContextFrame, +) +from pipecat.processors.frame_processor import FrameDirection +from pipecat.services.openai import ( + OpenAIAssistantContextAggregator, + OpenAIUserContextAggregator, +) + +from . import events +from .frames import RealtimeMessagesUpdateFrame, RealtimeFunctionCallResultFrame + + +class OpenAIRealtimeLLMContext(OpenAILLMContext): + def __init__(self, messages=None, tools=None, **kwargs): + super().__init__(messages=messages, tools=tools, **kwargs) + self.__setup_local() + + def __setup_local(self): + self.llm_needs_settings_update = True + self.llm_needs_initial_messages = True + self._session_instructions = "" + + return + + @staticmethod + def upgrade_to_realtime(obj: OpenAILLMContext) -> "OpenAIRealtimeLLMContext": + if isinstance(obj, OpenAILLMContext) and not isinstance(obj, OpenAIRealtimeLLMContext): + obj.__class__ = OpenAIRealtimeLLMContext + obj.__setup_local() + return obj + + # todo + # - finish implementing all frames + # - add message conversion functions to OpenAILLMContext base class + + def from_standard_message(self, message): + if message.get("role") == "assistant" and message.get("tool_calls"): + tc = message.get("tool_calls")[0] + return events.ConversationItem( + type="function_call", + call_id=tc["id"], + name=tc["function"]["name"], + arguments=tc["function"]["arguments"], + ) + logger.error(f"Unhandled message type in from_standard_message: {message}") + + def get_messages_for_initializing_history(self): + # We can't load a long conversation history into the openai realtime api yet. (The API/model + # forgets that it can do audio, if you do a series of `conversation.item.create` calls.) So + # our general strategy until this is fixed is just to put everything into a first "user" + # message as a single input. + if not self.messages: + return [] + + messages = copy.deepcopy(self.messages) + + # If we have a "system" message as our first message, let's pull that out into session + # "instructions" + if messages[0].get("role") == "system": + self.llm_needs_settings_update = True + system = messages.pop(0) + content = system.get("content") + if isinstance(content, str): + self._session_instructions = content + elif isinstance(content, list): + self._session_instructions = content[0].get("text") + if not messages: + return [] + + # If we have just a single "user" item, we can just send it normally + if len(messages) == 1 and messages[0].get("role") == "user": + return messages + + # Otherwise, let's pack everything into a single "user" message with a bit of + # explanation for the LLM + intro_text = """ + This is a previously saved conversation. Please treat this conversation history as a + starting point for the current conversation.""" + + trailing_text = """ + This is the end of the previously saved conversation. Please continue the conversation + from here. If the last message is a user instruction or question, act on that instruction + or answer the question. If the last message is an assistant response, simple say that you + are ready to continue the conversation.""" + + return [ + { + "role": "user", + "type": "message", + "content": [ + { + "type": "input_text", + "text": "\n\n".join( + [intro_text, json.dumps(messages, indent=2), trailing_text] + ), + } + ], + } + ] + + def add_user_content_item_as_message(self, item): + message = { + "role": "user", + "content": [{"type": "text", "text": item.content[0].transcript}], + } + self.add_message(message) + + def add_assistant_content_item_as_message(self, item): + message = {"role": "assistant", "content": []} + for content in item.content: + if content.type == "audio": + message["content"].append({"type": "text", "text": content.transcript}) + else: + logger.error(f"Unhandled content type in assistant item: {content.type} - {item}") + self.add_message(message) + + +class OpenAIRealtimeUserContextAggregator(OpenAIUserContextAggregator): + async def process_frame( + self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM + ): + await super().process_frame(frame, direction) + # Parent does not push LLMMessagesUpdateFrame. This ensures that in a typical pipeline, + # messages are only processed by the user context aggregator, which is generally what we want. But + # we also need to send new messages over the websocket, so the openai realtime API has them + # in its context. + if isinstance(frame, LLMMessagesUpdateFrame): + await self.push_frame(RealtimeMessagesUpdateFrame(context=self._context)) + + # Parent also doesn't push the LLMSetToolsFrame. + if isinstance(frame, LLMSetToolsFrame): + await self.push_frame(frame, direction) + + async def _push_aggregation(self): + # for the moment, ignore all user input coming into the pipeline. + # todo: think about whether/how to fix this to allow for text input from + # upstream (transport/transcription, or other sources) + pass + + +class OpenAIRealtimeAssistantContextAggregator(OpenAIAssistantContextAggregator): + async def _push_aggregation(self): + # the only thing we implement here is function calling. in all other cases, messages + # are added to the context when we receive openai realtime api events + if not self._function_call_result: + return + + self._reset() + try: + run_llm = True + frame = self._function_call_result + self._function_call_result = None + if frame.result: + # The "tool_call" message from the LLM that triggered the function call + self._context.add_message( + { + "role": "assistant", + "tool_calls": [ + { + "id": frame.tool_call_id, + "function": { + "name": frame.function_name, + "arguments": json.dumps(frame.arguments), + }, + "type": "function", + } + ], + } + ) + # The result of the function call. Need to add this both to our context here and to + # the openai realtime api context. + result_message = { + "role": "tool", + "content": json.dumps(frame.result), + "tool_call_id": frame.tool_call_id, + } + + self._context.add_message(result_message) + # The standard function callback code path pushes the FunctionCallResultFrame from the llm itself, + # so we didn't have a chance to add the result to the openai realtime api context. Let's push a + # special frame to do that. + await self._user_context_aggregator.push_frame( + RealtimeFunctionCallResultFrame(result_frame=frame) + ) + run_llm = frame.run_llm + + if run_llm: + await self._user_context_aggregator.push_context_frame() + + frame = OpenAILLMContextFrame(self._context) + await self.push_frame(frame) + + except Exception as e: + logger.error(f"Error processing frame: {e}") diff --git a/src/pipecat/services/openai_realtime_beta/frames.py b/src/pipecat/services/openai_realtime_beta/frames.py new file mode 100644 index 000000000..54bdcd467 --- /dev/null +++ b/src/pipecat/services/openai_realtime_beta/frames.py @@ -0,0 +1,19 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +from dataclasses import dataclass + +from pipecat.frames.frames import DataFrame, FunctionCallResultFrame + + +@dataclass +class RealtimeMessagesUpdateFrame(DataFrame): + context: "OpenAIRealtimeLLMContext" + + +@dataclass +class RealtimeFunctionCallResultFrame(DataFrame): + result_frame: FunctionCallResultFrame diff --git a/src/pipecat/services/openai_realtime_beta/llm_and_context.py b/src/pipecat/services/openai_realtime_beta/openai.py similarity index 71% rename from src/pipecat/services/openai_realtime_beta/llm_and_context.py rename to src/pipecat/services/openai_realtime_beta/openai.py index 68f1be177..0c57ed0bb 100644 --- a/src/pipecat/services/openai_realtime_beta/llm_and_context.py +++ b/src/pipecat/services/openai_realtime_beta/openai.py @@ -6,27 +6,23 @@ import asyncio import base64 -import copy import json import time + from dataclasses import dataclass import websockets -from loguru import logger from pipecat.frames.frames import ( BotStoppedSpeakingFrame, CancelFrame, - DataFrame, EndFrame, ErrorFrame, Frame, - FunctionCallResultFrame, InputAudioRawFrame, LLMFullResponseEndFrame, LLMFullResponseStartFrame, LLMMessagesAppendFrame, - LLMMessagesUpdateFrame, LLMSetToolsFrame, LLMUpdateSettingsFrame, StartFrame, @@ -47,36 +43,22 @@ from pipecat.processors.aggregators.openai_llm_context import ( ) from pipecat.processors.frame_processor import FrameDirection from pipecat.services.ai_services import LLMService -from pipecat.services.openai import ( - OpenAIAssistantContextAggregator, - OpenAIContextAggregatorPair, - OpenAIUserContextAggregator, -) +from pipecat.services.openai import OpenAIContextAggregatorPair from pipecat.utils.time import time_now_iso8601 from . import events -from .events import SessionProperties +from .context import ( + OpenAIRealtimeLLMContext, + OpenAIRealtimeUserContextAggregator, + OpenAIRealtimeAssistantContextAggregator, +) +from .frames import RealtimeMessagesUpdateFrame, RealtimeFunctionCallResultFrame -# websocket logger -- in case needed for debugging send/recv -# import logging -# logging.basicConfig( -# format="%(message)s", -# level=logging.DEBUG, -# ) +from loguru import logger @dataclass -class _InternalMessagesUpdateFrame(DataFrame): - context: "OpenAIRealtimeLLMContext" - - -@dataclass -class _InternalFunctionCallResultFrame(DataFrame): - result_frame: FunctionCallResultFrame - - -@dataclass -class _CurrentAudioResponse: +class CurrentAudioResponse: item_id: str content_index: int start_time_ms: int @@ -87,190 +69,7 @@ class OpenAIUnhandledFunctionException(Exception): pass -class OpenAIRealtimeLLMContext(OpenAILLMContext): - def __init__(self, messages=None, tools=None, **kwargs): - super().__init__(messages=messages, tools=tools, **kwargs) - self.__setup_local() - - def __setup_local(self): - self.llm_needs_settings_update = True - self.llm_needs_initial_messages = True - self._session_instructions = "" - - return - - @staticmethod - def upgrade_to_realtime(obj: OpenAILLMContext) -> "OpenAIRealtimeLLMContext": - if isinstance(obj, OpenAILLMContext) and not isinstance(obj, OpenAIRealtimeLLMContext): - obj.__class__ = OpenAIRealtimeLLMContext - obj.__setup_local() - return obj - - # todo - # - finish implementing all frames - # - add message conversion functions to OpenAILLMContext base class - - def from_standard_message(self, message): - if message.get("role") == "assistant" and message.get("tool_calls"): - tc = message.get("tool_calls")[0] - return events.ConversationItem( - type="function_call", - call_id=tc["id"], - name=tc["function"]["name"], - arguments=tc["function"]["arguments"], - ) - logger.error(f"Unhandled message type in from_standard_message: {message}") - - def get_messages_for_initializing_history(self): - # We can't load a long conversation history into the openai realtime api yet. (The API/model - # forgets that it can do audio, if you do a series of `conversation.item.create` calls.) So - # our general strategy until this is fixed is just to put everything into a first "user" - # message as a single input. - if not self.messages: - return [] - - messages = copy.deepcopy(self.messages) - - # If we have a "system" message as our first message, let's pull that out into session - # "instructions" - if messages[0].get("role") == "system": - self.llm_needs_settings_update = True - system = messages.pop(0) - content = system.get("content") - if isinstance(content, str): - self._session_instructions = content - elif isinstance(content, list): - self._session_instructions = content[0].get("text") - if not messages: - return [] - - # If we have just a single "user" item, we can just send it normally - if len(messages) == 1 and messages[0].get("role") == "user": - return messages - - # Otherwise, let's pack everything into a single "user" message with a bit of - # explanation for the LLM - intro_text = """ - This is a previously saved conversation. Please treat this conversation history as a - starting point for the current conversation.""" - - trailing_text = """ - This is the end of the previously saved conversation. Please continue the conversation - from here. If the last message is a user instruction or question, act on that instruction - or answer the question. If the last message is an assistant response, simple say that you - are ready to continue the conversation.""" - - return [ - { - "role": "user", - "type": "message", - "content": [ - { - "type": "input_text", - "text": "\n\n".join( - [intro_text, json.dumps(messages, indent=2), trailing_text] - ), - } - ], - } - ] - - def add_user_content_item_as_message(self, item): - message = { - "role": "user", - "content": [{"type": "text", "text": item.content[0].transcript}], - } - self.add_message(message) - - def add_assistant_content_item_as_message(self, item): - message = {"role": "assistant", "content": []} - for content in item.content: - if content.type == "audio": - message["content"].append({"type": "text", "text": content.transcript}) - else: - logger.error(f"Unhandled content type in assistant item: {content.type} - {item}") - self.add_message(message) - - -class OpenAIRealtimeUserContextAggregator(OpenAIUserContextAggregator): - async def process_frame( - self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM - ): - await super().process_frame(frame, direction) - # Parent does not push LLMMessagesUpdateFrame. This ensures that in a typical pipeline, - # messages are only processed by the user context aggregator, which is generally what we want. But - # we also need to send new messages over the websocket, so the openai realtime API has them - # in its context. - if isinstance(frame, LLMMessagesUpdateFrame): - await self.push_frame(_InternalMessagesUpdateFrame(context=self._context)) - - # Parent also doesn't push the LLMSetToolsFrame. - if isinstance(frame, LLMSetToolsFrame): - await self.push_frame(frame, direction) - - async def _push_aggregation(self): - # for the moment, ignore all user input coming into the pipeline. - # todo: think about whether/how to fix this to allow for text input from - # upstream (transport/transcription, or other sources) - pass - - -class OpenAIRealtimeAssistantContextAggregator(OpenAIAssistantContextAggregator): - async def _push_aggregation(self): - # the only thing we implement here is function calling. in all other cases, messages - # are added to the context when we receive openai realtime api events - if not self._function_call_result: - return - - self._reset() - try: - frame = self._function_call_result - self._function_call_result = None - if frame.result: - # The "tool_call" message from the LLM that triggered the function call - self._context.add_message( - { - "role": "assistant", - "tool_calls": [ - { - "id": frame.tool_call_id, - "function": { - "name": frame.function_name, - "arguments": json.dumps(frame.arguments), - }, - "type": "function", - } - ], - } - ) - # The result of the function call. Need to add this both to our context here and to - # the openai realtime api context. - result_message = { - "role": "tool", - "content": json.dumps(frame.result), - "tool_call_id": frame.tool_call_id, - } - - self._context.add_message(result_message) - # The standard function callback code path pushes the FunctionCallResultFrame from the llm itself, - # so we didn't have a chance to add the result to the openai realtime api context. Let's push a - # special frame to do that. - await self._user_context_aggregator.push_frame( - _InternalFunctionCallResultFrame(result_frame=frame) - ) - run_llm = frame.run_llm - - if run_llm: - await self._user_context_aggregator.push_context_frame() - - frame = OpenAILLMContextFrame(self._context) - await self.push_frame(frame) - - except Exception as e: - logger.error(f"Error processing frame: {e}") - - -class OpenAILLMServiceRealtimeBeta(LLMService): +class OpenAIRealtimeBetaLLMService(LLMService): def __init__( self, *, @@ -400,14 +199,14 @@ class OpenAILLMServiceRealtimeBeta(LLMService): await self._handle_bot_stopped_speaking() elif isinstance(frame, LLMMessagesAppendFrame): await self._handle_messages_append(frame) - elif isinstance(frame, _InternalMessagesUpdateFrame): + elif isinstance(frame, RealtimeMessagesUpdateFrame): self._context = frame.context elif isinstance(frame, LLMUpdateSettingsFrame): - self._session_properties = SessionProperties(**frame.settings) + self._session_properties = events.SessionProperties(**frame.settings) await self._update_settings() elif isinstance(frame, LLMSetToolsFrame): await self._update_settings() - elif isinstance(frame, _InternalFunctionCallResultFrame): + elif isinstance(frame, RealtimeFunctionCallResultFrame): await self._handle_function_call_result(frame.result_frame) await self.push_frame(frame, direction) @@ -527,7 +326,6 @@ class OpenAILLMServiceRealtimeBeta(LLMService): return else: - # logger.debug(f"!!! Unhandled event: {evt}") pass except asyncio.CancelledError: logger.debug("websocket receive task cancelled") @@ -552,7 +350,7 @@ class OpenAILLMServiceRealtimeBeta(LLMService): # this event from the server await self.stop_ttfb_metrics() if not self._current_audio_response: - self._current_audio_response = _CurrentAudioResponse( + self._current_audio_response = CurrentAudioResponse( item_id=evt.item_id, content_index=evt.content_index, start_time_ms=int(time.time() * 1000),