Integrate Mem0 memory service into OpenAI Realtime API example

- Add Mem0MemoryService to enable conversation memory persistence
- Add get_initial_greeting function to create personalized greetings
- Update pipeline to include memory service between user context and LLM
- Add support for both cloud-based (Mem0 API) and local configurations
- Update system instructions to include memory-related guidance
- Modify on_client_connected handler to fetch and use personalized greetings
- Update documentation with Mem0 setup and usage instructions
This commit is contained in:
James Hush
2025-11-05 14:16:44 +01:00
parent c20aa78648
commit 6101ff9661

View File

@@ -4,10 +4,39 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
"""OpenAI Realtime API Example with Mem0 Memory Integration.
This example demonstrates how to use OpenAI's Realtime API with Pipecat
for conversational AI with memory capabilities using Mem0.
The example:
1. Sets up a real-time audio conversation using OpenAI's Realtime API
2. Uses Mem0 to store and retrieve memories from conversations
3. Creates personalized greetings based on previous interactions
4. Demonstrates function calling capabilities
5. Shows how to add tools dynamically at runtime
Example usage (run from pipecat root directory):
$ pip install "pipecat-ai[daily,openai,mem0]"
$ python examples/foundational/19-openai-realtime.py
Requirements:
- OpenAI API key (for Realtime API)
- Daily API key (for video/audio transport)
- Mem0 API key (for cloud-based memory storage)
- [Optional] Deepgram API key (for STT fallback)
Environment variables (set in .env or in your terminal using `export`):
DAILY_SAMPLE_ROOM_URL=daily_sample_room_url
DAILY_API_KEY=daily_api_key
OPENAI_API_KEY=openai_api_key
MEM0_API_KEY=mem0_api_key
"""
import asyncio
import os
from datetime import datetime
from typing import Union
from dotenv import load_dotenv
from loguru import logger
@@ -27,6 +56,7 @@ from pipecat.processors.transcript_processor import TranscriptProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.mem0.memory import Mem0MemoryService
from pipecat.services.openai.realtime.events import (
AudioConfiguration,
AudioInput,
@@ -42,6 +72,64 @@ from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
try:
from mem0 import Memory, MemoryClient # noqa: F401
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use Mem0, you need to `pip install mem0ai`. Also, set the environment variable MEM0_API_KEY."
)
raise Exception(f"Missing module: {e}")
async def get_initial_greeting(
memory_client: Union[MemoryClient, Memory], user_id: str, agent_id: str, run_id: str
) -> str:
"""Fetch all memories for the user and create a personalized greeting.
Returns:
A personalized greeting based on user memories
"""
try:
if isinstance(memory_client, Memory):
filters = {"user_id": user_id, "agent_id": agent_id, "run_id": run_id}
filters = {k: v for k, v in filters.items() if v is not None}
memories = memory_client.get_all(**filters)
else:
# Create filters based on available IDs
id_pairs = [("user_id", user_id), ("agent_id", agent_id), ("run_id", run_id)]
clauses = [{name: value} for name, value in id_pairs if value is not None]
filters = {"AND": clauses} if clauses else {}
# Get all memories for this user
memories = memory_client.get_all(filters=filters, version="v2", output_format="v1.1")
if not memories or len(memories) == 0:
logger.debug(f"!!! No memories found for this user. {memories}")
return "Hello! It's nice to meet you. How can I help you today?"
# Create a personalized greeting based on memories
greeting = "Hello! It's great to see you again. "
# Add some personalization based on memories (limit to 3 memories for brevity)
if len(memories) > 0:
greeting += "Based on our previous conversations, I remember: "
for i, memory in enumerate(memories["results"][:3], 1):
memory_content = memory.get("memory", "")
# Keep memory references brief
if len(memory_content) > 100:
memory_content = memory_content[:97] + "..."
greeting += f"{memory_content} "
greeting += "How can I help you today?"
logger.debug(f"Created personalized greeting from {len(memories)} memories")
return greeting
except Exception as e:
logger.error(f"Error retrieving initial memories from Mem0: {e}")
return "Hello! How can I help you today?"
async def fetch_weather_from_api(params: FunctionCallParams):
temperature = 75 if params.arguments["format"] == "fahrenheit" else 24
@@ -134,8 +222,62 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# Note: You can pass the user_id as a parameter in API call
USER_ID = "pipecat-realtime-user"
logger.info(f"Starting bot")
# =====================================================================
# OPTION 1: Using Mem0 API (cloud-based approach)
# This approach uses Mem0's cloud service for memory management
# Requires: MEM0_API_KEY set in your environment
# =====================================================================
memory = Mem0MemoryService(
api_key=os.getenv("MEM0_API_KEY"), # Your Mem0 API key
user_id=USER_ID, # Unique identifier for the user
agent_id="realtime-agent", # Optional identifier for the agent
run_id="realtime-session", # Optional identifier for the run
params=Mem0MemoryService.InputParams(
search_limit=10,
search_threshold=0.3,
api_version="v2",
system_prompt="Based on previous conversations, I recall: \n\n",
add_as_system_message=True,
position=1,
),
)
# =====================================================================
# OPTION 2: Using Mem0 with local configuration (self-hosted approach)
# This approach uses a local LLM configuration for memory management
# Requires: Anthropic API key if using Claude model
# =====================================================================
# Uncomment the following code and comment out the previous memory initialization to use local config
# local_config = {
# "llm": {
# "provider": "anthropic",
# "config": {
# "model": "claude-3-5-sonnet-20240620",
# "api_key": os.getenv("ANTHROPIC_API_KEY"), # Make sure to set this in your .env
# }
# },
# "embedder": {
# "provider": "openai",
# "config": {
# "model": "text-embedding-3-large"
# }
# }
# }
# # Initialize Mem0 memory service with local configuration
# memory = Mem0MemoryService(
# local_config=local_config, # Use local LLM for memory processing
# user_id=USER_ID, # Unique identifier for the user
# # agent_id="realtime-agent", # Optional identifier for the agent
# # run_id="realtime-session", # Optional identifier for the run
# )
session_properties = SessionProperties(
audio=AudioConfiguration(
input=AudioInput(
@@ -149,7 +291,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
)
),
# tools=tools,
instructions="""You are a helpful and friendly AI.
instructions="""You are a helpful and friendly AI with memory capabilities.
Act like a human, but remember that you aren't a human and that you can't do human
things in the real world. Your voice and personality should be warm and engaging, with a lively and
@@ -162,6 +304,9 @@ even if you're asked about them.
You are participating in a voice conversation. Keep your responses concise, short, and to the point
unless specifically asked to elaborate on a topic.
You can remember things about the person you are talking to. If the user asks you to remember
something, make sure to remember it. Greet the user by their name if you know about it.
Remember, your responses should be short. Just one or two sentences, usually. Respond in English.""",
)
@@ -182,8 +327,9 @@ Remember, your responses should be short. Just one or two sentences, usually. Re
# Create a standard OpenAI LLM context object using the normal messages format. The
# OpenAIRealtimeLLMService will convert this internally to messages that the
# openai WebSocket API can understand.
# We'll add the initial greeting message after getting memories
context = LLMContext(
[{"role": "user", "content": "Say hello!"}],
[],
tools,
)
@@ -194,6 +340,7 @@ Remember, your responses should be short. Just one or two sentences, usually. Re
transport.input(), # Transport user input
context_aggregator.user(),
transcript.user(), # LLM pushes TranscriptionFrames upstream
memory, # Mem0 memory service
llm, # LLM
transport.output(), # Transport bot output
transcript.assistant(), # After the transcript output, to time with the audio output
@@ -214,6 +361,18 @@ Remember, your responses should be short. Just one or two sentences, usually. Re
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Get personalized greeting based on user memories
greeting = await get_initial_greeting(
memory_client=memory.memory_client,
user_id=USER_ID,
agent_id="realtime-agent",
run_id="realtime-session",
)
# Add the greeting as a user message to start the conversation
context.add_message({"role": "user", "content": greeting})
# Kick off the conversation.
await task.queue_frames([LLMRunFrame()])