Update OpenAIRealtimeLLMService to work with LLMContext and LLMContextAggregatorPair (cont'd).

Update `create_context_aggregator()` (which we're keeping around for backward compatibility) to create a `LLMContextAggregatorPair` rather than OpenAI-Realtime-specific aggregators.
This commit is contained in:
Paul Kompfner
2025-10-20 16:58:44 -04:00
parent 61944d22ef
commit bab0aaf585
3 changed files with 275 additions and 16 deletions

View File

@@ -4,7 +4,94 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
"""OpenAI Realtime LLM context and aggregator implementations."""
"""OpenAI Realtime LLM context and aggregator implementations.
.. deprecated:: 0.0.92
OpenAI Realtime no longer uses types from this module under the hood.
It now uses `LLMContext` and `LLMContextAggregatorPair`.
Using the new patterns should allow you to not need types from this module.
BEFORE:
```
# Setup
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
# Context aggregator type
context_aggregator: OpenAIContextAggregatorPair
# Context frame type
frame: OpenAILLMContextFrame
# Context type
context: OpenAIRealtimeLLMContext
# or
context: OpenAILLMContext
# Reading messages from context
messages = context.messages
```
AFTER:
```
# Setup
context = LLMContext(messages, tools)
context_aggregator = LLMContextAggregatorPair(context)
# Context aggregator type
context_aggregator: LLMContextAggregatorPair
# Context frame type
frame: LLMContextFrame
# Context type
context: LLMContext
# Reading messages from context
messages = context.get_messages()
```
"""
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"Types in pipecat.services.openai.realtime.llm are deprecated. \n"
"OpenAI Realtime no longer uses types from this module under the hood. \n"
"It now uses `LLMContext` and `LLMContextAggregatorPair`. \n"
"Using the new patterns should allow you to not need types from this module.\n\n"
"BEFORE:\n"
"```\n"
"# Setup\n"
"context = OpenAILLMContext(messages, tools)\n"
"context_aggregator = llm.create_context_aggregator(context)\n\n"
"# Context aggregator type\n"
"context_aggregator: OpenAIContextAggregatorPair\n\n"
"# Context frame type\n"
"frame: OpenAILLMContextFrame\n\n"
"# Context type\n"
"context: OpenAIRealtimeLLMContext\n"
"# or\n"
"context: OpenAILLMContext\n\n"
"# Reading messages from context\n"
"messages = context.messages\n"
"```\n\n"
"AFTER:\n"
"```\n"
"# Setup\n"
"context = LLMContext(messages, tools)\n"
"context_aggregator = LLMContextAggregatorPair(context)\n\n"
"# Context aggregator type\n"
"context_aggregator: LLMContextAggregatorPair\n\n"
"# Context frame type\n"
"frame: LLMContextFrame\n\n"
"# Context type\n"
"context: LLMContext\n\n"
"# Reading messages from context\n"
"messages = context.get_messages()\n"
"```\n",
)
import copy
import json
@@ -31,6 +118,160 @@ from . import events
from .frames import RealtimeFunctionCallResultFrame, RealtimeMessagesUpdateFrame
class OpenAIRealtimeLLMContext(OpenAILLMContext):
"""OpenAI Realtime LLM context with session management and message conversion.
Extends the standard OpenAI LLM context to support real-time session properties,
instruction management, and conversion between standard message formats and
realtime conversation items.
"""
def __init__(self, messages=None, tools=None, **kwargs):
"""Initialize the OpenAIRealtimeLLMContext.
Args:
messages: Initial conversation messages. Defaults to None.
tools: Available function tools. Defaults to None.
**kwargs: Additional arguments passed to parent OpenAILLMContext.
"""
super().__init__(messages=messages, tools=tools, **kwargs)
self.__setup_local()
def __setup_local(self):
self.llm_needs_settings_update = True
self.llm_needs_initial_messages = True
self._session_instructions = ""
return
@staticmethod
def upgrade_to_realtime(obj: OpenAILLMContext) -> "OpenAIRealtimeLLMContext":
"""Upgrade a standard OpenAI LLM context to a realtime context.
Args:
obj: The OpenAILLMContext instance to upgrade.
Returns:
The upgraded OpenAIRealtimeLLMContext instance.
"""
if isinstance(obj, OpenAILLMContext) and not isinstance(obj, OpenAIRealtimeLLMContext):
obj.__class__ = OpenAIRealtimeLLMContext
obj.__setup_local()
return obj
# todo
# - finish implementing all frames
def from_standard_message(self, message):
"""Convert a standard message format to a realtime conversation item.
Args:
message: The standard message dictionary to convert.
Returns:
A ConversationItem instance for the realtime API.
"""
if message.get("role") == "user":
content = message.get("content")
if isinstance(message.get("content"), list):
content = ""
for c in message.get("content"):
if c.get("type") == "text":
content += " " + c.get("text")
else:
logger.error(
f"Unhandled content type in context message: {c.get('type')} - {message}"
)
return events.ConversationItem(
role="user",
type="message",
content=[events.ItemContent(type="input_text", text=content)],
)
if message.get("role") == "assistant" and message.get("tool_calls"):
tc = message.get("tool_calls")[0]
return events.ConversationItem(
type="function_call",
call_id=tc["id"],
name=tc["function"]["name"],
arguments=tc["function"]["arguments"],
)
logger.error(f"Unhandled message type in from_standard_message: {message}")
def get_messages_for_initializing_history(self):
"""Get conversation items for initializing the realtime session history.
Converts the context's messages to a format suitable for the realtime API,
handling system instructions and conversation history packaging.
Returns:
List of conversation items for session initialization.
"""
# We can't load a long conversation history into the openai realtime api yet. (The API/model
# forgets that it can do audio, if you do a series of `conversation.item.create` calls.) So
# our general strategy until this is fixed is just to put everything into a first "user"
# message as a single input.
if not self.messages:
return []
messages = copy.deepcopy(self.messages)
# If we have a "system" message as our first message, let's pull that out into session
# "instructions"
if messages[0].get("role") == "system":
self.llm_needs_settings_update = True
system = messages.pop(0)
content = system.get("content")
if isinstance(content, str):
self._session_instructions = content
elif isinstance(content, list):
self._session_instructions = content[0].get("text")
if not messages:
return []
# If we have just a single "user" item, we can just send it normally
if len(messages) == 1 and messages[0].get("role") == "user":
return [self.from_standard_message(messages[0])]
# Otherwise, let's pack everything into a single "user" message with a bit of
# explanation for the LLM
intro_text = """
This is a previously saved conversation. Please treat this conversation history as a
starting point for the current conversation."""
trailing_text = """
This is the end of the previously saved conversation. Please continue the conversation
from here. If the last message is a user instruction or question, act on that instruction
or answer the question. If the last message is an assistant response, simple say that you
are ready to continue the conversation."""
return [
{
"role": "user",
"type": "message",
"content": [
{
"type": "input_text",
"text": "\n\n".join(
[intro_text, json.dumps(messages, indent=2), trailing_text]
),
}
],
}
]
def add_user_content_item_as_message(self, item):
"""Add a user content item as a standard message to the context.
Args:
item: The conversation item to add as a user message.
"""
message = {
"role": "user",
"content": [{"type": "text", "text": item.content[0].transcript}],
}
self.add_message(message)
class OpenAIRealtimeUserContextAggregator(OpenAIUserContextAggregator):
"""User context aggregator for OpenAI Realtime API.

View File

@@ -4,7 +4,28 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Custom frame types for OpenAI Realtime API integration."""
"""Custom frame types for OpenAI Realtime API integration.
.. deprecated:: 0.0.92
OpenAI Realtime no longer uses types from this module under the hood.
It now works more like most LLM services in Pipecat, relying on updates to
its context, pushed by context aggregators, to update its internal state.
Listen for `LLMContextFrame`s for context updates.
"""
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"Types in pipecat.services.openai.realtime.frames are deprecated. \n"
"OpenAI Realtime no longer uses types from this module under the hood. \n\n"
"It now works more like other LLM services in Pipecat, relying on updates to \n"
"its context, pushed by context aggregators, to update its internal state.\n\n"
"Listen for `LLMContextFrame`s for context updates.\n"
)
from dataclasses import dataclass
from typing import TYPE_CHECKING

View File

@@ -48,6 +48,7 @@ from pipecat.processors.aggregators.llm_response import (
LLMAssistantAggregatorParams,
LLMUserAggregatorParams,
)
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
@@ -60,11 +61,6 @@ from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_openai_realtime, traced_stt
from . import events
from .context import (
OpenAIRealtimeAssistantContextAggregator,
OpenAIRealtimeUserContextAggregator,
)
from .frames import RealtimeFunctionCallResultFrame, RealtimeMessagesUpdateFrame
try:
from websockets.asyncio.client import connect as websocket_connect
@@ -832,9 +828,14 @@ class OpenAIRealtimeLLMService(LLMService):
*,
user_params: LLMUserAggregatorParams = LLMUserAggregatorParams(),
assistant_params: LLMAssistantAggregatorParams = LLMAssistantAggregatorParams(),
) -> OpenAIContextAggregatorPair:
) -> LLMContextAggregatorPair:
"""Create an instance of OpenAIContextAggregatorPair from an OpenAILLMContext.
NOTE: this method exists only for backward compatibility. New code
should instead do:
context = LLMContext(...)
context_aggregator = LLMContextAggregatorPair(context)
Constructor keyword arguments for both the user and assistant aggregators can be provided.
Args:
@@ -847,11 +848,7 @@ class OpenAIRealtimeLLMService(LLMService):
the user and one for the assistant, encapsulated in an
OpenAIContextAggregatorPair.
"""
context.set_llm_adapter(self.get_llm_adapter())
OpenAIRealtimeLLMContext.upgrade_to_realtime(context)
user = OpenAIRealtimeUserContextAggregator(context, params=user_params)
assistant_params.expect_stripped_words = False
assistant = OpenAIRealtimeAssistantContextAggregator(context, params=assistant_params)
return OpenAIContextAggregatorPair(_user=user, _assistant=assistant)
context = LLMContext.from_openai_context(context)
return LLMContextAggregatorPair(
context, user_params=user_params, assistant_params=assistant_params
)