diff --git a/examples/mcp/mcp-multiple-mcp.py b/examples/mcp/mcp-multiple-mcp.py index 08a83edcf..cea593626 100644 --- a/examples/mcp/mcp-multiple-mcp.py +++ b/examples/mcp/mcp-multiple-mcp.py @@ -5,27 +5,17 @@ # -import asyncio -import io -import json import os import shutil -import aiohttp from dotenv import load_dotenv from loguru import logger from mcp import StdioServerParameters from mcp.client.session_group import StreamableHttpParameters -from PIL import Image from pipecat.adapters.schemas.tools_schema import ToolsSchema from pipecat.audio.vad.silero import SileroVADAnalyzer -from pipecat.frames.frames import ( - Frame, - FunctionCallResultFrame, - LLMRunFrame, - URLImageRawFrame, -) +from pipecat.frames.frames import LLMRunFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask @@ -34,7 +24,6 @@ from pipecat.processors.aggregators.llm_response_universal import ( LLMContextAggregatorPair, LLMUserAggregatorParams, ) -from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport from pipecat.services.anthropic.llm import AnthropicLLMService @@ -47,66 +36,16 @@ from pipecat.transports.daily.transport import DailyParams load_dotenv(override=True) -class UrlToImageProcessor(FrameProcessor): - def __init__(self, aiohttp_session: aiohttp.ClientSession, **kwargs): - super().__init__(**kwargs) - self._aiohttp_session = aiohttp_session - - async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - - if isinstance(frame, FunctionCallResultFrame): - await self.push_frame(frame, direction) - image_url = self.extract_url(frame.result) - if image_url: - await self.run_image_process(image_url) - # sometimes we get multiple image urls- process 1 at a time - await asyncio.sleep(1) - else: - await self.push_frame(frame, direction) - - def extract_url(self, text: str): - try: - data = json.loads(text) - if "artObject" in data: - return data["artObject"]["webImage"]["url"] - if "artworks" in data and len(data["artworks"]): - return data["artworks"][0]["webImage"]["url"] - except (json.JSONDecodeError, KeyError, TypeError): - pass - - async def run_image_process(self, image_url: str): - try: - logger.debug(f"handling image from url: '{image_url}'") - async with self._aiohttp_session.get(image_url) as response: - image_stream = io.BytesIO(await response.content.read()) - image = Image.open(image_stream) - image = image.convert("RGB") - frame = URLImageRawFrame( - url=image_url, image=image.tobytes(), size=image.size, format="RGB" - ) - await self.push_frame(frame) - except Exception as e: - error_msg = f"Error handling image url {image_url}: {str(e)}" - logger.error(error_msg) - - # We use lambdas to defer transport parameter creation until the transport # type is selected at runtime. transport_params = { "daily": lambda: DailyParams( audio_in_enabled=True, audio_out_enabled=True, - video_out_enabled=True, - video_out_width=1024, - video_out_height=1024, ), "webrtc": lambda: TransportParams( audio_in_enabled=True, audio_out_enabled=True, - video_out_enabled=True, - video_out_width=1024, - video_out_height=1024, ), } @@ -114,109 +53,108 @@ transport_params = { async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): logger.info(f"Starting bot") - # Create an HTTP session for API calls - async with aiohttp.ClientSession() as session: - stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) - tts = CartesiaTTSService( - api_key=os.getenv("CARTESIA_API_KEY"), - settings=CartesiaTTSService.Settings( - voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + settings=CartesiaTTSService.Settings( + voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + ), + ) + + system_prompt = f""" + You are a helpful LLM in a voice call. + Your goal is to demonstrate your capabilities in a succinct way. + You have access to memory tools that let you store and recall information, + and tools to answer questions about the user's GitHub repositories and account. + Offer to remember things for the user, like their name, preferences, or anything they'd like. + You can also recall things you've previously stored. + You can also offer to answer users questions about their GitHub repositories and account. + Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. + Respond to what the user said in a creative and helpful way. + Don't overexplain what you are doing. + Just respond with short sentences when you are carrying out tool calls. + """ + + llm = AnthropicLLMService( + api_key=os.getenv("ANTHROPIC_API_KEY"), + settings=AnthropicLLMService.Settings( + system_instruction=system_prompt, + ), + ) + + async with ( + # https://github.com/modelcontextprotocol/servers/tree/main/src/memory + MCPClient( + server_params=StdioServerParameters( + command=shutil.which("npx"), + args=["-y", "@modelcontextprotocol/server-memory"], + # env={"MEMORY_FILE_PATH": "/tmp/pipecat_memory.jsonl"}, # Optional: specify MEMORY_FILE_PATH ), + ) as memory_mcp, + # Github MCP docs: https://github.com/github/github-mcp-server + # Enable Github Copilot on your GitHub account. Free tier is ok. (https://github.com/settings/copilot) + # Generate a personal access token. It must be a Fine-grained token, classic tokens are not supported. (https://github.com/settings/personal-access-tokens) + # Set permissions you want to use (eg. "all repositories", "profile: read/write", etc) + MCPClient( + server_params=StreamableHttpParameters( + url="https://api.githubcopilot.com/mcp/", + headers={ + "Authorization": f"Bearer {os.getenv('GITHUB_PERSONAL_ACCESS_TOKEN')}" + }, + ), + ) as github_mcp, + ): + memory_tools = await memory_mcp.register_tools(llm) + github_tools = await github_mcp.register_tools(llm) + + all_standard_tools = memory_tools.standard_tools + github_tools.standard_tools + all_tools = ToolsSchema(standard_tools=all_standard_tools) + + context = LLMContext( + messages=[{"role": "user", "content": "Please introduce yourself."}], + tools=all_tools, + ) + user_aggregator, assistant_aggregator = LLMContextAggregatorPair( + context, + user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()), ) - system_prompt = f""" - You are a helpful LLM in a voice call. - Your goal is to demonstrate your capabilities in a succinct way. - You have access to tools to search the Rijksmuseum collection and the user's GitHub repositories and account. - Offer, for example, to show a floral still life, use the `search_artwork` tool. - The tool may respond with a JSON object with an `artworks` array. Choose the art from that array. - Once the tool has responded, tell the user the title and use the `open_image_in_browser` tool. - You can also offer to answer users questions about their GitHub repositories and account. - Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. - Respond to what the user said in a creative and helpful way. - Don't overexplain what you are doing. - Just respond with short sentences when you are carrying out tool calls. - """ - - llm = AnthropicLLMService( - api_key=os.getenv("ANTHROPIC_API_KEY"), - settings=AnthropicLLMService.Settings( - system_instruction=system_prompt, - ), + pipeline = Pipeline( + [ + transport.input(), # Transport user input + stt, + user_aggregator, # User spoken responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + assistant_aggregator, # Assistant spoken responses and tool context + ] ) - async with ( - MCPClient( - server_params=StdioServerParameters( - command=shutil.which("npx"), - # https://github.com/r-huijts/rijksmuseum-mcp - args=["-y", "mcp-server-rijksmuseum"], - env={"RIJKSMUSEUM_API_KEY": os.getenv("RIJKSMUSEUM_API_KEY")}, - ) - ) as rijksmuseum_mcp, - # Github MCP docs: https://github.com/github/github-mcp-server - # Enable Github Copilot on your GitHub account. Free tier is ok. (https://github.com/settings/copilot) - # Generate a personal access token. It must be a Fine-grained token, classic tokens are not supported. (https://github.com/settings/personal-access-tokens) - # Set permissions you want to use (eg. "all repositories", "profile: read/write", etc) - MCPClient( - server_params=StreamableHttpParameters( - url="https://api.githubcopilot.com/mcp/", - headers={ - "Authorization": f"Bearer {os.getenv('GITHUB_PERSONAL_ACCESS_TOKEN')}" - }, - ), - ) as github_mcp, - ): - rijksmuseum_tools = await rijksmuseum_mcp.register_tools(llm) - github_tools = await github_mcp.register_tools(llm) + task = PipelineTask( + pipeline, + params=PipelineParams( + enable_metrics=True, + enable_usage_metrics=True, + ), + idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, + ) - all_standard_tools = rijksmuseum_tools.standard_tools + github_tools.standard_tools - all_tools = ToolsSchema(standard_tools=all_standard_tools) + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected: {client}") + # Kick off the conversation. + await task.queue_frames([LLMRunFrame()]) - context = LLMContext(tools=all_tools) - user_aggregator, assistant_aggregator = LLMContextAggregatorPair( - context, - user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()), - ) - mcp_image_processor = UrlToImageProcessor(aiohttp_session=session) + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + await task.cancel() - pipeline = Pipeline( - [ - transport.input(), # Transport user input - stt, - user_aggregator, # User spoken responses - llm, # LLM - tts, # TTS - mcp_image_processor, # URL image -> output - transport.output(), # Transport bot output - assistant_aggregator, # Assistant spoken responses and tool context - ] - ) + runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) - task = PipelineTask( - pipeline, - params=PipelineParams( - enable_metrics=True, - enable_usage_metrics=True, - ), - idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, - ) - - @transport.event_handler("on_client_connected") - async def on_client_connected(transport, client): - logger.info(f"Client connected: {client}") - # Kick off the conversation. - await task.queue_frames([LLMRunFrame()]) - - @transport.event_handler("on_client_disconnected") - async def on_client_disconnected(transport, client): - logger.info(f"Client disconnected") - await task.cancel() - - runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) - - await runner.run(task) + await runner.run(task) async def bot(runner_args: RunnerArguments): @@ -226,9 +164,9 @@ async def bot(runner_args: RunnerArguments): if __name__ == "__main__": - if not os.getenv("RIJKSMUSEUM_API_KEY") or not os.getenv("GITHUB_PERSONAL_ACCESS_TOKEN"): + if not os.getenv("GITHUB_PERSONAL_ACCESS_TOKEN"): logger.error( - f"Please set `RIJKSMUSEUM_API_KEY` and `GITHUB_PERSONAL_ACCESS_TOKEN` environment variables. See https://github.com/r-huijts/rijksmuseum-mcp." + f"Please set `GITHUB_PERSONAL_ACCESS_TOKEN` environment variable." ) import sys diff --git a/examples/mcp/mcp-stdio.py b/examples/mcp/mcp-stdio.py index bd3ab3df2..daae45c0c 100644 --- a/examples/mcp/mcp-stdio.py +++ b/examples/mcp/mcp-stdio.py @@ -4,26 +4,15 @@ # SPDX-License-Identifier: BSD 2-Clause License # -import asyncio -import io -import json import os -import re import shutil -import aiohttp from dotenv import load_dotenv from loguru import logger from mcp import StdioServerParameters -from PIL import Image from pipecat.audio.vad.silero import SileroVADAnalyzer -from pipecat.frames.frames import ( - Frame, - FunctionCallResultFrame, - LLMRunFrame, - URLImageRawFrame, -) +from pipecat.frames.frames import LLMRunFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask @@ -32,7 +21,6 @@ from pipecat.processors.aggregators.llm_response_universal import ( LLMContextAggregatorPair, LLMUserAggregatorParams, ) -from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.runner.types import RunnerArguments from pipecat.runner.utils import create_transport from pipecat.services.anthropic.llm import AnthropicLLMService @@ -44,86 +32,16 @@ from pipecat.transports.daily.transport import DailyParams load_dotenv(override=True) - -class UrlToImageProcessor(FrameProcessor): - def __init__(self, aiohttp_session: aiohttp.ClientSession, **kwargs): - super().__init__(**kwargs) - self._aiohttp_session = aiohttp_session - - async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - - if isinstance(frame, FunctionCallResultFrame): - await self.push_frame(frame, direction) - image_url = self.extract_url(frame.result) - if image_url: - await self.run_image_process(image_url) - # sometimes we get multiple image urls- process 1 at a time - await asyncio.sleep(1) - else: - await self.push_frame(frame, direction) - - def extract_url(self, text: str): - try: - data = json.loads(text) - if "artObject" in data: - return data["artObject"]["webImage"]["url"] - if "artworks" in data and len(data["artworks"]): - return data["artworks"][0]["webImage"]["url"] - except (json.JSONDecodeError, KeyError, TypeError): - pass - - return None - - async def run_image_process(self, image_url: str): - try: - logger.debug(f"handling image from url: '{image_url}'") - async with self._aiohttp_session.get(image_url) as response: - image_stream = io.BytesIO(await response.content.read()) - image = Image.open(image_stream) - image = image.convert("RGB") - frame = URLImageRawFrame( - url=image_url, image=image.tobytes(), size=image.size, format="RGB" - ) - await self.push_frame(frame) - except Exception as e: - error_msg = f"Error handling image url {image_url}: {str(e)}" - logger.error(error_msg) - - -# full list of tools available from rijksmuseum MCP: -# - get_artwork_details -# - get_artwork_image -# - get_user_sets -# - get_user_set_details -# - open_image_in_browser -# - get_artist_timeline - -mcp_tools_filter = ["get_artwork_details", "get_artwork_image", "open_image_in_browser"] - - -def open_image_output_filter(output: str): - pattern = r"Successfully opened image in browser: " - text_to_print = re.sub(pattern, "", output) - print(f"🖼️ link to high resolution artwork: {text_to_print}") - - # We use lambdas to defer transport parameter creation until the transport # type is selected at runtime. transport_params = { "daily": lambda: DailyParams( audio_in_enabled=True, audio_out_enabled=True, - video_out_enabled=True, - video_out_width=1024, - video_out_height=1024, ), "webrtc": lambda: TransportParams( audio_in_enabled=True, audio_out_enabled=True, - video_out_enabled=True, - video_out_width=1024, - video_out_height=1024, ), } @@ -131,94 +49,88 @@ transport_params = { async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): logger.info(f"Starting bot") - # Create an HTTP session for API calls - async with aiohttp.ClientSession() as session: - stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) - tts = CartesiaTTSService( - api_key=os.getenv("CARTESIA_API_KEY"), - settings=CartesiaTTSService.Settings( - voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady - ), + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + settings=CartesiaTTSService.Settings( + voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + ), + ) + + system_prompt = f""" + You are a helpful LLM in a voice call. + Your goal is to demonstrate your capabilities in a succinct way. + You have access to memory tools that let you store and recall information. + Offer to remember things for the user, like their name, preferences, or anything they'd like. + You can also recall things you've previously stored. + Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. + Respond to what the user said in a creative and helpful way. + Don't overexplain what you are doing. + Just respond with short sentences when you are carrying out tool calls. + """ + + llm = AnthropicLLMService( + api_key=os.getenv("ANTHROPIC_API_KEY"), + settings=AnthropicLLMService.Settings( + system_instruction=system_prompt, + ), + ) + + # https://github.com/modelcontextprotocol/servers/tree/main/src/memory + async with MCPClient( + server_params=StdioServerParameters( + command=shutil.which("npx"), + args=["-y", "@modelcontextprotocol/server-memory"], + # env={"MEMORY_FILE_PATH": "/tmp/pipecat_memory.jsonl"}, # Optional: specify MEMORY_FILE_PATH + ), + ) as mcp: + tools = await mcp.register_tools(llm) + + context = LLMContext( + messages=[{"role": "user", "content": "Please introduce yourself."}], + tools=tools, + ) + user_aggregator, assistant_aggregator = LLMContextAggregatorPair( + context, + user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()), ) - system_prompt = f""" - You are a helpful LLM in a voice call. - Your goal is to demonstrate your capabilities in a succinct way. - You have access to tools to search the Rijksmuseum collection. - Offer, for example, to show a floral still life, use the `search_artwork` tool. - The tool may respond with a JSON object with an `artworks` array. Choose the art from that array. - Once the tool has responded, tell the user the title and use the `open_image_in_browser` tool. - Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. - Respond to what the user said in a creative and helpful way. - Don't overexplain what you are doing. - Just respond with short sentences when you are carrying out tool calls. - """ - - llm = AnthropicLLMService( - api_key=os.getenv("ANTHROPIC_API_KEY"), - settings=AnthropicLLMService.Settings( - system_instruction=system_prompt, - ), + pipeline = Pipeline( + [ + transport.input(), # Transport user input + stt, + user_aggregator, # User spoken responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + assistant_aggregator, # Assistant spoken responses and tool context + ] ) - mcp_image = UrlToImageProcessor(aiohttp_session=session) - - async with MCPClient( - server_params=StdioServerParameters( - command=shutil.which("npx"), - # https://github.com/r-huijts/rijksmuseum-mcp - args=["-y", "mcp-server-rijksmuseum"], - env={"RIJKSMUSEUM_API_KEY": os.getenv("RIJKSMUSEUM_API_KEY")}, + task = PipelineTask( + pipeline, + params=PipelineParams( + enable_metrics=True, + enable_usage_metrics=True, ), - # Optional - tools_filter=mcp_tools_filter, # Optional - tools_output_filters={"open_image_in_browser": open_image_output_filter}, - ) as mcp: - tools = await mcp.register_tools(llm) + idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, + ) - context = LLMContext(tools=tools) - user_aggregator, assistant_aggregator = LLMContextAggregatorPair( - context, - user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()), - ) + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected: {client}") + # Kick off the conversation. + await task.queue_frames([LLMRunFrame()]) - pipeline = Pipeline( - [ - transport.input(), # Transport user input - stt, - user_aggregator, # User spoken responses - llm, # LLM - tts, # TTS - mcp_image, # URL image -> output - transport.output(), # Transport bot output - assistant_aggregator, # Assistant spoken responses and tool context - ] - ) + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + await task.cancel() - task = PipelineTask( - pipeline, - params=PipelineParams( - enable_metrics=True, - enable_usage_metrics=True, - ), - idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, - ) + runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) - @transport.event_handler("on_client_connected") - async def on_client_connected(transport, client): - logger.info(f"Client connected: {client}") - # Kick off the conversation. - await task.queue_frames([LLMRunFrame()]) - - @transport.event_handler("on_client_disconnected") - async def on_client_disconnected(transport, client): - logger.info(f"Client disconnected") - await task.cancel() - - runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) - - await runner.run(task) + await runner.run(task) async def bot(runner_args: RunnerArguments): @@ -228,13 +140,6 @@ async def bot(runner_args: RunnerArguments): if __name__ == "__main__": - if not os.getenv("RIJKSMUSEUM_API_KEY"): - logger.error( - f"Please set RIJKSMUSEUM_API_KEY environment variable for this example. See https://github.com/r-huijts/rijksmuseum-mcp and https://www.rijksmuseum.nl/en/register?redirectUrl=https://www.https://www.rijksmuseum.nl/en/rijksstudio/my/profile" - ) - import sys - - sys.exit(1) from pipecat.runner.run import main main() diff --git a/examples/mcp/mcp-streamable-http.py b/examples/mcp/mcp-streamable-http.py index 461b6b7fc..59860d307 100644 --- a/examples/mcp/mcp-streamable-http.py +++ b/examples/mcp/mcp-streamable-http.py @@ -63,18 +63,20 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ), ) - system_prompt = f""" - You are a helpful LLM in a voice call. - Your goal is to answer questions about the user's GitHub repositories and account. - You have access to a number of tools provided by Github. Use any and all tools to help users. - Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. - Don't overexplain what you are doing. - Just respond with short sentences when you are carrying out tool calls. - """ + system_prompt = """\ +You are a helpful LLM in a voice call. +Your goal is to answer questions about the user's GitHub repositories and account. +You have access to a number of tools provided by Github. Use any and all tools to help users. +Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. +Don't overexplain what you are doing. +Just respond with short sentences when you are carrying out tool calls. +""" llm = GoogleLLMService( api_key=os.getenv("GOOGLE_API_KEY"), - system_instruction=system_prompt, + settings=GoogleLLMService.Settings( + system_instruction=system_prompt, + ), ) # Github MCP docs: https://github.com/github/github-mcp-server @@ -89,7 +91,10 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ) as mcp: tools = await mcp.register_tools(llm) - context = LLMContext(tools=tools) + context = LLMContext( + messages=[{"role": "user", "content": "Please introduce yourself."}], + tools=tools, + ) user_aggregator, assistant_aggregator = LLMContextAggregatorPair( context, user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),