Compare commits
1 Commits
v0.0.104
...
hush/realt
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6101ff9661 |
@@ -4,10 +4,39 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""OpenAI Realtime API Example with Mem0 Memory Integration.
|
||||
|
||||
This example demonstrates how to use OpenAI's Realtime API with Pipecat
|
||||
for conversational AI with memory capabilities using Mem0.
|
||||
|
||||
The example:
|
||||
1. Sets up a real-time audio conversation using OpenAI's Realtime API
|
||||
2. Uses Mem0 to store and retrieve memories from conversations
|
||||
3. Creates personalized greetings based on previous interactions
|
||||
4. Demonstrates function calling capabilities
|
||||
5. Shows how to add tools dynamically at runtime
|
||||
|
||||
Example usage (run from pipecat root directory):
|
||||
$ pip install "pipecat-ai[daily,openai,mem0]"
|
||||
$ python examples/foundational/19-openai-realtime.py
|
||||
|
||||
Requirements:
|
||||
- OpenAI API key (for Realtime API)
|
||||
- Daily API key (for video/audio transport)
|
||||
- Mem0 API key (for cloud-based memory storage)
|
||||
- [Optional] Deepgram API key (for STT fallback)
|
||||
|
||||
Environment variables (set in .env or in your terminal using `export`):
|
||||
DAILY_SAMPLE_ROOM_URL=daily_sample_room_url
|
||||
DAILY_API_KEY=daily_api_key
|
||||
OPENAI_API_KEY=openai_api_key
|
||||
MEM0_API_KEY=mem0_api_key
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
from datetime import datetime
|
||||
from typing import Union
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
@@ -27,6 +56,7 @@ from pipecat.processors.transcript_processor import TranscriptProcessor
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.llm_service import FunctionCallParams
|
||||
from pipecat.services.mem0.memory import Mem0MemoryService
|
||||
from pipecat.services.openai.realtime.events import (
|
||||
AudioConfiguration,
|
||||
AudioInput,
|
||||
@@ -42,6 +72,64 @@ from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
try:
|
||||
from mem0 import Memory, MemoryClient # noqa: F401
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error(
|
||||
"In order to use Mem0, you need to `pip install mem0ai`. Also, set the environment variable MEM0_API_KEY."
|
||||
)
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
async def get_initial_greeting(
|
||||
memory_client: Union[MemoryClient, Memory], user_id: str, agent_id: str, run_id: str
|
||||
) -> str:
|
||||
"""Fetch all memories for the user and create a personalized greeting.
|
||||
|
||||
Returns:
|
||||
A personalized greeting based on user memories
|
||||
"""
|
||||
try:
|
||||
if isinstance(memory_client, Memory):
|
||||
filters = {"user_id": user_id, "agent_id": agent_id, "run_id": run_id}
|
||||
filters = {k: v for k, v in filters.items() if v is not None}
|
||||
memories = memory_client.get_all(**filters)
|
||||
else:
|
||||
# Create filters based on available IDs
|
||||
id_pairs = [("user_id", user_id), ("agent_id", agent_id), ("run_id", run_id)]
|
||||
clauses = [{name: value} for name, value in id_pairs if value is not None]
|
||||
filters = {"AND": clauses} if clauses else {}
|
||||
|
||||
# Get all memories for this user
|
||||
memories = memory_client.get_all(filters=filters, version="v2", output_format="v1.1")
|
||||
|
||||
if not memories or len(memories) == 0:
|
||||
logger.debug(f"!!! No memories found for this user. {memories}")
|
||||
return "Hello! It's nice to meet you. How can I help you today?"
|
||||
|
||||
# Create a personalized greeting based on memories
|
||||
greeting = "Hello! It's great to see you again. "
|
||||
|
||||
# Add some personalization based on memories (limit to 3 memories for brevity)
|
||||
if len(memories) > 0:
|
||||
greeting += "Based on our previous conversations, I remember: "
|
||||
for i, memory in enumerate(memories["results"][:3], 1):
|
||||
memory_content = memory.get("memory", "")
|
||||
# Keep memory references brief
|
||||
if len(memory_content) > 100:
|
||||
memory_content = memory_content[:97] + "..."
|
||||
greeting += f"{memory_content} "
|
||||
|
||||
greeting += "How can I help you today?"
|
||||
|
||||
logger.debug(f"Created personalized greeting from {len(memories)} memories")
|
||||
return greeting
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error retrieving initial memories from Mem0: {e}")
|
||||
return "Hello! How can I help you today?"
|
||||
|
||||
|
||||
async def fetch_weather_from_api(params: FunctionCallParams):
|
||||
temperature = 75 if params.arguments["format"] == "fahrenheit" else 24
|
||||
@@ -134,8 +222,62 @@ transport_params = {
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
# Note: You can pass the user_id as a parameter in API call
|
||||
USER_ID = "pipecat-realtime-user"
|
||||
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
# =====================================================================
|
||||
# OPTION 1: Using Mem0 API (cloud-based approach)
|
||||
# This approach uses Mem0's cloud service for memory management
|
||||
# Requires: MEM0_API_KEY set in your environment
|
||||
# =====================================================================
|
||||
memory = Mem0MemoryService(
|
||||
api_key=os.getenv("MEM0_API_KEY"), # Your Mem0 API key
|
||||
user_id=USER_ID, # Unique identifier for the user
|
||||
agent_id="realtime-agent", # Optional identifier for the agent
|
||||
run_id="realtime-session", # Optional identifier for the run
|
||||
params=Mem0MemoryService.InputParams(
|
||||
search_limit=10,
|
||||
search_threshold=0.3,
|
||||
api_version="v2",
|
||||
system_prompt="Based on previous conversations, I recall: \n\n",
|
||||
add_as_system_message=True,
|
||||
position=1,
|
||||
),
|
||||
)
|
||||
|
||||
# =====================================================================
|
||||
# OPTION 2: Using Mem0 with local configuration (self-hosted approach)
|
||||
# This approach uses a local LLM configuration for memory management
|
||||
# Requires: Anthropic API key if using Claude model
|
||||
# =====================================================================
|
||||
# Uncomment the following code and comment out the previous memory initialization to use local config
|
||||
|
||||
# local_config = {
|
||||
# "llm": {
|
||||
# "provider": "anthropic",
|
||||
# "config": {
|
||||
# "model": "claude-3-5-sonnet-20240620",
|
||||
# "api_key": os.getenv("ANTHROPIC_API_KEY"), # Make sure to set this in your .env
|
||||
# }
|
||||
# },
|
||||
# "embedder": {
|
||||
# "provider": "openai",
|
||||
# "config": {
|
||||
# "model": "text-embedding-3-large"
|
||||
# }
|
||||
# }
|
||||
# }
|
||||
|
||||
# # Initialize Mem0 memory service with local configuration
|
||||
# memory = Mem0MemoryService(
|
||||
# local_config=local_config, # Use local LLM for memory processing
|
||||
# user_id=USER_ID, # Unique identifier for the user
|
||||
# # agent_id="realtime-agent", # Optional identifier for the agent
|
||||
# # run_id="realtime-session", # Optional identifier for the run
|
||||
# )
|
||||
|
||||
session_properties = SessionProperties(
|
||||
audio=AudioConfiguration(
|
||||
input=AudioInput(
|
||||
@@ -149,7 +291,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
)
|
||||
),
|
||||
# tools=tools,
|
||||
instructions="""You are a helpful and friendly AI.
|
||||
instructions="""You are a helpful and friendly AI with memory capabilities.
|
||||
|
||||
Act like a human, but remember that you aren't a human and that you can't do human
|
||||
things in the real world. Your voice and personality should be warm and engaging, with a lively and
|
||||
@@ -162,6 +304,9 @@ even if you're asked about them.
|
||||
You are participating in a voice conversation. Keep your responses concise, short, and to the point
|
||||
unless specifically asked to elaborate on a topic.
|
||||
|
||||
You can remember things about the person you are talking to. If the user asks you to remember
|
||||
something, make sure to remember it. Greet the user by their name if you know about it.
|
||||
|
||||
Remember, your responses should be short. Just one or two sentences, usually. Respond in English.""",
|
||||
)
|
||||
|
||||
@@ -182,8 +327,9 @@ Remember, your responses should be short. Just one or two sentences, usually. Re
|
||||
# Create a standard OpenAI LLM context object using the normal messages format. The
|
||||
# OpenAIRealtimeLLMService will convert this internally to messages that the
|
||||
# openai WebSocket API can understand.
|
||||
# We'll add the initial greeting message after getting memories
|
||||
context = LLMContext(
|
||||
[{"role": "user", "content": "Say hello!"}],
|
||||
[],
|
||||
tools,
|
||||
)
|
||||
|
||||
@@ -194,6 +340,7 @@ Remember, your responses should be short. Just one or two sentences, usually. Re
|
||||
transport.input(), # Transport user input
|
||||
context_aggregator.user(),
|
||||
transcript.user(), # LLM pushes TranscriptionFrames upstream
|
||||
memory, # Mem0 memory service
|
||||
llm, # LLM
|
||||
transport.output(), # Transport bot output
|
||||
transcript.assistant(), # After the transcript output, to time with the audio output
|
||||
@@ -214,6 +361,18 @@ Remember, your responses should be short. Just one or two sentences, usually. Re
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
|
||||
# Get personalized greeting based on user memories
|
||||
greeting = await get_initial_greeting(
|
||||
memory_client=memory.memory_client,
|
||||
user_id=USER_ID,
|
||||
agent_id="realtime-agent",
|
||||
run_id="realtime-session",
|
||||
)
|
||||
|
||||
# Add the greeting as a user message to start the conversation
|
||||
context.add_message({"role": "user", "content": greeting})
|
||||
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
|
||||
Reference in New Issue
Block a user