Compare commits

...

1 Commits

Author SHA1 Message Date
vipyne
a4b66aedc1 Update MCP examples 2026-04-02 18:06:28 -05:00
3 changed files with 183 additions and 335 deletions

View File

@@ -5,27 +5,17 @@
#
import asyncio
import io
import json
import os
import shutil
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from mcp import StdioServerParameters
from mcp.client.session_group import StreamableHttpParameters
from PIL import Image
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
Frame,
FunctionCallResultFrame,
LLMRunFrame,
URLImageRawFrame,
)
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -34,7 +24,6 @@ from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.anthropic.llm import AnthropicLLMService
@@ -47,66 +36,16 @@ from pipecat.transports.daily.transport import DailyParams
load_dotenv(override=True)
class UrlToImageProcessor(FrameProcessor):
def __init__(self, aiohttp_session: aiohttp.ClientSession, **kwargs):
super().__init__(**kwargs)
self._aiohttp_session = aiohttp_session
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, FunctionCallResultFrame):
await self.push_frame(frame, direction)
image_url = self.extract_url(frame.result)
if image_url:
await self.run_image_process(image_url)
# sometimes we get multiple image urls- process 1 at a time
await asyncio.sleep(1)
else:
await self.push_frame(frame, direction)
def extract_url(self, text: str):
try:
data = json.loads(text)
if "artObject" in data:
return data["artObject"]["webImage"]["url"]
if "artworks" in data and len(data["artworks"]):
return data["artworks"][0]["webImage"]["url"]
except (json.JSONDecodeError, KeyError, TypeError):
pass
async def run_image_process(self, image_url: str):
try:
logger.debug(f"handling image from url: '{image_url}'")
async with self._aiohttp_session.get(image_url) as response:
image_stream = io.BytesIO(await response.content.read())
image = Image.open(image_stream)
image = image.convert("RGB")
frame = URLImageRawFrame(
url=image_url, image=image.tobytes(), size=image.size, format="RGB"
)
await self.push_frame(frame)
except Exception as e:
error_msg = f"Error handling image url {image_url}: {str(e)}"
logger.error(error_msg)
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_out_enabled=True,
video_out_width=1024,
video_out_height=1024,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_out_enabled=True,
video_out_width=1024,
video_out_height=1024,
),
}
@@ -114,109 +53,108 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
# Create an HTTP session for API calls
async with aiohttp.ClientSession() as session:
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
system_prompt = f"""
You are a helpful LLM in a voice call.
Your goal is to demonstrate your capabilities in a succinct way.
You have access to memory tools that let you store and recall information,
and tools to answer questions about the user's GitHub repositories and account.
Offer to remember things for the user, like their name, preferences, or anything they'd like.
You can also recall things you've previously stored.
You can also offer to answer users questions about their GitHub repositories and account.
Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points.
Respond to what the user said in a creative and helpful way.
Don't overexplain what you are doing.
Just respond with short sentences when you are carrying out tool calls.
"""
llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"),
settings=AnthropicLLMService.Settings(
system_instruction=system_prompt,
),
)
async with (
# https://github.com/modelcontextprotocol/servers/tree/main/src/memory
MCPClient(
server_params=StdioServerParameters(
command=shutil.which("npx"),
args=["-y", "@modelcontextprotocol/server-memory"],
# env={"MEMORY_FILE_PATH": "/tmp/pipecat_memory.jsonl"}, # Optional: specify MEMORY_FILE_PATH
),
) as memory_mcp,
# Github MCP docs: https://github.com/github/github-mcp-server
# Enable Github Copilot on your GitHub account. Free tier is ok. (https://github.com/settings/copilot)
# Generate a personal access token. It must be a Fine-grained token, classic tokens are not supported. (https://github.com/settings/personal-access-tokens)
# Set permissions you want to use (eg. "all repositories", "profile: read/write", etc)
MCPClient(
server_params=StreamableHttpParameters(
url="https://api.githubcopilot.com/mcp/",
headers={
"Authorization": f"Bearer {os.getenv('GITHUB_PERSONAL_ACCESS_TOKEN')}"
},
),
) as github_mcp,
):
memory_tools = await memory_mcp.register_tools(llm)
github_tools = await github_mcp.register_tools(llm)
all_standard_tools = memory_tools.standard_tools + github_tools.standard_tools
all_tools = ToolsSchema(standard_tools=all_standard_tools)
context = LLMContext(
messages=[{"role": "user", "content": "Please introduce yourself."}],
tools=all_tools,
)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
system_prompt = f"""
You are a helpful LLM in a voice call.
Your goal is to demonstrate your capabilities in a succinct way.
You have access to tools to search the Rijksmuseum collection and the user's GitHub repositories and account.
Offer, for example, to show a floral still life, use the `search_artwork` tool.
The tool may respond with a JSON object with an `artworks` array. Choose the art from that array.
Once the tool has responded, tell the user the title and use the `open_image_in_browser` tool.
You can also offer to answer users questions about their GitHub repositories and account.
Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points.
Respond to what the user said in a creative and helpful way.
Don't overexplain what you are doing.
Just respond with short sentences when you are carrying out tool calls.
"""
llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"),
settings=AnthropicLLMService.Settings(
system_instruction=system_prompt,
),
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
user_aggregator, # User spoken responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
assistant_aggregator, # Assistant spoken responses and tool context
]
)
async with (
MCPClient(
server_params=StdioServerParameters(
command=shutil.which("npx"),
# https://github.com/r-huijts/rijksmuseum-mcp
args=["-y", "mcp-server-rijksmuseum"],
env={"RIJKSMUSEUM_API_KEY": os.getenv("RIJKSMUSEUM_API_KEY")},
)
) as rijksmuseum_mcp,
# Github MCP docs: https://github.com/github/github-mcp-server
# Enable Github Copilot on your GitHub account. Free tier is ok. (https://github.com/settings/copilot)
# Generate a personal access token. It must be a Fine-grained token, classic tokens are not supported. (https://github.com/settings/personal-access-tokens)
# Set permissions you want to use (eg. "all repositories", "profile: read/write", etc)
MCPClient(
server_params=StreamableHttpParameters(
url="https://api.githubcopilot.com/mcp/",
headers={
"Authorization": f"Bearer {os.getenv('GITHUB_PERSONAL_ACCESS_TOKEN')}"
},
),
) as github_mcp,
):
rijksmuseum_tools = await rijksmuseum_mcp.register_tools(llm)
github_tools = await github_mcp.register_tools(llm)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
all_standard_tools = rijksmuseum_tools.standard_tools + github_tools.standard_tools
all_tools = ToolsSchema(standard_tools=all_standard_tools)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected: {client}")
# Kick off the conversation.
await task.queue_frames([LLMRunFrame()])
context = LLMContext(tools=all_tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
mcp_image_processor = UrlToImageProcessor(aiohttp_session=session)
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
user_aggregator, # User spoken responses
llm, # LLM
tts, # TTS
mcp_image_processor, # URL image -> output
transport.output(), # Transport bot output
assistant_aggregator, # Assistant spoken responses and tool context
]
)
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected: {client}")
# Kick off the conversation.
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
@@ -226,9 +164,9 @@ async def bot(runner_args: RunnerArguments):
if __name__ == "__main__":
if not os.getenv("RIJKSMUSEUM_API_KEY") or not os.getenv("GITHUB_PERSONAL_ACCESS_TOKEN"):
if not os.getenv("GITHUB_PERSONAL_ACCESS_TOKEN"):
logger.error(
f"Please set `RIJKSMUSEUM_API_KEY` and `GITHUB_PERSONAL_ACCESS_TOKEN` environment variables. See https://github.com/r-huijts/rijksmuseum-mcp."
f"Please set `GITHUB_PERSONAL_ACCESS_TOKEN` environment variable."
)
import sys

View File

@@ -4,26 +4,15 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import io
import json
import os
import re
import shutil
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from mcp import StdioServerParameters
from PIL import Image
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
Frame,
FunctionCallResultFrame,
LLMRunFrame,
URLImageRawFrame,
)
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -32,7 +21,6 @@ from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.anthropic.llm import AnthropicLLMService
@@ -44,86 +32,16 @@ from pipecat.transports.daily.transport import DailyParams
load_dotenv(override=True)
class UrlToImageProcessor(FrameProcessor):
def __init__(self, aiohttp_session: aiohttp.ClientSession, **kwargs):
super().__init__(**kwargs)
self._aiohttp_session = aiohttp_session
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, FunctionCallResultFrame):
await self.push_frame(frame, direction)
image_url = self.extract_url(frame.result)
if image_url:
await self.run_image_process(image_url)
# sometimes we get multiple image urls- process 1 at a time
await asyncio.sleep(1)
else:
await self.push_frame(frame, direction)
def extract_url(self, text: str):
try:
data = json.loads(text)
if "artObject" in data:
return data["artObject"]["webImage"]["url"]
if "artworks" in data and len(data["artworks"]):
return data["artworks"][0]["webImage"]["url"]
except (json.JSONDecodeError, KeyError, TypeError):
pass
return None
async def run_image_process(self, image_url: str):
try:
logger.debug(f"handling image from url: '{image_url}'")
async with self._aiohttp_session.get(image_url) as response:
image_stream = io.BytesIO(await response.content.read())
image = Image.open(image_stream)
image = image.convert("RGB")
frame = URLImageRawFrame(
url=image_url, image=image.tobytes(), size=image.size, format="RGB"
)
await self.push_frame(frame)
except Exception as e:
error_msg = f"Error handling image url {image_url}: {str(e)}"
logger.error(error_msg)
# full list of tools available from rijksmuseum MCP:
# - get_artwork_details
# - get_artwork_image
# - get_user_sets
# - get_user_set_details
# - open_image_in_browser
# - get_artist_timeline
mcp_tools_filter = ["get_artwork_details", "get_artwork_image", "open_image_in_browser"]
def open_image_output_filter(output: str):
pattern = r"Successfully opened image in browser: "
text_to_print = re.sub(pattern, "", output)
print(f"🖼️ link to high resolution artwork: {text_to_print}")
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_out_enabled=True,
video_out_width=1024,
video_out_height=1024,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_out_enabled=True,
video_out_width=1024,
video_out_height=1024,
),
}
@@ -131,94 +49,88 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
# Create an HTTP session for API calls
async with aiohttp.ClientSession() as session:
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
system_prompt = f"""
You are a helpful LLM in a voice call.
Your goal is to demonstrate your capabilities in a succinct way.
You have access to memory tools that let you store and recall information.
Offer to remember things for the user, like their name, preferences, or anything they'd like.
You can also recall things you've previously stored.
Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points.
Respond to what the user said in a creative and helpful way.
Don't overexplain what you are doing.
Just respond with short sentences when you are carrying out tool calls.
"""
llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"),
settings=AnthropicLLMService.Settings(
system_instruction=system_prompt,
),
)
# https://github.com/modelcontextprotocol/servers/tree/main/src/memory
async with MCPClient(
server_params=StdioServerParameters(
command=shutil.which("npx"),
args=["-y", "@modelcontextprotocol/server-memory"],
# env={"MEMORY_FILE_PATH": "/tmp/pipecat_memory.jsonl"}, # Optional: specify MEMORY_FILE_PATH
),
) as mcp:
tools = await mcp.register_tools(llm)
context = LLMContext(
messages=[{"role": "user", "content": "Please introduce yourself."}],
tools=tools,
)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
system_prompt = f"""
You are a helpful LLM in a voice call.
Your goal is to demonstrate your capabilities in a succinct way.
You have access to tools to search the Rijksmuseum collection.
Offer, for example, to show a floral still life, use the `search_artwork` tool.
The tool may respond with a JSON object with an `artworks` array. Choose the art from that array.
Once the tool has responded, tell the user the title and use the `open_image_in_browser` tool.
Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points.
Respond to what the user said in a creative and helpful way.
Don't overexplain what you are doing.
Just respond with short sentences when you are carrying out tool calls.
"""
llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"),
settings=AnthropicLLMService.Settings(
system_instruction=system_prompt,
),
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
user_aggregator, # User spoken responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
assistant_aggregator, # Assistant spoken responses and tool context
]
)
mcp_image = UrlToImageProcessor(aiohttp_session=session)
async with MCPClient(
server_params=StdioServerParameters(
command=shutil.which("npx"),
# https://github.com/r-huijts/rijksmuseum-mcp
args=["-y", "mcp-server-rijksmuseum"],
env={"RIJKSMUSEUM_API_KEY": os.getenv("RIJKSMUSEUM_API_KEY")},
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
# Optional
tools_filter=mcp_tools_filter, # Optional
tools_output_filters={"open_image_in_browser": open_image_output_filter},
) as mcp:
tools = await mcp.register_tools(llm)
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
context = LLMContext(tools=tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected: {client}")
# Kick off the conversation.
await task.queue_frames([LLMRunFrame()])
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
user_aggregator, # User spoken responses
llm, # LLM
tts, # TTS
mcp_image, # URL image -> output
transport.output(), # Transport bot output
assistant_aggregator, # Assistant spoken responses and tool context
]
)
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected: {client}")
# Kick off the conversation.
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
@@ -228,13 +140,6 @@ async def bot(runner_args: RunnerArguments):
if __name__ == "__main__":
if not os.getenv("RIJKSMUSEUM_API_KEY"):
logger.error(
f"Please set RIJKSMUSEUM_API_KEY environment variable for this example. See https://github.com/r-huijts/rijksmuseum-mcp and https://www.rijksmuseum.nl/en/register?redirectUrl=https://www.https://www.rijksmuseum.nl/en/rijksstudio/my/profile"
)
import sys
sys.exit(1)
from pipecat.runner.run import main
main()

View File

@@ -63,18 +63,20 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
),
)
system_prompt = f"""
You are a helpful LLM in a voice call.
Your goal is to answer questions about the user's GitHub repositories and account.
You have access to a number of tools provided by Github. Use any and all tools to help users.
Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points.
Don't overexplain what you are doing.
Just respond with short sentences when you are carrying out tool calls.
"""
system_prompt = """\
You are a helpful LLM in a voice call.
Your goal is to answer questions about the user's GitHub repositories and account.
You have access to a number of tools provided by Github. Use any and all tools to help users.
Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points.
Don't overexplain what you are doing.
Just respond with short sentences when you are carrying out tool calls.
"""
llm = GoogleLLMService(
api_key=os.getenv("GOOGLE_API_KEY"),
system_instruction=system_prompt,
settings=GoogleLLMService.Settings(
system_instruction=system_prompt,
),
)
# Github MCP docs: https://github.com/github/github-mcp-server
@@ -89,7 +91,10 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
) as mcp:
tools = await mcp.register_tools(llm)
context = LLMContext(tools=tools)
context = LLMContext(
messages=[{"role": "user", "content": "Please introduce yourself."}],
tools=tools,
)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),