services(openai): rename OpenAILLMServiceRealtimeBeta to OpenAIRealtimeBetaLLMService

This commit is contained in:
Aleix Conchillo Flaqué
2024-10-16 10:19:00 -07:00
parent 4075b19f7c
commit edd44cc181
7 changed files with 255 additions and 225 deletions

View File

@@ -5,6 +5,13 @@ All notable changes to **Pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
### Changed
- Renamed `OpenAILLMServiceRealtimeBeta` to `OpenAIRealtimeBetaLLMService` to
match the naming of the other services.
## [0.0.45] - 2024-10-16
### Changed

View File

@@ -17,12 +17,10 @@ from runner import configure
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
)
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai_realtime_beta import (
InputAudioTranscription,
OpenAILLMServiceRealtimeBeta,
OpenAIRealtimeBetaLLMService,
SessionProperties,
TurnDetection,
)
@@ -116,7 +114,7 @@ unless specifically asked to elaborate on a topic.
Remember, your responses should be short. Just one or two sentences, usually.""",
)
llm = OpenAILLMServiceRealtimeBeta(
llm = OpenAIRealtimeBetaLLMService(
api_key=os.getenv("OPENAI_API_KEY"),
session_properties=session_properties,
start_audio_paused=False,

View File

@@ -24,7 +24,7 @@ from pipecat.processors.aggregators.openai_llm_context import (
)
from pipecat.services.openai_realtime_beta import (
InputAudioTranscription,
OpenAILLMServiceRealtimeBeta,
OpenAIRealtimeBetaLLMService,
SessionProperties,
TurnDetection,
)
@@ -211,7 +211,7 @@ unless specifically asked to elaborate on a topic.
Remember, your responses should be short. Just one or two sentences, usually.""",
)
llm = OpenAILLMServiceRealtimeBeta(
llm = OpenAIRealtimeBetaLLMService(
api_key=os.getenv("OPENAI_API_KEY"),
session_properties=session_properties,
start_audio_paused=False,

View File

@@ -1,2 +1,2 @@
from .events import InputAudioTranscription, SessionProperties, TurnDetection
from .llm_and_context import OpenAILLMServiceRealtimeBeta
from .openai import OpenAIRealtimeBetaLLMService

View File

@@ -0,0 +1,208 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import copy
import json
from loguru import logger
from pipecat.frames.frames import Frame, LLMMessagesUpdateFrame, LLMSetToolsFrame
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.openai import (
OpenAIAssistantContextAggregator,
OpenAIUserContextAggregator,
)
from . import events
from .frames import RealtimeMessagesUpdateFrame, RealtimeFunctionCallResultFrame
class OpenAIRealtimeLLMContext(OpenAILLMContext):
    """LLM context with the extra bookkeeping used by the OpenAI realtime service.

    On top of the base context it keeps two flags
    (``llm_needs_settings_update`` and ``llm_needs_initial_messages``) and the
    session instructions extracted from a leading "system" message.
    """

    def __init__(self, messages=None, tools=None, **kwargs):
        super().__init__(messages=messages, tools=tools, **kwargs)
        self.__setup_local()

    def __setup_local(self):
        """Initialize the realtime-specific attributes.

        Kept separate from ``__init__`` so ``upgrade_to_realtime`` can run it on
        an object that was constructed as a plain ``OpenAILLMContext``.
        """
        self.llm_needs_settings_update = True
        self.llm_needs_initial_messages = True
        self._session_instructions = ""

    @staticmethod
    def upgrade_to_realtime(obj: OpenAILLMContext) -> "OpenAIRealtimeLLMContext":
        """Turn a plain ``OpenAILLMContext`` into a realtime context, in place.

        The object's class is swapped (no copy is made) and the realtime
        attributes are initialized. Anything that is already a realtime
        context, or is not an ``OpenAILLMContext`` at all, is returned as is.
        """
        if isinstance(obj, OpenAILLMContext) and not isinstance(obj, OpenAIRealtimeLLMContext):
            obj.__class__ = OpenAIRealtimeLLMContext
            # Inside the class body this is name-mangled to
            # _OpenAIRealtimeLLMContext__setup_local, which exists on the
            # object now that its class has been swapped.
            obj.__setup_local()
        return obj

    # todo
    # - finish implementing all frames
    # - add message conversion functions to OpenAILLMContext base class

    def from_standard_message(self, message):
        """Convert a standard chat message into a realtime conversation item.

        Only assistant messages carrying ``tool_calls`` are handled, and only
        the first tool call is converted. Any other message is logged as an
        error and ``None`` is returned.
        """
        if message.get("role") == "assistant" and message.get("tool_calls"):
            tc = message.get("tool_calls")[0]
            return events.ConversationItem(
                type="function_call",
                call_id=tc["id"],
                name=tc["function"]["name"],
                arguments=tc["function"]["arguments"],
            )
        logger.error(f"Unhandled message type in from_standard_message: {message}")
        return None

    def get_messages_for_initializing_history(self):
        """Return the messages to send when seeding the realtime conversation.

        A leading "system" message is removed from the result and stored as the
        session instructions (which also flags a settings update). What remains
        is returned unchanged if it is a single "user" message; otherwise the
        whole history is packed, as JSON, into one "user" message.

        Returns:
            A list with zero or one message. ``self.messages`` is not modified.
        """
        # We can't load a long conversation history into the openai realtime api yet. (The API/model
        # forgets that it can do audio, if you do a series of `conversation.item.create` calls.) So
        # our general strategy until this is fixed is just to put everything into a first "user"
        # message as a single input.
        if not self.messages:
            return []

        messages = copy.deepcopy(self.messages)

        # If we have a "system" message as our first message, let's pull that out into session
        # "instructions"
        if messages[0].get("role") == "system":
            self.llm_needs_settings_update = True
            system = messages.pop(0)
            content = system.get("content")
            if isinstance(content, str):
                self._session_instructions = content
            elif isinstance(content, list):
                # An empty list, or a first part without "text", yields empty
                # instructions instead of an IndexError / a None value.
                first_part = content[0] if content else {}
                self._session_instructions = first_part.get("text") or ""
            if not messages:
                return []

        # If we have just a single "user" item, we can just send it normally
        if len(messages) == 1 and messages[0].get("role") == "user":
            return messages

        # Otherwise, let's pack everything into a single "user" message with a bit of
        # explanation for the LLM
        intro_text = """
This is a previously saved conversation. Please treat this conversation history as a
starting point for the current conversation."""

        trailing_text = """
This is the end of the previously saved conversation. Please continue the conversation
from here. If the last message is a user instruction or question, act on that instruction
or answer the question. If the last message is an assistant response, simple say that you
are ready to continue the conversation."""

        return [
            {
                "role": "user",
                "type": "message",
                "content": [
                    {
                        "type": "input_text",
                        "text": "\n\n".join(
                            [intro_text, json.dumps(messages, indent=2), trailing_text]
                        ),
                    }
                ],
            }
        ]

    def add_user_content_item_as_message(self, item):
        """Append a "user" text message built from the first content part's transcript."""
        message = {
            "role": "user",
            "content": [{"type": "text", "text": item.content[0].transcript}],
        }
        self.add_message(message)

    def add_assistant_content_item_as_message(self, item):
        """Append an "assistant" message built from the item's audio transcripts.

        Each "audio" content part contributes one text part holding its
        transcript; any other content type is logged as an error and skipped.
        The message is added even when no part was usable.
        """
        message = {"role": "assistant", "content": []}
        for content in item.content:
            if content.type == "audio":
                message["content"].append({"type": "text", "text": content.transcript})
            else:
                logger.error(f"Unhandled content type in assistant item: {content.type} - {item}")
        self.add_message(message)
class OpenAIRealtimeUserContextAggregator(OpenAIUserContextAggregator):
    """User-side context aggregator for the OpenAI realtime service."""

    async def process_frame(
        self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM
    ):
        """Run the parent's processing, then forward two frame types it keeps to itself."""
        await super().process_frame(frame, direction)

        # The parent does not push LLMMessagesUpdateFrame, so that in a typical pipeline
        # only the user context aggregator processes it. The new messages still have to
        # go over the websocket so the openai realtime API has them in its context;
        # a dedicated frame carrying the current context is pushed for that.
        if isinstance(frame, LLMMessagesUpdateFrame):
            messages_update = RealtimeMessagesUpdateFrame(context=self._context)
            await self.push_frame(messages_update)

        # The parent doesn't push LLMSetToolsFrame either; pass it along as received.
        if isinstance(frame, LLMSetToolsFrame):
            await self.push_frame(frame, direction)

    async def _push_aggregation(self):
        """Intentionally do nothing.

        For the moment all user input coming into the pipeline is ignored.
        """
        # todo: think about whether/how to fix this to allow for text input from
        # upstream (transport/transcription, or other sources)
        return None
class OpenAIRealtimeAssistantContextAggregator(OpenAIAssistantContextAggregator):
    """Assistant-side context aggregator for the OpenAI realtime service."""

    async def _push_aggregation(self):
        """Record a pending function call result and push the updated context.

        Function calling is the only thing handled here; in all other cases
        messages are added to the context when openai realtime api events are
        received. Any exception raised while handling the result is logged and
        not propagated.
        """
        if not self._function_call_result:
            return

        self._reset()
        try:
            result_frame = self._function_call_result
            self._function_call_result = None
            should_run_llm = True

            if result_frame.result:
                # First the "tool_call" message from the LLM that triggered the call...
                tool_call_message = {
                    "role": "assistant",
                    "tool_calls": [
                        {
                            "id": result_frame.tool_call_id,
                            "function": {
                                "name": result_frame.function_name,
                                "arguments": json.dumps(result_frame.arguments),
                            },
                            "type": "function",
                        }
                    ],
                }
                self._context.add_message(tool_call_message)

                # ...then the result of the call. It has to end up both in our context
                # here and in the openai realtime api context.
                tool_result_message = {
                    "role": "tool",
                    "content": json.dumps(result_frame.result),
                    "tool_call_id": result_frame.tool_call_id,
                }
                self._context.add_message(tool_result_message)

                # The standard function callback code path pushes the FunctionCallResultFrame
                # from the llm itself, so there was no chance to add the result to the openai
                # realtime api context. A special frame is pushed to do that.
                await self._user_context_aggregator.push_frame(
                    RealtimeFunctionCallResultFrame(result_frame=result_frame)
                )
                should_run_llm = result_frame.run_llm

            if should_run_llm:
                await self._user_context_aggregator.push_context_frame()

            context_frame = OpenAILLMContextFrame(self._context)
            await self.push_frame(context_frame)
        except Exception as e:
            logger.error(f"Error processing frame: {e}")

View File

@@ -0,0 +1,19 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from dataclasses import dataclass
from pipecat.frames.frames import DataFrame, FunctionCallResultFrame
@dataclass
class RealtimeMessagesUpdateFrame(DataFrame):
    """Frame that carries an LLM context to the realtime LLM service."""

    # The context to hand over. Annotated by name only: the class is not
    # imported in this module.
    context: "OpenAIRealtimeLLMContext"
@dataclass
class RealtimeFunctionCallResultFrame(DataFrame):
    """Frame that wraps a function call result for the realtime LLM service."""

    # The original function call result frame being forwarded.
    result_frame: FunctionCallResultFrame

View File

@@ -6,27 +6,23 @@
import asyncio
import base64
import copy
import json
import time
from dataclasses import dataclass
import websockets
from loguru import logger
from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
CancelFrame,
DataFrame,
EndFrame,
ErrorFrame,
Frame,
FunctionCallResultFrame,
InputAudioRawFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMMessagesAppendFrame,
LLMMessagesUpdateFrame,
LLMSetToolsFrame,
LLMUpdateSettingsFrame,
StartFrame,
@@ -47,36 +43,22 @@ from pipecat.processors.aggregators.openai_llm_context import (
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import LLMService
from pipecat.services.openai import (
OpenAIAssistantContextAggregator,
OpenAIContextAggregatorPair,
OpenAIUserContextAggregator,
)
from pipecat.services.openai import OpenAIContextAggregatorPair
from pipecat.utils.time import time_now_iso8601
from . import events
from .events import SessionProperties
from .context import (
OpenAIRealtimeLLMContext,
OpenAIRealtimeUserContextAggregator,
OpenAIRealtimeAssistantContextAggregator,
)
from .frames import RealtimeMessagesUpdateFrame, RealtimeFunctionCallResultFrame
# websocket logger -- in case needed for debugging send/recv
# import logging
# logging.basicConfig(
# format="%(message)s",
# level=logging.DEBUG,
# )
from loguru import logger
@dataclass
class _InternalMessagesUpdateFrame(DataFrame):
context: "OpenAIRealtimeLLMContext"
@dataclass
class _InternalFunctionCallResultFrame(DataFrame):
result_frame: FunctionCallResultFrame
@dataclass
class _CurrentAudioResponse:
class CurrentAudioResponse:
item_id: str
content_index: int
start_time_ms: int
@@ -87,190 +69,7 @@ class OpenAIUnhandledFunctionException(Exception):
pass
class OpenAIRealtimeLLMContext(OpenAILLMContext):
def __init__(self, messages=None, tools=None, **kwargs):
super().__init__(messages=messages, tools=tools, **kwargs)
self.__setup_local()
def __setup_local(self):
self.llm_needs_settings_update = True
self.llm_needs_initial_messages = True
self._session_instructions = ""
return
@staticmethod
def upgrade_to_realtime(obj: OpenAILLMContext) -> "OpenAIRealtimeLLMContext":
if isinstance(obj, OpenAILLMContext) and not isinstance(obj, OpenAIRealtimeLLMContext):
obj.__class__ = OpenAIRealtimeLLMContext
obj.__setup_local()
return obj
# todo
# - finish implementing all frames
# - add message conversion functions to OpenAILLMContext base class
def from_standard_message(self, message):
if message.get("role") == "assistant" and message.get("tool_calls"):
tc = message.get("tool_calls")[0]
return events.ConversationItem(
type="function_call",
call_id=tc["id"],
name=tc["function"]["name"],
arguments=tc["function"]["arguments"],
)
logger.error(f"Unhandled message type in from_standard_message: {message}")
def get_messages_for_initializing_history(self):
# We can't load a long conversation history into the openai realtime api yet. (The API/model
# forgets that it can do audio, if you do a series of `conversation.item.create` calls.) So
# our general strategy until this is fixed is just to put everything into a first "user"
# message as a single input.
if not self.messages:
return []
messages = copy.deepcopy(self.messages)
# If we have a "system" message as our first message, let's pull that out into session
# "instructions"
if messages[0].get("role") == "system":
self.llm_needs_settings_update = True
system = messages.pop(0)
content = system.get("content")
if isinstance(content, str):
self._session_instructions = content
elif isinstance(content, list):
self._session_instructions = content[0].get("text")
if not messages:
return []
# If we have just a single "user" item, we can just send it normally
if len(messages) == 1 and messages[0].get("role") == "user":
return messages
# Otherwise, let's pack everything into a single "user" message with a bit of
# explanation for the LLM
intro_text = """
This is a previously saved conversation. Please treat this conversation history as a
starting point for the current conversation."""
trailing_text = """
This is the end of the previously saved conversation. Please continue the conversation
from here. If the last message is a user instruction or question, act on that instruction
or answer the question. If the last message is an assistant response, simple say that you
are ready to continue the conversation."""
return [
{
"role": "user",
"type": "message",
"content": [
{
"type": "input_text",
"text": "\n\n".join(
[intro_text, json.dumps(messages, indent=2), trailing_text]
),
}
],
}
]
def add_user_content_item_as_message(self, item):
message = {
"role": "user",
"content": [{"type": "text", "text": item.content[0].transcript}],
}
self.add_message(message)
def add_assistant_content_item_as_message(self, item):
message = {"role": "assistant", "content": []}
for content in item.content:
if content.type == "audio":
message["content"].append({"type": "text", "text": content.transcript})
else:
logger.error(f"Unhandled content type in assistant item: {content.type} - {item}")
self.add_message(message)
class OpenAIRealtimeUserContextAggregator(OpenAIUserContextAggregator):
async def process_frame(
self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM
):
await super().process_frame(frame, direction)
# Parent does not push LLMMessagesUpdateFrame. This ensures that in a typical pipeline,
# messages are only processed by the user context aggregator, which is generally what we want. But
# we also need to send new messages over the websocket, so the openai realtime API has them
# in its context.
if isinstance(frame, LLMMessagesUpdateFrame):
await self.push_frame(_InternalMessagesUpdateFrame(context=self._context))
# Parent also doesn't push the LLMSetToolsFrame.
if isinstance(frame, LLMSetToolsFrame):
await self.push_frame(frame, direction)
async def _push_aggregation(self):
# for the moment, ignore all user input coming into the pipeline.
# todo: think about whether/how to fix this to allow for text input from
# upstream (transport/transcription, or other sources)
pass
class OpenAIRealtimeAssistantContextAggregator(OpenAIAssistantContextAggregator):
async def _push_aggregation(self):
# the only thing we implement here is function calling. in all other cases, messages
# are added to the context when we receive openai realtime api events
if not self._function_call_result:
return
self._reset()
try:
frame = self._function_call_result
self._function_call_result = None
if frame.result:
# The "tool_call" message from the LLM that triggered the function call
self._context.add_message(
{
"role": "assistant",
"tool_calls": [
{
"id": frame.tool_call_id,
"function": {
"name": frame.function_name,
"arguments": json.dumps(frame.arguments),
},
"type": "function",
}
],
}
)
# The result of the function call. Need to add this both to our context here and to
# the openai realtime api context.
result_message = {
"role": "tool",
"content": json.dumps(frame.result),
"tool_call_id": frame.tool_call_id,
}
self._context.add_message(result_message)
# The standard function callback code path pushes the FunctionCallResultFrame from the llm itself,
# so we didn't have a chance to add the result to the openai realtime api context. Let's push a
# special frame to do that.
await self._user_context_aggregator.push_frame(
_InternalFunctionCallResultFrame(result_frame=frame)
)
run_llm = frame.run_llm
if run_llm:
await self._user_context_aggregator.push_context_frame()
frame = OpenAILLMContextFrame(self._context)
await self.push_frame(frame)
except Exception as e:
logger.error(f"Error processing frame: {e}")
class OpenAILLMServiceRealtimeBeta(LLMService):
class OpenAIRealtimeBetaLLMService(LLMService):
def __init__(
self,
*,
@@ -400,14 +199,14 @@ class OpenAILLMServiceRealtimeBeta(LLMService):
await self._handle_bot_stopped_speaking()
elif isinstance(frame, LLMMessagesAppendFrame):
await self._handle_messages_append(frame)
elif isinstance(frame, _InternalMessagesUpdateFrame):
elif isinstance(frame, RealtimeMessagesUpdateFrame):
self._context = frame.context
elif isinstance(frame, LLMUpdateSettingsFrame):
self._session_properties = SessionProperties(**frame.settings)
self._session_properties = events.SessionProperties(**frame.settings)
await self._update_settings()
elif isinstance(frame, LLMSetToolsFrame):
await self._update_settings()
elif isinstance(frame, _InternalFunctionCallResultFrame):
elif isinstance(frame, RealtimeFunctionCallResultFrame):
await self._handle_function_call_result(frame.result_frame)
await self.push_frame(frame, direction)
@@ -527,7 +326,6 @@ class OpenAILLMServiceRealtimeBeta(LLMService):
return
else:
# logger.debug(f"!!! Unhandled event: {evt}")
pass
except asyncio.CancelledError:
logger.debug("websocket receive task cancelled")
@@ -552,7 +350,7 @@ class OpenAILLMServiceRealtimeBeta(LLMService):
# this event from the server
await self.stop_ttfb_metrics()
if not self._current_audio_response:
self._current_audio_response = _CurrentAudioResponse(
self._current_audio_response = CurrentAudioResponse(
item_id=evt.item_id,
content_index=evt.content_index,
start_time_ms=int(time.time() * 1000),