From 780e91eb91d9c482bf3d78589b2e7631fa5b5f38 Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Tue, 23 Sep 2025 12:50:35 -0400 Subject: [PATCH] Update persistent conversation storage examples to use universal `LLMContext`. Note that `LLMContext` doesn't have a `get_messages_for_persistent_storage()`; the messages are already in the "standard" format so they can be used directly for storage. --- .../20a-persistent-context-openai.py | 128 +++++++-------- .../20c-persistent-context-anthropic.py | 116 ++++++------- .../20d-persistent-context-gemini.py | 155 +++++++++--------- 3 files changed, 193 insertions(+), 206 deletions(-) diff --git a/examples/foundational/20a-persistent-context-openai.py b/examples/foundational/20a-persistent-context-openai.py index 9cae10573..93c1fa438 100644 --- a/examples/foundational/20a-persistent-context-openai.py +++ b/examples/foundational/20a-persistent-context-openai.py @@ -12,6 +12,8 @@ from datetime import datetime from dotenv import load_dotenv from loguru import logger +from pipecat.adapters.schemas.function_schema import FunctionSchema +from pipecat.adapters.schemas.tools_schema import ToolsSchema from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 from pipecat.audio.vad.silero import SileroVADAnalyzer @@ -20,9 +22,8 @@ from pipecat.frames.frames import LLMRunFrame, TTSSpeakFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask -from pipecat.processors.aggregators.openai_llm_context import ( - OpenAILLMContext, -) +from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport from pipecat.services.cartesia.tts import CartesiaTTSService @@ -66,11 +67,11 @@ async def save_conversation(params: FunctionCallParams): timestamp = datetime.now().strftime("%Y-%m-%d_%H:%M:%S") filename = f"{BASE_FILENAME}{timestamp}.json" logger.debug( - f"writing conversation to {filename}\n{json.dumps(params.context.messages, indent=4)}" + f"writing conversation to {filename}\n{json.dumps(params.context.get_messages(), indent=4)}" ) try: with open(filename, "w") as file: - messages = params.context.get_messages_for_persistent_storage() + messages = params.context.get_messages() # remove the last message, which is the instruction we just gave to save the conversation messages.pop() json.dump(messages, file, indent=2) @@ -87,7 +88,7 @@ async def load_conversation(params: FunctionCallParams): with open(filename, "r") as file: params.context.set_messages(json.load(file)) logger.debug( - f"loaded conversation from {filename}\n{json.dumps(params.context.messages, indent=4)}" + f"loaded conversation from {filename}\n{json.dumps(params.context.get_messages(), indent=4)}" ) await params.llm.queue_frame(TTSSpeakFrame("Ok, I've loaded that conversation.")) except Exception as e: @@ -100,71 +101,58 @@ messages = [ "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.", }, ] -tools = [ - { - "type": "function", - "function": { - "name": "get_current_weather", - "description": "Get the current weather", - "parameters": { - "type": "object", - "properties": { - "location": { - "type": "string", - "description": "The city and state, e.g. San Francisco, CA", - }, - "format": { - "type": "string", - "enum": ["celsius", "fahrenheit"], - "description": "The temperature unit to use. Infer this from the users location.", - }, - }, - "required": ["location", "format"], - }, + +weather_function = FunctionSchema( + name="get_current_weather", + description="Get the current weather", + properties={ + "location": { + "type": "string", + "description": "The city and state, e.g. San Francisco, CA", + }, + "format": { + "type": "string", + "enum": ["celsius", "fahrenheit"], + "description": "The temperature unit to use. Infer this from the users location.", }, }, - { - "type": "function", - "function": { - "name": "save_conversation", - "description": "Save the current conversatione. Use this function to persist the current conversation to external storage.", - "parameters": { - "type": "object", - "properties": {}, - "required": [], - }, - }, + required=["location", "format"], +) + +save_conversation_function = FunctionSchema( + name="save_conversation", + description="Save the current conversatione. Use this function to persist the current conversation to external storage.", + properties={}, + required=[], +) + +get_filenames_function = FunctionSchema( + name="get_saved_conversation_filenames", + description="Get a list of saved conversation histories. Returns a list of filenames. Each filename includes a date and timestamp. Each file is conversation history that can be loaded into this session.", + properties={}, + required=[], +) + +load_conversation_function = FunctionSchema( + name="load_conversation", + description="Load a conversation history. Use this function to load a conversation history into the current session.", + properties={ + "filename": { + "type": "string", + "description": "The filename of the conversation history to load.", + } }, - { - "type": "function", - "function": { - "name": "get_saved_conversation_filenames", - "description": "Get a list of saved conversation histories. Returns a list of filenames. Each filename includes a date and timestamp. Each file is conversation history that can be loaded into this session.", - "parameters": { - "type": "object", - "properties": {}, - "required": [], - }, - }, - }, - { - "type": "function", - "function": { - "name": "load_conversation", - "description": "Load a conversation history. Use this function to load a conversation history into the current session.", - "parameters": { - "type": "object", - "properties": { - "filename": { - "type": "string", - "description": "The filename of the conversation history to load.", - } - }, - "required": ["filename"], - }, - }, - }, -] + required=["filename"], +) + +tools = ToolsSchema( + standard_tools=[ + weather_function, + save_conversation_function, + get_filenames_function, + load_conversation_function, + ] +) # We store functions so objects (e.g. SileroVADAnalyzer) don't get @@ -211,8 +199,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): llm.register_function("get_saved_conversation_filenames", get_saved_conversation_filenames) llm.register_function("load_conversation", load_conversation) - context = OpenAILLMContext(messages, tools) - context_aggregator = llm.create_context_aggregator(context) + context = LLMContext(messages, tools) + context_aggregator = LLMContextAggregatorPair(context) pipeline = Pipeline( [ diff --git a/examples/foundational/20c-persistent-context-anthropic.py b/examples/foundational/20c-persistent-context-anthropic.py index 92b435aa8..411a976b8 100644 --- a/examples/foundational/20c-persistent-context-anthropic.py +++ b/examples/foundational/20c-persistent-context-anthropic.py @@ -12,6 +12,8 @@ from datetime import datetime from dotenv import load_dotenv from loguru import logger +from pipecat.adapters.schemas.function_schema import FunctionSchema +from pipecat.adapters.schemas.tools_schema import ToolsSchema from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 from pipecat.audio.vad.silero import SileroVADAnalyzer @@ -20,9 +22,8 @@ from pipecat.frames.frames import LLMRunFrame, TTSSpeakFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask -from pipecat.processors.aggregators.openai_llm_context import ( - OpenAILLMContext, -) +from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport from pipecat.services.anthropic.llm import AnthropicLLMService @@ -67,12 +68,12 @@ async def save_conversation(params: FunctionCallParams): timestamp = datetime.now().strftime("%Y-%m-%d_%H:%M:%S") filename = f"{BASE_FILENAME}{timestamp}.json" logger.debug( - f"writing conversation to {filename}\n{json.dumps(params.context.messages, indent=4)}" + f"writing conversation to {filename}\n{json.dumps(params.context.get_messages(), indent=4)}" ) try: with open(filename, "w") as file: # todo: extract 'system' into the first message in the list - messages = params.context.get_messages_for_persistent_storage() + messages = params.context.get_messages() # remove the last message, which is the instruction we just gave to save the conversation messages.pop() json.dump(messages, file, indent=2) @@ -89,7 +90,7 @@ async def load_conversation(params: FunctionCallParams): with open(filename, "r") as file: params.context.set_messages(json.load(file)) logger.debug( - f"loaded conversation from {filename}\n{json.dumps(params.context.messages, indent=4)}" + f"loaded conversation from {filename}\n{json.dumps(params.context.get_messages(), indent=4)}" ) await params.llm.queue_frame(TTSSpeakFrame("Ok, I've loaded that conversation.")) except Exception as e: @@ -108,59 +109,58 @@ messages = [ # {"role": "user", "content": "Tell me"}, # {"role": "user", "content": "a joke"}, ] -tools = [ - { - "name": "get_current_weather", - "description": "Get the current weather", - "input_schema": { - "type": "object", - "properties": { - "location": { - "type": "string", - "description": "The city and state, e.g. San Francisco, CA", - }, - "format": { - "type": "string", - "enum": ["celsius", "fahrenheit"], - "description": "The temperature unit to use. Infer this from the users location.", - }, - }, - "required": ["location", "format"], + +weather_function = FunctionSchema( + name="get_current_weather", + description="Get the current weather", + properties={ + "location": { + "type": "string", + "description": "The city and state, e.g. San Francisco, CA", + }, + "format": { + "type": "string", + "enum": ["celsius", "fahrenheit"], + "description": "The temperature unit to use. Infer this from the users location.", }, }, - { - "name": "save_conversation", - "description": "Save the current conversation. Use this function to persist the current conversation to external storage.", - "input_schema": { - "type": "object", - "properties": {}, - "required": [], - }, + required=["location", "format"], +) + +save_conversation_function = FunctionSchema( + name="save_conversation", + description="Save the current conversation. Use this function to persist the current conversation to external storage.", + properties={}, + required=[], +) + +get_filenames_function = FunctionSchema( + name="get_saved_conversation_filenames", + description="Get a list of saved conversation histories. Returns a list of filenames. Each filename includes a date and timestamp. Each file is conversation history that can be loaded into this session.", + properties={}, + required=[], +) + +load_conversation_function = FunctionSchema( + name="load_conversation", + description="Load a conversation history. Use this function to load a conversation history into the current session.", + properties={ + "filename": { + "type": "string", + "description": "The filename of the conversation history to load.", + } }, - { - "name": "get_saved_conversation_filenames", - "description": "Get a list of saved conversation histories. Returns a list of filenames. Each filename includes a date and timestamp. Each file is conversation history that can be loaded into this session.", - "input_schema": { - "type": "object", - "properties": {}, - "required": [], - }, - }, - { - "name": "load_conversation", - "description": "Load a conversation history. Use this function to load a conversation history into the current session.", - "input_schema": { - "type": "object", - "properties": { - "filename": { - "type": "string", - "description": "The filename of the conversation history to load.", - } - }, - "required": ["filename"], - }, - }, -] + required=["filename"], +) + +tools = ToolsSchema( + standard_tools=[ + weather_function, + save_conversation_function, + get_filenames_function, + load_conversation_function, + ] +) # We store functions so objects (e.g. SileroVADAnalyzer) don't get @@ -211,8 +211,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): llm.register_function("get_saved_conversation_filenames", get_saved_conversation_filenames) llm.register_function("load_conversation", load_conversation) - context = OpenAILLMContext(messages, tools) - context_aggregator = llm.create_context_aggregator(context) + context = LLMContext(messages, tools) + context_aggregator = LLMContextAggregatorPair(context) pipeline = Pipeline( [ diff --git a/examples/foundational/20d-persistent-context-gemini.py b/examples/foundational/20d-persistent-context-gemini.py index 959256cb2..8dad8148d 100644 --- a/examples/foundational/20d-persistent-context-gemini.py +++ b/examples/foundational/20d-persistent-context-gemini.py @@ -12,6 +12,8 @@ from datetime import datetime from dotenv import load_dotenv from loguru import logger +from pipecat.adapters.schemas.function_schema import FunctionSchema +from pipecat.adapters.schemas.tools_schema import ToolsSchema from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 from pipecat.audio.vad.silero import SileroVADAnalyzer @@ -20,9 +22,8 @@ from pipecat.frames.frames import LLMRunFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask -from pipecat.processors.aggregators.openai_llm_context import ( - OpenAILLMContext, -) +from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import ( create_transport, @@ -85,12 +86,12 @@ async def save_conversation(params: FunctionCallParams): timestamp = datetime.now().strftime("%Y-%m-%d_%H:%M:%S") filename = f"{BASE_FILENAME}{timestamp}.json" logger.debug( - f"writing conversation to {filename}\n{json.dumps(params.context.get_messages_for_logging(), indent=4)}" + f"writing conversation to {filename}\n{json.dumps(params.context.get_messages(), indent=4)}" ) try: with open(filename, "w") as file: # todo: extract 'system' into the first message in the list - messages = params.context.get_messages_for_persistent_storage() + messages = params.context.get_messages() # remove the last message (the instruction to save the context) messages.pop() json.dump(messages, file, indent=2) @@ -151,78 +152,76 @@ indicate you should use the get_image tool are: # {"role": "user", "content": "Tell me"}, # {"role": "user", "content": "a joke"}, ] -tools = [ - { - "function_declarations": [ - { - "name": "get_current_weather", - "description": "Get the current weather", - "parameters": { - "type": "object", - "properties": { - "location": { - "type": "string", - "description": "The city and state, e.g. San Francisco, CA", - }, - "format": { - "type": "string", - "enum": ["celsius", "fahrenheit"], - "description": "The temperature unit to use. Infer this from the users location.", - }, - }, - "required": ["location", "format"], - }, - }, - { - "name": "save_conversation", - "description": "Save the current conversation. Use this function to persist the current conversation to external storage.", - "parameters": { - "type": "object", - "properties": { - "user_request_text": { - "type": "string", - "description": "The text of the user's request to save the conversation.", - } - }, - "required": ["user_request_text"], - }, - }, - { - "name": "get_saved_conversation_filenames", - "description": "Get a list of saved conversation histories. Returns a list of filenames. Each filename includes a date and timestamp. Each file is conversation history that can be loaded into this session.", - "parameters": None, - }, - { - "name": "load_conversation", - "description": "Load a conversation history. Use this function to load a conversation history into the current session.", - "parameters": { - "type": "object", - "properties": { - "filename": { - "type": "string", - "description": "The filename of the conversation history to load.", - } - }, - "required": ["filename"], - }, - }, - { - "name": "get_image", - "description": "Get and image from the camera or video stream.", - "parameters": { - "type": "object", - "properties": { - "question": { - "type": "string", - "description": "The question to to use when running inference on the acquired image.", - }, - }, - "required": ["question"], - }, - }, - ] + +weather_function = FunctionSchema( + name="get_current_weather", + description="Get the current weather", + properties={ + "location": { + "type": "string", + "description": "The city and state, e.g. San Francisco, CA", + }, + "format": { + "type": "string", + "enum": ["celsius", "fahrenheit"], + "description": "The temperature unit to use. Infer this from the users location.", + }, }, -] + required=["location", "format"], +) + +save_conversation_function = FunctionSchema( + name="save_conversation", + description="Save the current conversation. Use this function to persist the current conversation to external storage.", + properties={ + "user_request_text": { + "type": "string", + "description": "The text of the user's request to save the conversation.", + } + }, + required=["user_request_text"], +) + +get_filenames_function = FunctionSchema( + name="get_saved_conversation_filenames", + description="Get a list of saved conversation histories. Returns a list of filenames. Each filename includes a date and timestamp. Each file is conversation history that can be loaded into this session.", + properties={}, + required=[], +) + +load_conversation_function = FunctionSchema( + name="load_conversation", + description="Load a conversation history. Use this function to load a conversation history into the current session.", + properties={ + "filename": { + "type": "string", + "description": "The filename of the conversation history to load.", + } + }, + required=["filename"], +) + +get_image_function = FunctionSchema( + name="get_image", + description="Get and image from the camera or video stream.", + properties={ + "question": { + "type": "string", + "description": "The question to to use when running inference on the acquired image.", + }, + }, + required=["question"], +) + +tools = ToolsSchema( + standard_tools=[ + weather_function, + save_conversation_function, + get_filenames_function, + load_conversation_function, + get_image_function, + ] +) # We store functions so objects (e.g. SileroVADAnalyzer) don't get @@ -266,8 +265,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): llm.register_function("load_conversation", load_conversation) llm.register_function("get_image", get_image) - context = OpenAILLMContext(messages, tools) - context_aggregator = llm.create_context_aggregator(context) + context = LLMContext(messages, tools) + context_aggregator = LLMContextAggregatorPair(context) pipeline = Pipeline( [