From bab0aaf585a4eb4ad089f36d7f9529661aacc95c Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Mon, 20 Oct 2025 16:58:44 -0400 Subject: [PATCH] Update `OpenAIRealtimeLLMService` to work with `LLMContext` and `LLMContextAggregatorPair` (cont'd). Update `create_context_aggregator()` (which we're keeping around for backward compatibility) to create a `LLMContextAggregatorPair` rather than OpenAI-Realtime-specific aggregators. --- .../services/openai/realtime/context.py | 243 +++++++++++++++++- .../services/openai/realtime/frames.py | 23 +- src/pipecat/services/openai/realtime/llm.py | 25 +- 3 files changed, 275 insertions(+), 16 deletions(-) diff --git a/src/pipecat/services/openai/realtime/context.py b/src/pipecat/services/openai/realtime/context.py index da08f78ad..57979406c 100644 --- a/src/pipecat/services/openai/realtime/context.py +++ b/src/pipecat/services/openai/realtime/context.py @@ -4,7 +4,94 @@ # SPDX-License-Identifier: BSD 2-Clause License # -"""OpenAI Realtime LLM context and aggregator implementations.""" +"""OpenAI Realtime LLM context and aggregator implementations. + +.. deprecated:: 0.0.92 + OpenAI Realtime no longer uses types from this module under the hood. + It now uses `LLMContext` and `LLMContextAggregatorPair`. + Using the new patterns should allow you to not need types from this module. + + BEFORE: + ``` + # Setup + context = OpenAILLMContext(messages, tools) + context_aggregator = llm.create_context_aggregator(context) + + # Context aggregator type + context_aggregator: OpenAIContextAggregatorPair + + # Context frame type + frame: OpenAILLMContextFrame + + # Context type + context: OpenAIRealtimeLLMContext + # or + context: OpenAILLMContext + + # Reading messages from context + messages = context.messages + ``` + + AFTER: + ``` + # Setup + context = LLMContext(messages, tools) + context_aggregator = LLMContextAggregatorPair(context) + + # Context aggregator type + context_aggregator: LLMContextAggregatorPair + + # Context frame type + frame: LLMContextFrame + + # Context type + context: LLMContext + + # Reading messages from context + messages = context.get_messages() + ``` +""" + +import warnings + +with warnings.catch_warnings(): + warnings.simplefilter("always") + warnings.warn( + "Types in pipecat.services.openai.realtime.llm are deprecated. \n" + "OpenAI Realtime no longer uses types from this module under the hood. \n" + "It now uses `LLMContext` and `LLMContextAggregatorPair`. \n" + "Using the new patterns should allow you to not need types from this module.\n\n" + "BEFORE:\n" + "```\n" + "# Setup\n" + "context = OpenAILLMContext(messages, tools)\n" + "context_aggregator = llm.create_context_aggregator(context)\n\n" + "# Context aggregator type\n" + "context_aggregator: OpenAIContextAggregatorPair\n\n" + "# Context frame type\n" + "frame: OpenAILLMContextFrame\n\n" + "# Context type\n" + "context: OpenAIRealtimeLLMContext\n" + "# or\n" + "context: OpenAILLMContext\n\n" + "# Reading messages from context\n" + "messages = context.messages\n" + "```\n\n" + "AFTER:\n" + "```\n" + "# Setup\n" + "context = LLMContext(messages, tools)\n" + "context_aggregator = LLMContextAggregatorPair(context)\n\n" + "# Context aggregator type\n" + "context_aggregator: LLMContextAggregatorPair\n\n" + "# Context frame type\n" + "frame: LLMContextFrame\n\n" + "# Context type\n" + "context: LLMContext\n\n" + "# Reading messages from context\n" + "messages = context.get_messages()\n" + "```\n", + ) import copy import json @@ -31,6 +118,160 @@ from . import events from .frames import RealtimeFunctionCallResultFrame, RealtimeMessagesUpdateFrame +class OpenAIRealtimeLLMContext(OpenAILLMContext): + """OpenAI Realtime LLM context with session management and message conversion. + + Extends the standard OpenAI LLM context to support real-time session properties, + instruction management, and conversion between standard message formats and + realtime conversation items. + """ + + def __init__(self, messages=None, tools=None, **kwargs): + """Initialize the OpenAIRealtimeLLMContext. + + Args: + messages: Initial conversation messages. Defaults to None. + tools: Available function tools. Defaults to None. + **kwargs: Additional arguments passed to parent OpenAILLMContext. + """ + super().__init__(messages=messages, tools=tools, **kwargs) + self.__setup_local() + + def __setup_local(self): + self.llm_needs_settings_update = True + self.llm_needs_initial_messages = True + self._session_instructions = "" + + return + + @staticmethod + def upgrade_to_realtime(obj: OpenAILLMContext) -> "OpenAIRealtimeLLMContext": + """Upgrade a standard OpenAI LLM context to a realtime context. + + Args: + obj: The OpenAILLMContext instance to upgrade. + + Returns: + The upgraded OpenAIRealtimeLLMContext instance. + """ + if isinstance(obj, OpenAILLMContext) and not isinstance(obj, OpenAIRealtimeLLMContext): + obj.__class__ = OpenAIRealtimeLLMContext + obj.__setup_local() + return obj + + # todo + # - finish implementing all frames + + def from_standard_message(self, message): + """Convert a standard message format to a realtime conversation item. + + Args: + message: The standard message dictionary to convert. + + Returns: + A ConversationItem instance for the realtime API. + """ + if message.get("role") == "user": + content = message.get("content") + if isinstance(message.get("content"), list): + content = "" + for c in message.get("content"): + if c.get("type") == "text": + content += " " + c.get("text") + else: + logger.error( + f"Unhandled content type in context message: {c.get('type')} - {message}" + ) + return events.ConversationItem( + role="user", + type="message", + content=[events.ItemContent(type="input_text", text=content)], + ) + if message.get("role") == "assistant" and message.get("tool_calls"): + tc = message.get("tool_calls")[0] + return events.ConversationItem( + type="function_call", + call_id=tc["id"], + name=tc["function"]["name"], + arguments=tc["function"]["arguments"], + ) + logger.error(f"Unhandled message type in from_standard_message: {message}") + + def get_messages_for_initializing_history(self): + """Get conversation items for initializing the realtime session history. + + Converts the context's messages to a format suitable for the realtime API, + handling system instructions and conversation history packaging. + + Returns: + List of conversation items for session initialization. + """ + # We can't load a long conversation history into the openai realtime api yet. (The API/model + # forgets that it can do audio, if you do a series of `conversation.item.create` calls.) So + # our general strategy until this is fixed is just to put everything into a first "user" + # message as a single input. + if not self.messages: + return [] + + messages = copy.deepcopy(self.messages) + + # If we have a "system" message as our first message, let's pull that out into session + # "instructions" + if messages[0].get("role") == "system": + self.llm_needs_settings_update = True + system = messages.pop(0) + content = system.get("content") + if isinstance(content, str): + self._session_instructions = content + elif isinstance(content, list): + self._session_instructions = content[0].get("text") + if not messages: + return [] + + # If we have just a single "user" item, we can just send it normally + if len(messages) == 1 and messages[0].get("role") == "user": + return [self.from_standard_message(messages[0])] + + # Otherwise, let's pack everything into a single "user" message with a bit of + # explanation for the LLM + intro_text = """ + This is a previously saved conversation. Please treat this conversation history as a + starting point for the current conversation.""" + + trailing_text = """ + This is the end of the previously saved conversation. Please continue the conversation + from here. If the last message is a user instruction or question, act on that instruction + or answer the question. If the last message is an assistant response, simple say that you + are ready to continue the conversation.""" + + return [ + { + "role": "user", + "type": "message", + "content": [ + { + "type": "input_text", + "text": "\n\n".join( + [intro_text, json.dumps(messages, indent=2), trailing_text] + ), + } + ], + } + ] + + def add_user_content_item_as_message(self, item): + """Add a user content item as a standard message to the context. + + Args: + item: The conversation item to add as a user message. + """ + message = { + "role": "user", + "content": [{"type": "text", "text": item.content[0].transcript}], + } + self.add_message(message) + + class OpenAIRealtimeUserContextAggregator(OpenAIUserContextAggregator): """User context aggregator for OpenAI Realtime API. diff --git a/src/pipecat/services/openai/realtime/frames.py b/src/pipecat/services/openai/realtime/frames.py index 8617c6efd..39cfd9757 100644 --- a/src/pipecat/services/openai/realtime/frames.py +++ b/src/pipecat/services/openai/realtime/frames.py @@ -4,7 +4,28 @@ # SPDX-License-Identifier: BSD 2-Clause License # -"""Custom frame types for OpenAI Realtime API integration.""" +"""Custom frame types for OpenAI Realtime API integration. + +.. deprecated:: 0.0.92 + OpenAI Realtime no longer uses types from this module under the hood. + + It now works more like most LLM services in Pipecat, relying on updates to + its context, pushed by context aggregators, to update its internal state. + + Listen for `LLMContextFrame`s for context updates. +""" + +import warnings + +with warnings.catch_warnings(): + warnings.simplefilter("always") + warnings.warn( + "Types in pipecat.services.openai.realtime.frames are deprecated. \n" + "OpenAI Realtime no longer uses types from this module under the hood. \n\n" + "It now works more like other LLM services in Pipecat, relying on updates to \n" + "its context, pushed by context aggregators, to update its internal state.\n\n" + "Listen for `LLMContextFrame`s for context updates.\n" + ) from dataclasses import dataclass from typing import TYPE_CHECKING diff --git a/src/pipecat/services/openai/realtime/llm.py b/src/pipecat/services/openai/realtime/llm.py index a3421cb75..eb2ba5ef4 100644 --- a/src/pipecat/services/openai/realtime/llm.py +++ b/src/pipecat/services/openai/realtime/llm.py @@ -48,6 +48,7 @@ from pipecat.processors.aggregators.llm_response import ( LLMAssistantAggregatorParams, LLMUserAggregatorParams, ) +from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair from pipecat.processors.aggregators.openai_llm_context import ( OpenAILLMContext, OpenAILLMContextFrame, @@ -60,11 +61,6 @@ from pipecat.utils.time import time_now_iso8601 from pipecat.utils.tracing.service_decorators import traced_openai_realtime, traced_stt from . import events -from .context import ( - OpenAIRealtimeAssistantContextAggregator, - OpenAIRealtimeUserContextAggregator, -) -from .frames import RealtimeFunctionCallResultFrame, RealtimeMessagesUpdateFrame try: from websockets.asyncio.client import connect as websocket_connect @@ -832,9 +828,14 @@ class OpenAIRealtimeLLMService(LLMService): *, user_params: LLMUserAggregatorParams = LLMUserAggregatorParams(), assistant_params: LLMAssistantAggregatorParams = LLMAssistantAggregatorParams(), - ) -> OpenAIContextAggregatorPair: + ) -> LLMContextAggregatorPair: """Create an instance of OpenAIContextAggregatorPair from an OpenAILLMContext. + NOTE: this method exists only for backward compatibility. New code + should instead do: + context = LLMContext(...) + context_aggregator = LLMContextAggregatorPair(context) + Constructor keyword arguments for both the user and assistant aggregators can be provided. Args: @@ -847,11 +848,7 @@ class OpenAIRealtimeLLMService(LLMService): the user and one for the assistant, encapsulated in an OpenAIContextAggregatorPair. """ - context.set_llm_adapter(self.get_llm_adapter()) - - OpenAIRealtimeLLMContext.upgrade_to_realtime(context) - user = OpenAIRealtimeUserContextAggregator(context, params=user_params) - - assistant_params.expect_stripped_words = False - assistant = OpenAIRealtimeAssistantContextAggregator(context, params=assistant_params) - return OpenAIContextAggregatorPair(_user=user, _assistant=assistant) + context = LLMContext.from_openai_context(context) + return LLMContextAggregatorPair( + context, user_params=user_params, assistant_params=assistant_params + )