Remove `examples/` from the `pyrightconfig.json` ignore list and fix
the resulting type errors across all example files. Common fixes:
- Required API keys: `os.getenv("X")` -> `os.environ["X"]` so the
return type is `str` rather than `str | None`, and misconfiguration
fails fast.
- Narrow `LLMContextMessage` union members with `isinstance(..., dict)`
before dict-style access.
- `assert isinstance(params.llm, ...)` before calling service-specific
methods that aren't on the base `LLMService`.
- Guard optional frame fields (e.g. `LLMSearchResponseFrame.search_result`)
before use.
271 lines
9.8 KiB
Python
271 lines
9.8 KiB
Python
#
|
|
# Copyright (c) 2024-2026, Daily
|
|
#
|
|
# SPDX-License-Identifier: BSD 2-Clause License
|
|
#
|
|
|
|
"""Mem0 Personalized Voice Agent Example with Pipecat.
|
|
|
|
This example demonstrates how to create a conversational AI assistant with memory capabilities
|
|
using Mem0 integration. It shows how to build an agent that remembers previous interactions
|
|
and personalizes responses based on conversation history.
|
|
|
|
The example:
|
|
1. Sets up a video/audio conversation between a user and an AI assistant
|
|
2. Uses Mem0 to store and retrieve memories from conversations
|
|
3. Creates personalized greetings based on previous interactions
|
|
4. Handles multi-modal interaction through audio
|
|
5. Demonstrates two approaches for memory management:
|
|
- Using Mem0 API (cloud-based memory storage)
|
|
- Using local configuration with custom LLM (self-hosted memory)
|
|
|
|
Requirements:
|
|
- OpenAI API key
|
|
- Deepgram API key (for speech-to-text)
- ElevenLabs API key (for text-to-speech)
|
|
- Daily API key (for video/audio transport)
|
|
- Mem0 API key (for cloud-based memory storage)
|
|
- [Optional] Anthropic API key (if using Claude with local config)
|
|
|
|
Environment variables (set in .env or in your terminal using `export`):
|
|
DAILY_ROOM_URL=daily_room_url
|
|
DAILY_API_KEY=daily_api_key
|
|
OPENAI_API_KEY=openai_api_key
|
|
DEEPGRAM_API_KEY=deepgram_api_key
ELEVENLABS_API_KEY=elevenlabs_api_key
|
|
MEM0_API_KEY=mem0_api_key
|
|
ANTHROPIC_API_KEY=anthropic_api_key (if using Claude with local config)
|
|
|
|
The bot runs as part of a pipeline that processes audio frames and manages the conversation flow.
|
|
"""
|
|
|
|
import os
|
|
|
|
from dotenv import load_dotenv
|
|
from loguru import logger
|
|
|
|
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
|
from pipecat.frames.frames import LLMRunFrame
|
|
from pipecat.pipeline.pipeline import Pipeline
|
|
from pipecat.pipeline.runner import PipelineRunner
|
|
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
|
from pipecat.processors.aggregators.llm_context import LLMContext
|
|
from pipecat.processors.aggregators.llm_response_universal import (
|
|
LLMContextAggregatorPair,
|
|
LLMUserAggregatorParams,
|
|
)
|
|
from pipecat.runner.types import RunnerArguments
|
|
from pipecat.runner.utils import create_transport
|
|
from pipecat.services.deepgram.stt import DeepgramSTTService
|
|
from pipecat.services.elevenlabs.tts import ElevenLabsTTSService
|
|
from pipecat.services.mem0.memory import Mem0MemoryService
|
|
from pipecat.services.openai.llm import OpenAILLMService
|
|
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
|
from pipecat.transports.daily.transport import DailyParams
|
|
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
|
|
|
load_dotenv(override=True)
|
|
|
|
|
|
async def get_initial_greeting(memory_service: Mem0MemoryService) -> str:
    """Fetch all memories for the user and create a personalized greeting.

    At most the first three memories are mentioned, and each one is shortened
    to 100 characters. Any failure while fetching or formatting the memories
    is logged and a generic greeting is returned instead.

    Args:
        memory_service: The Mem0 memory service instance.

    Returns:
        A personalized greeting based on user memories, or a generic greeting
        when there are no memories or they cannot be retrieved.
    """
    try:
        results = await memory_service.get_memories()
        if not results:
            logger.debug("No memories found for this user.")
            return "Hello! It's nice to meet you. How can I help you today?"

        # Keep memory references brief: first three memories, 100 chars each.
        snippets = []
        for memory in results[:3]:
            # A missing or null "memory" field is treated as empty text so a
            # single incomplete record does not discard the whole greeting.
            memory_content = memory.get("memory") or ""
            if len(memory_content) > 100:
                memory_content = memory_content[:97] + "..."
            snippets.append(f"{memory_content} ")

        greeting = (
            "Hello! It's great to see you again. "
            "Based on our previous conversations, I remember: "
            + "".join(snippets)
            + "How can I help you today?"
        )

        logger.debug(f"Created personalized greeting from {len(results)} memories")
        return greeting

    except Exception as e:
        # Deliberately broad: fall back to a generic greeting on any failure.
        logger.error(f"Error retrieving initial memories from Mem0: {e}")
        return "Hello! How can I help you today?"
|
|
|
|
|
|
# Each entry is a zero-argument factory rather than a ready-made params object,
# so parameters are only constructed for the transport type that is actually
# selected at runtime. All three transports enable audio input and output.
transport_params = {
    "daily": lambda: DailyParams(audio_in_enabled=True, audio_out_enabled=True),
    "twilio": lambda: FastAPIWebsocketParams(audio_in_enabled=True, audio_out_enabled=True),
    "webrtc": lambda: TransportParams(audio_in_enabled=True, audio_out_enabled=True),
}
|
|
|
|
|
|
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    """Main bot execution function.

    Sets up and runs the bot pipeline including:
    - Speech-to-text (Deepgram) and text-to-speech (ElevenLabs) services
    - Language model integration (OpenAI)
    - Mem0 memory service (using either API or local configuration)
    - Client connect/disconnect event handling on the transport

    Args:
        transport: Transport whose input and output processors bracket the pipeline.
        runner_args: Runner arguments; supplies the pipeline idle timeout and
            whether the runner handles SIGINT.

    Raises:
        KeyError: If DEEPGRAM_API_KEY, ELEVENLABS_API_KEY, MEM0_API_KEY or
            OPENAI_API_KEY is not set in the environment.
    """
    # Note: You can pass the user_id as a parameter in API call
    USER_ID = "pipecat-demo-user"

    logger.info("Starting bot")

    stt = DeepgramSTTService(api_key=os.environ["DEEPGRAM_API_KEY"])

    # Initialize text-to-speech service
    tts = ElevenLabsTTSService(
        api_key=os.environ["ELEVENLABS_API_KEY"],
        settings=ElevenLabsTTSService.Settings(
            voice="pNInz6obpgDQGcFmaJgB",
        ),
    )

    # =====================================================================
    # OPTION 1: Using Mem0 API (cloud-based approach)
    # This approach uses Mem0's cloud service for memory management
    # Requires: MEM0_API_KEY set in your environment
    # =====================================================================
    memory = Mem0MemoryService(
        # Required for the cloud approach, so read it like the other required
        # keys: a missing variable fails fast with KeyError.
        api_key=os.environ["MEM0_API_KEY"],  # Your Mem0 API key
        user_id=USER_ID,  # Unique identifier for the user
        agent_id="agent1",  # Optional identifier for the agent
        run_id="session1",  # Optional identifier for the run
        params=Mem0MemoryService.InputParams(
            search_limit=10,
            search_threshold=0.3,
            api_version="v2",
            system_prompt="Based on previous conversations, I recall: \n\n",
            add_as_system_message=True,
            position=1,
        ),
    )

    # =====================================================================
    # OPTION 2: Using Mem0 with local configuration (self-hosted approach)
    # This approach uses a local LLM configuration for memory management
    # Requires: Anthropic API key if using Claude model
    # =====================================================================
    # Uncomment the following code and comment out the previous memory initialization to use local config

    # local_config = {
    #     "llm": {
    #         "provider": "anthropic",
    #         "config": {
    #             "model": "claude-3-5-sonnet-20240620",
    #             "api_key": os.getenv("ANTHROPIC_API_KEY"),  # Make sure to set this in your .env
    #         }
    #     },
    #     "embedder": {
    #         "provider": "openai",
    #         "config": {
    #             "model": "text-embedding-3-large"
    #         }
    #     }
    # }

    # # Initialize Mem0 memory service with local configuration
    # memory = Mem0MemoryService(
    #     local_config=local_config,  # Use local LLM for memory processing
    #     user_id=USER_ID,            # Unique identifier for the user
    #     # agent_id="agent1",        # Optional identifier for the agent
    #     # run_id="session1",        # Optional identifier for the run
    # )

    # Initialize LLM service
    llm = OpenAILLMService(
        api_key=os.environ["OPENAI_API_KEY"],
        settings=OpenAILLMService.Settings(
            system_instruction="""You are a personal assistant. You can remember things about the person you are talking to.
Some Guidelines:
- Make sure your responses are friendly yet short and concise.
- If the user asks you to remember something, make sure to remember it.
- Greet the user by their name if you know about it.
""",
        ),
    )

    # Set up conversation context and management
    # The context_aggregator will automatically collect conversation context
    context = LLMContext()
    user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
        context,
        user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
    )

    pipeline = Pipeline(
        [
            transport.input(),
            stt,
            user_aggregator,
            memory,
            llm,
            tts,
            transport.output(),
            assistant_aggregator,
        ]
    )

    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            enable_metrics=True,
            enable_usage_metrics=True,
        ),
        idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
    )

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info("Client connected")
        # Get personalized greeting based on user memories
        greeting = await get_initial_greeting(memory)

        # Seed the context with the greeting (as a "developer" message)
        context.add_message({"role": "developer", "content": greeting})

        # Queue an LLM run frame to start the conversation
        await task.queue_frames([LLMRunFrame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info("Client disconnected")
        await task.cancel()

    runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
    await runner.run(task)
|
|
|
|
|
|
async def bot(runner_args: RunnerArguments):
    """Main bot entry point compatible with Pipecat Cloud.

    Creates the transport from the module-level ``transport_params`` factories
    and hands it to ``run_bot``.

    Args:
        runner_args: Runner arguments used to create the transport and then
            passed on to ``run_bot``.
    """
    active_transport = await create_transport(runner_args, transport_params)
    await run_bot(active_transport, runner_args)
|
|
|
|
|
|
if __name__ == "__main__":
    # Imported here so the runner is only loaded when this file is executed
    # as a script.
    from pipecat.runner.run import main

    main()
|