From 6101ff9661dd7064d30d28bac983ef58bc0fd7be Mon Sep 17 00:00:00 2001 From: James Hush Date: Wed, 5 Nov 2025 14:16:44 +0100 Subject: [PATCH] Integrate Mem0 memory service into OpenAI Realtime API example - Add Mem0MemoryService to enable conversation memory persistence - Add get_initial_greeting function to create personalized greetings - Update pipeline to include memory service between user context and LLM - Add support for both cloud-based (Mem0 API) and local configurations - Update system instructions to include memory-related guidance - Modify on_client_connected handler to fetch and use personalized greetings - Update documentation with Mem0 setup and usage instructions --- examples/foundational/19-openai-realtime.py | 163 +++++++++++++++++++- 1 file changed, 161 insertions(+), 2 deletions(-) diff --git a/examples/foundational/19-openai-realtime.py b/examples/foundational/19-openai-realtime.py index b5edc0ff2..5e11d0100 100644 --- a/examples/foundational/19-openai-realtime.py +++ b/examples/foundational/19-openai-realtime.py @@ -4,10 +4,39 @@ # SPDX-License-Identifier: BSD 2-Clause License # +"""OpenAI Realtime API Example with Mem0 Memory Integration. + +This example demonstrates how to use OpenAI's Realtime API with Pipecat +for conversational AI with memory capabilities using Mem0. + +The example: + 1. Sets up a real-time audio conversation using OpenAI's Realtime API + 2. Uses Mem0 to store and retrieve memories from conversations + 3. Creates personalized greetings based on previous interactions + 4. Demonstrates function calling capabilities + 5. Shows how to add tools dynamically at runtime + +Example usage (run from pipecat root directory): + $ pip install "pipecat-ai[daily,openai,mem0]" + $ python examples/foundational/19-openai-realtime.py + +Requirements: + - OpenAI API key (for Realtime API) + - Daily API key (for video/audio transport) + - Mem0 API key (for cloud-based memory storage) + - [Optional] Deepgram API key (for STT fallback) + + Environment variables (set in .env or in your terminal using `export`): + DAILY_SAMPLE_ROOM_URL=daily_sample_room_url + DAILY_API_KEY=daily_api_key + OPENAI_API_KEY=openai_api_key + MEM0_API_KEY=mem0_api_key +""" import asyncio import os from datetime import datetime +from typing import Union from dotenv import load_dotenv from loguru import logger @@ -27,6 +56,7 @@ from pipecat.processors.transcript_processor import TranscriptProcessor from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport from pipecat.services.llm_service import FunctionCallParams +from pipecat.services.mem0.memory import Mem0MemoryService from pipecat.services.openai.realtime.events import ( AudioConfiguration, AudioInput, @@ -42,6 +72,64 @@ from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams load_dotenv(override=True) +try: + from mem0 import Memory, MemoryClient # noqa: F401 +except ModuleNotFoundError as e: + logger.error(f"Exception: {e}") + logger.error( + "In order to use Mem0, you need to `pip install mem0ai`. Also, set the environment variable MEM0_API_KEY." + ) + raise Exception(f"Missing module: {e}") + + +async def get_initial_greeting( + memory_client: Union[MemoryClient, Memory], user_id: str, agent_id: str, run_id: str +) -> str: + """Fetch all memories for the user and create a personalized greeting. + + Returns: + A personalized greeting based on user memories + """ + try: + if isinstance(memory_client, Memory): + filters = {"user_id": user_id, "agent_id": agent_id, "run_id": run_id} + filters = {k: v for k, v in filters.items() if v is not None} + memories = memory_client.get_all(**filters) + else: + # Create filters based on available IDs + id_pairs = [("user_id", user_id), ("agent_id", agent_id), ("run_id", run_id)] + clauses = [{name: value} for name, value in id_pairs if value is not None] + filters = {"AND": clauses} if clauses else {} + + # Get all memories for this user + memories = memory_client.get_all(filters=filters, version="v2", output_format="v1.1") + + if not memories or len(memories) == 0: + logger.debug(f"!!! No memories found for this user. {memories}") + return "Hello! It's nice to meet you. How can I help you today?" + + # Create a personalized greeting based on memories + greeting = "Hello! It's great to see you again. " + + # Add some personalization based on memories (limit to 3 memories for brevity) + if len(memories) > 0: + greeting += "Based on our previous conversations, I remember: " + for i, memory in enumerate(memories["results"][:3], 1): + memory_content = memory.get("memory", "") + # Keep memory references brief + if len(memory_content) > 100: + memory_content = memory_content[:97] + "..." + greeting += f"{memory_content} " + + greeting += "How can I help you today?" + + logger.debug(f"Created personalized greeting from {len(memories)} memories") + return greeting + + except Exception as e: + logger.error(f"Error retrieving initial memories from Mem0: {e}") + return "Hello! How can I help you today?" + async def fetch_weather_from_api(params: FunctionCallParams): temperature = 75 if params.arguments["format"] == "fahrenheit" else 24 @@ -134,8 +222,62 @@ transport_params = { async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): + # Note: You can pass the user_id as a parameter in API call + USER_ID = "pipecat-realtime-user" + logger.info(f"Starting bot") + # ===================================================================== + # OPTION 1: Using Mem0 API (cloud-based approach) + # This approach uses Mem0's cloud service for memory management + # Requires: MEM0_API_KEY set in your environment + # ===================================================================== + memory = Mem0MemoryService( + api_key=os.getenv("MEM0_API_KEY"), # Your Mem0 API key + user_id=USER_ID, # Unique identifier for the user + agent_id="realtime-agent", # Optional identifier for the agent + run_id="realtime-session", # Optional identifier for the run + params=Mem0MemoryService.InputParams( + search_limit=10, + search_threshold=0.3, + api_version="v2", + system_prompt="Based on previous conversations, I recall: \n\n", + add_as_system_message=True, + position=1, + ), + ) + + # ===================================================================== + # OPTION 2: Using Mem0 with local configuration (self-hosted approach) + # This approach uses a local LLM configuration for memory management + # Requires: Anthropic API key if using Claude model + # ===================================================================== + # Uncomment the following code and comment out the previous memory initialization to use local config + + # local_config = { + # "llm": { + # "provider": "anthropic", + # "config": { + # "model": "claude-3-5-sonnet-20240620", + # "api_key": os.getenv("ANTHROPIC_API_KEY"), # Make sure to set this in your .env + # } + # }, + # "embedder": { + # "provider": "openai", + # "config": { + # "model": "text-embedding-3-large" + # } + # } + # } + + # # Initialize Mem0 memory service with local configuration + # memory = Mem0MemoryService( + # local_config=local_config, # Use local LLM for memory processing + # user_id=USER_ID, # Unique identifier for the user + # # agent_id="realtime-agent", # Optional identifier for the agent + # # run_id="realtime-session", # Optional identifier for the run + # ) + session_properties = SessionProperties( audio=AudioConfiguration( input=AudioInput( @@ -149,7 +291,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ) ), # tools=tools, - instructions="""You are a helpful and friendly AI. + instructions="""You are a helpful and friendly AI with memory capabilities. Act like a human, but remember that you aren't a human and that you can't do human things in the real world. Your voice and personality should be warm and engaging, with a lively and @@ -162,6 +304,9 @@ even if you're asked about them. You are participating in a voice conversation. Keep your responses concise, short, and to the point unless specifically asked to elaborate on a topic. +You can remember things about the person you are talking to. If the user asks you to remember +something, make sure to remember it. Greet the user by their name if you know about it. + Remember, your responses should be short. Just one or two sentences, usually. Respond in English.""", ) @@ -182,8 +327,9 @@ Remember, your responses should be short. Just one or two sentences, usually. Re # Create a standard OpenAI LLM context object using the normal messages format. The # OpenAIRealtimeLLMService will convert this internally to messages that the # openai WebSocket API can understand. + # We'll add the initial greeting message after getting memories context = LLMContext( - [{"role": "user", "content": "Say hello!"}], + [], tools, ) @@ -194,6 +340,7 @@ Remember, your responses should be short. Just one or two sentences, usually. Re transport.input(), # Transport user input context_aggregator.user(), transcript.user(), # LLM pushes TranscriptionFrames upstream + memory, # Mem0 memory service llm, # LLM transport.output(), # Transport bot output transcript.assistant(), # After the transcript output, to time with the audio output @@ -214,6 +361,18 @@ Remember, your responses should be short. Just one or two sentences, usually. Re @transport.event_handler("on_client_connected") async def on_client_connected(transport, client): logger.info(f"Client connected") + + # Get personalized greeting based on user memories + greeting = await get_initial_greeting( + memory_client=memory.memory_client, + user_id=USER_ID, + agent_id="realtime-agent", + run_id="realtime-session", + ) + + # Add the greeting as a user message to start the conversation + context.add_message({"role": "user", "content": greeting}) + # Kick off the conversation. await task.queue_frames([LLMRunFrame()])