Files
pipecat/examples/rag/rag-mem0.py
Mark Backman 58a17c7b1b Include examples in type checking
Remove `examples/` from the `pyrightconfig.json` ignore list and fix
the resulting type errors across all example files. Common fixes:

- Required API keys: `os.getenv("X")` -> `os.environ["X"]` so the
  return type is `str` rather than `str | None`, and misconfiguration
  fails fast.
- Narrow `LLMContextMessage` union members with `isinstance(..., dict)`
  before dict-style access.
- `assert isinstance(params.llm, ...)` before calling service-specific
  methods that aren't on the base `LLMService`.
- Guard optional frame fields (e.g. `LLMSearchResponseFrame.search_result`)
  before use.
2026-04-21 15:43:31 -04:00

271 lines
9.8 KiB
Python

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Mem0 Personalized Voice Agent Example with Pipecat.

This example demonstrates how to create a conversational AI assistant with memory capabilities
using Mem0 integration. It shows how to build an agent that remembers previous interactions
and personalizes responses based on conversation history.

The example:
1. Sets up an audio conversation between a user and an AI assistant
2. Uses Mem0 to store and retrieve memories from conversations
3. Creates personalized greetings based on previous interactions
4. Handles voice interaction through speech-to-text and text-to-speech
5. Demonstrates two approaches for memory management:
   - Using Mem0 API (cloud-based memory storage)
   - Using local configuration with custom LLM (self-hosted memory)

Requirements:
- OpenAI API key (for the conversation LLM)
- Deepgram API key (for speech-to-text)
- ElevenLabs API key (for text-to-speech)
- Daily API key (when using the Daily transport)
- Mem0 API key (for cloud-based memory storage)
- [Optional] Anthropic API key (if using Claude with local config)

Environment variables (set in .env or in your terminal using `export`):
    DAILY_ROOM_URL=daily_room_url
    DAILY_API_KEY=daily_api_key
    OPENAI_API_KEY=openai_api_key
    DEEPGRAM_API_KEY=deepgram_api_key
    ELEVENLABS_API_KEY=elevenlabs_api_key
    MEM0_API_KEY=mem0_api_key
    ANTHROPIC_API_KEY=anthropic_api_key (if using Claude with local config)

The bot runs as part of a pipeline that processes audio frames and manages the conversation flow.
"""
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.elevenlabs.tts import ElevenLabsTTSService
from pipecat.services.mem0.memory import Mem0MemoryService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
# Load settings such as the API keys and Daily room URL listed in the module
# docstring from a local .env file. override=True makes values from .env
# take precedence over variables already set in the process environment.
load_dotenv(override=True)
async def get_initial_greeting(memory_service: Mem0MemoryService) -> str:
    """Fetch the user's stored memories and build a personalized greeting.

    At most three memories are mentioned, each shortened to no more than 100
    characters so the spoken greeting stays brief. Entries with no memory text
    are skipped.

    Args:
        memory_service: The Mem0 memory service instance.

    Returns:
        A greeting that references remembered facts when memories exist, a
        first-meeting greeting when none are stored, or a generic greeting if
        retrieving or formatting the memories fails.
    """
    max_memories = 3
    max_chars = 100
    try:
        results = await memory_service.get_memories()
        if not results:
            logger.debug("No memories found for this user.")
            return "Hello! It's nice to meet you. How can I help you today?"

        snippets: list[str] = []
        for memory in results[:max_memories]:
            # `or ""` also covers entries whose "memory" value is None; a None
            # would otherwise make len() raise and discard every memory.
            content = memory.get("memory") or ""
            if not content:
                continue
            # Keep memory references brief
            if len(content) > max_chars:
                content = content[: max_chars - 3] + "..."
            snippets.append(content)

        if not snippets:
            logger.debug("Stored memories had no content to mention.")
            return "Hello! It's great to see you again. How can I help you today?"

        # Create a personalized greeting based on memories
        greeting = (
            "Hello! It's great to see you again. "
            "Based on our previous conversations, I remember: "
            + "".join(f"{snippet} " for snippet in snippets)
            + "How can I help you today?"
        )
        logger.debug(
            f"Created personalized greeting from {len(snippets)} of {len(results)} memories"
        )
        return greeting
    except Exception as e:
        # Best effort: a Mem0 failure should not stop the bot from greeting the user.
        logger.error(f"Error retrieving initial memories from Mem0: {e}")
        return "Hello! How can I help you today?"
# Factories (not instances) for each supported transport's parameters: the
# transport type is selected at runtime, and only then is the matching
# factory called to build its params object.
transport_params = {
    "daily": lambda: DailyParams(audio_in_enabled=True, audio_out_enabled=True),
    "twilio": lambda: FastAPIWebsocketParams(audio_in_enabled=True, audio_out_enabled=True),
    "webrtc": lambda: TransportParams(audio_in_enabled=True, audio_out_enabled=True),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    """Build and run the Mem0-backed voice assistant pipeline.

    Sets up and runs a pipeline with:
    - The transport selected at runtime (see ``transport_params``)
    - Deepgram speech-to-text and ElevenLabs text-to-speech services
    - OpenAI language model integration
    - Mem0 memory service (cloud API by default; see OPTION 2 below for a
      local configuration)
    - Client connect/disconnect handlers that greet the user and cancel the task

    Args:
        transport: The transport created for this session.
        runner_args: Runner settings; ``pipeline_idle_timeout_secs`` and
            ``handle_sigint`` are read here.
    """
    # Note: You can pass the user_id as a parameter in API call
    USER_ID = "pipecat-demo-user"

    logger.info("Starting bot")

    stt = DeepgramSTTService(api_key=os.environ["DEEPGRAM_API_KEY"])

    # Initialize text-to-speech service
    tts = ElevenLabsTTSService(
        api_key=os.environ["ELEVENLABS_API_KEY"],
        settings=ElevenLabsTTSService.Settings(
            voice="pNInz6obpgDQGcFmaJgB",
        ),
    )

    # =====================================================================
    # OPTION 1: Using Mem0 API (cloud-based approach)
    # This approach uses Mem0's cloud service for memory management
    # Requires: MEM0_API_KEY set in your environment
    # =====================================================================
    # os.environ (like the other required keys above) fails fast with a
    # KeyError naming the variable if MEM0_API_KEY is missing.
    memory = Mem0MemoryService(
        api_key=os.environ["MEM0_API_KEY"],  # Your Mem0 API key
        user_id=USER_ID,  # Unique identifier for the user
        agent_id="agent1",  # Optional identifier for the agent
        run_id="session1",  # Optional identifier for the run
        params=Mem0MemoryService.InputParams(
            search_limit=10,
            search_threshold=0.3,
            api_version="v2",
            system_prompt="Based on previous conversations, I recall: \n\n",
            add_as_system_message=True,
            position=1,
        ),
    )

    # =====================================================================
    # OPTION 2: Using Mem0 with local configuration (self-hosted approach)
    # This approach uses a local LLM configuration for memory management
    # Requires: Anthropic API key if using Claude model
    # =====================================================================
    # Uncomment the following code and comment out the previous memory initialization to use local config
    # local_config = {
    #     "llm": {
    #         "provider": "anthropic",
    #         "config": {
    #             "model": "claude-3-5-sonnet-20240620",
    #             "api_key": os.environ["ANTHROPIC_API_KEY"],  # Make sure to set this in your .env
    #         }
    #     },
    #     "embedder": {
    #         "provider": "openai",
    #         "config": {
    #             "model": "text-embedding-3-large"
    #         }
    #     }
    # }

    # # Initialize Mem0 memory service with local configuration
    # memory = Mem0MemoryService(
    #     local_config=local_config,  # Use local LLM for memory processing
    #     user_id=USER_ID,  # Unique identifier for the user
    #     # agent_id="agent1",  # Optional identifier for the agent
    #     # run_id="session1",  # Optional identifier for the run
    # )

    # Initialize LLM service
    llm = OpenAILLMService(
        api_key=os.environ["OPENAI_API_KEY"],
        settings=OpenAILLMService.Settings(
            system_instruction="""You are a personal assistant. You can remember things about the person you are talking to.
                            Some Guidelines:
                            - Make sure your responses are friendly yet short and concise.
                            - If the user asks you to remember something, make sure to remember it.
                            - Greet the user by their name if you know about it.
                        """,
        ),
    )

    # Set up conversation context and management.
    # The user/assistant aggregator pair records conversation turns into `context`.
    context = LLMContext()
    user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
        context,
        user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
    )

    pipeline = Pipeline(
        [
            transport.input(),
            stt,
            user_aggregator,
            memory,
            llm,
            tts,
            transport.output(),
            assistant_aggregator,
        ]
    )

    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            enable_metrics=True,
            enable_usage_metrics=True,
        ),
        idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
    )

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info("Client connected")
        # Build a personalized greeting from the user's stored memories
        greeting = await get_initial_greeting(memory)
        # Add the greeting to the context as a developer message for the LLM
        context.add_message({"role": "developer", "content": greeting})
        # Queue an LLMRunFrame so the LLM runs on the updated context and
        # opens the conversation
        await task.queue_frames([LLMRunFrame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info("Client disconnected")
        await task.cancel()

    runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)

    await runner.run(task)
async def bot(runner_args: RunnerArguments):
    """Entry point for the Pipecat runner, compatible with Pipecat Cloud.

    Creates this session's transport from ``runner_args`` and the
    ``transport_params`` factories, then delegates to ``run_bot``.
    """
    await run_bot(await create_transport(runner_args, transport_params), runner_args)
if __name__ == "__main__":
    # When executed as a script, hand off to Pipecat's runner `main()`.
    # NOTE(review): presumably the runner locates the module-level `bot()`
    # entry point defined above — confirm against pipecat.runner.run.
    from pipecat.runner.run import main
    main()