From c420dbe57f4bd3fc774958273ad8d9c303b16980 Mon Sep 17 00:00:00 2001 From: vipyne Date: Thu, 10 Apr 2025 12:21:54 -0500 Subject: [PATCH 1/6] MCP Service WIP getting mcp.run to work add mcp[cli] to toml and lint mcp stdio example mcp sse example mcp_run POC ruff formatting --- examples/foundational/39-mcp-stdio.py | 197 +++++++++++++++++++++ examples/foundational/39a-mcp-run-sse.py | 127 ++++++++++++++ pyproject.toml | 1 + src/pipecat/services/mcp_service.py | 213 +++++++++++++++++++++++ 4 files changed, 538 insertions(+) create mode 100644 examples/foundational/39-mcp-stdio.py create mode 100644 examples/foundational/39a-mcp-run-sse.py create mode 100644 src/pipecat/services/mcp_service.py diff --git a/examples/foundational/39-mcp-stdio.py b/examples/foundational/39-mcp-stdio.py new file mode 100644 index 000000000..7543a4922 --- /dev/null +++ b/examples/foundational/39-mcp-stdio.py @@ -0,0 +1,197 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import io +import os +import re +import shutil +import sys +import time + +import aiohttp +from dotenv import load_dotenv +from loguru import logger +from mcp import StdioServerParameters +from PIL import Image + +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.frames.frames import ( + Frame, + FunctionCallResultFrame, + URLImageRawFrame, +) +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.services.anthropic.llm import AnthropicLLMService +from pipecat.services.cartesia.tts import CartesiaTTSService +from pipecat.services.deepgram.stt import DeepgramSTTService +from pipecat.services.mcp_service import MCPClient +from pipecat.transports.base_transport import TransportParams +from pipecat.transports.network.small_webrtc import SmallWebRTCTransport +from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection + +load_dotenv(override=True) + +logger.remove() +logger.add(sys.stderr, level="DEBUG") + + +class UrlToImageProcessor(FrameProcessor): + def __init__(self, **kwargs): + super().__init__(**kwargs) + self._pipecat_session = kwargs["pipecat_session"] + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + + if isinstance(frame, FunctionCallResultFrame): + await self.push_frame(frame, direction) + image_url = self.extract_url(frame.result) + if image_url: + await self.run_image_process(image_url) + # sometimes we get multiple image urls- process 1 at a time + time.sleep(1) + else: + pass + else: + await self.push_frame(frame, direction) + + def extract_url(self, text: str): + pattern = r"!\[[^\]]*\]\((https?://[^)]+\.(png|jpg|jpeg|PNG|JPG|JPEG))\)" + match = re.search(pattern, text) + if match: + return match.group(1) + return None + + async def run_image_process(self, image_url: str): + try: + logger.debug(f"handling image from url: '{image_url}'") + async with self._pipecat_session.get(image_url) as response: + image_stream = io.BytesIO(await response.content.read()) + image = Image.open(image_stream) + image = image.convert("RGB") + frame = URLImageRawFrame( + url=image_url, image=image.tobytes(), size=image.size, format="RGB" + ) + await self.push_frame(frame) + except Exception as e: + error_msg = f"Error handling image url {image_url}: {str(e)}" + logger.error(error_msg) + + +async def run_bot(webrtc_connection: SmallWebRTCConnection): + logger.info(f"Starting bot") + + transport = SmallWebRTCTransport( + webrtc_connection=webrtc_connection, + params=TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + camera_out_enabled=True, + camera_out_width=1024, + camera_out_height=1024, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + vad_audio_passthrough=True, + ), + ) + + # Create an HTTP session for API calls + async with aiohttp.ClientSession() as session: + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + ) + + llm = AnthropicLLMService( + api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-7-sonnet-latest" + ) + + try: + mcp = MCPClient( + server_params=StdioServerParameters( + command=shutil.which("npx"), + args=["-y", "@programcomputer/nasa-mcp-server@latest"], + # https://api.nasa.gov + env={"NASA_API_KEY": os.getenv("NASA_API_KEY")}, + ) + ) + except Exception as e: + logger.error(f"error setting up mcp") + logger.exception("error trace:") + + mcp_image = UrlToImageProcessor(pipecat_session=session) + + tools = await mcp.register_tools(llm) + + system = f""" + You are a helpful LLM in a WebRTC call. + Your goal is to demonstrate your capabilities in a succinct way. + You have access to a number of tools provided by NASA MCP. Use any and all tools to help users. + When asked for the astronomy picture of the day, PASS in NO date to the API. + This ensures we get the latest picture available. If as specific date is asked for, you + can pass in that date to the API. + Your output will be converted to audio so don't include special characters in your answers. + Respond to what the user said in a creative and helpful way. + Don't overexplain what you are doing. + Just respond with short sentences when you are carrying out tool calls. + """ + + messages = [{"role": "system", "content": system}] + + context = OpenAILLMContext(messages, tools) + context_aggregator = llm.create_context_aggregator(context) + + pipeline = Pipeline( + [ + transport.input(), # Transport user input + stt, + context_aggregator.user(), # User spoken responses + llm, # LLM + tts, # TTS + mcp_image, # URL image -> output + transport.output(), # Transport bot output + context_aggregator.assistant(), # Assistant spoken responses and tool context + ] + ) + + task = PipelineTask( + pipeline, + params=PipelineParams( + allow_interruptions=True, + enable_metrics=True, + ), + ) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected: {client}") + # Kick off the conversation. + await task.queue_frames([context_aggregator.user().get_context_frame()]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + + @transport.event_handler("on_client_closed") + async def on_client_closed(transport, client): + logger.info(f"Client closed connection") + await task.cancel() + + runner = PipelineRunner(handle_sigint=False) + + await runner.run(task) + + +if __name__ == "__main__": + from run import main + + main() diff --git a/examples/foundational/39a-mcp-run-sse.py b/examples/foundational/39a-mcp-run-sse.py new file mode 100644 index 000000000..e313be247 --- /dev/null +++ b/examples/foundational/39a-mcp-run-sse.py @@ -0,0 +1,127 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import os +import sys + +import aiohttp +from dotenv import load_dotenv +from loguru import logger + +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.services.anthropic.llm import AnthropicLLMService +from pipecat.services.cartesia.tts import CartesiaTTSService +from pipecat.services.deepgram.stt import DeepgramSTTService +from pipecat.services.mcp_service import MCPClient +from pipecat.transports.base_transport import TransportParams +from pipecat.transports.network.small_webrtc import SmallWebRTCTransport +from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection + +load_dotenv(override=True) + +logger.remove() +logger.add(sys.stderr, level="DEBUG") + + +async def run_bot(webrtc_connection: SmallWebRTCConnection): + logger.info(f"Starting bot") + + transport = SmallWebRTCTransport( + webrtc_connection=webrtc_connection, + params=TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + vad_audio_passthrough=True, + ), + ) + + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + ) + + llm = AnthropicLLMService( + api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-7-sonnet-latest" + ) + + try: + # https://docs.mcp.run/integrating/tutorials/mcp-run-sse-openai-agents/ + # ie. "https://www.mcp.run/api/mcp/sse?..." + mcp = MCPClient(server_params=os.getenv("MCP_RUN_SSE_URL")) + except Exception as e: + logger.error(f"error setting up mcp") + logger.exception("error trace:") + + tools = await mcp.register_tools(llm) + + system = f""" + You are a helpful LLM in a WebRTC call. + Your goal is to demonstrate your capabilities in a succinct way. + You have access to a number of tools provided by mcp.run. Use any and all tools to help users. + Your output will be converted to audio so don't include special characters in your answers. + Respond to what the user said in a creative and helpful way. + Don't overexplain what you are doing. + Just respond with short sentences when you are carrying out tool calls. + """ + + messages = [{"role": "system", "content": system}] + + context = OpenAILLMContext(messages, tools) + context_aggregator = llm.create_context_aggregator(context) + + pipeline = Pipeline( + [ + transport.input(), # Transport user input + stt, + context_aggregator.user(), # User spoken responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + context_aggregator.assistant(), # Assistant spoken responses and tool context + ] + ) + + task = PipelineTask( + pipeline, + params=PipelineParams( + allow_interruptions=True, + enable_metrics=True, + ), + ) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected: {client}") + # Kick off the conversation. + await task.queue_frames([context_aggregator.user().get_context_frame()]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + + @transport.event_handler("on_client_closed") + async def on_client_closed(transport, client): + logger.info(f"Client closed connection") + await task.cancel() + + runner = PipelineRunner(handle_sigint=False) + + await runner.run(task) + + +if __name__ == "__main__": + from run import main + + main() diff --git a/pyproject.toml b/pyproject.toml index e57a2c5e5..58d2097ab 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -64,6 +64,7 @@ langchain = [ "langchain~=0.3.20", "langchain-community~=0.3.20", "langchain-ope livekit = [ "livekit~=0.22.0", "livekit-api~=0.8.2", "tenacity~=9.0.0" ] lmnt = [ "websockets~=13.1" ] local = [ "pyaudio~=0.2.14" ] +mcp = [ "mcp[cli]~=1.6.0" ] mem0 = [ "mem0ai~=0.1.76" ] mlx-whisper = [ "mlx-whisper~=0.4.2" ] moondream = [ "einops~=0.8.0", "timm~=1.0.13", "transformers~=4.48.0" ] diff --git a/src/pipecat/services/mcp_service.py b/src/pipecat/services/mcp_service.py new file mode 100644 index 000000000..c0775e564 --- /dev/null +++ b/src/pipecat/services/mcp_service.py @@ -0,0 +1,213 @@ +import json +from typing import Any, Dict, List, Optional, Union + +from loguru import logger + +from pipecat.adapters.schemas.function_schema import FunctionSchema +from pipecat.adapters.schemas.tools_schema import ToolsSchema + +try: + from mcp import ClientSession, StdioServerParameters, types + from mcp.client.session import ClientSession + from mcp.client.sse import sse_client + from mcp.client.stdio import stdio_client +except ModuleNotFoundError as e: + logger.error(f"Exception: {e}") + logger.error("In order to use an MCP client, you need to `pip install pipecat-ai[mcp]`.") + raise Exception(f"Missing module: {e}") + + +class MCPClient: + def __init__( + self, + server_params: Union[StdioServerParameters, str] = None, + pipecat_session: Optional[any] = None, + **kwargs, + ): + super().__init__(**kwargs) + self._server_params = server_params + self._session = ClientSession + self._pipecat_session = pipecat_session + if isinstance(server_params, StdioServerParameters): + self._client = stdio_client + self._register_tools = self._stdio_register_tools + elif isinstance(server_params, str): + self._client = sse_client + self._register_tools = self._sse_register_tools + else: + logger.exception( + f"{self} error - server_params must be either StdioServerParameters or an sse server url string." + ) + + async def register_tools(self, llm) -> ToolsSchema: + tools_schema = await self._register_tools(llm) + return tools_schema + + def convert_mcp_schema_to_pipecat( + self, tool_name: str, tool_schema: Dict[str, Any] + ) -> FunctionSchema: + """Convert an mcp tool schema to Pipecat's FunctionSchema format. + Args: + tool_name: The name of the tool + tool_schema: The mcp tool schema + Returns: + A FunctionSchema instance + """ + + logger.debug(f"Converting schema for tool '{tool_name}'") + logger.debug(f"Original schema: {json.dumps(tool_schema, indent=2)}") + + properties = tool_schema["input_schema"].get("properties", {}) + required = tool_schema["input_schema"].get("required", []) + + schema = FunctionSchema( + name=tool_name, + description=tool_schema["description"], + properties=properties, + required=required, + ) + + logger.debug(f"Converted schema: {json.dumps(schema.to_default_dict(), indent=2)}") + + return schema + + async def _sse_register_tools(self, llm) -> ToolsSchema: + """Register all available mcp.run tools with the LLM service. + Args: + llm: The Pipecat LLM service to register tools with + Returns: + A ToolsSchema containing all registered tools + """ + + async def mcp_tool_wrapper( + function_name: str, + tool_call_id: str, + arguments: Dict[str, Any], + llm: any, + context: any, + result_callback: any, + ) -> None: + """Wrapper for mcp.run tool calls to match Pipecat's function call interface.""" + logger.debug(f"Executing tool '{function_name}' with call ID: {tool_call_id}") + logger.debug(f"Tool arguments: {json.dumps(arguments, indent=2)}") + try: + async with self._client(self._server_params) as (read, write): + async with self._session(read, write) as session: + await session.initialize() + await self._call_tool(session, function_name, arguments, result_callback) + except Exception as e: + error_msg = f"Error calling mcp tool {function_name}: {str(e)}" + logger.error(error_msg) + logger.exception("Full exception details:") + await result_callback(error_msg) + + logger.debug("Starting registration of mcp.run tools") + tool_schemas: List[FunctionSchema] = [] + + async with self._client(self._server_params) as (read, write): + async with self._session(read, write) as session: + await session.initialize() + tools_schema = await self._list_tools(session, mcp_tool_wrapper, llm) + return tools_schema + + async def _stdio_register_tools(self, llm) -> ToolsSchema: + """Register all available mcp.run tools with the LLM service. + Args: + llm: The Pipecat LLM service to register tools with + Returns: + A ToolsSchema containing all registered tools + """ + + async def mcp_tool_wrapper( + function_name: str, + tool_call_id: str, + arguments: Dict[str, Any], + llm: any, + context: any, + result_callback: any, + ) -> None: + """Wrapper for mcp.run tool calls to match Pipecat's function call interface.""" + logger.debug(f"Executing tool '{function_name}' with call ID: {tool_call_id}") + logger.debug(f"Tool arguments: {json.dumps(arguments, indent=2)}") + try: + async with self._client(self._server_params) as streams: + async with self._session(streams[0], streams[1]) as session: + await session.initialize() + await self._call_tool(session, function_name, arguments, result_callback) + except Exception as e: + error_msg = f"Error calling mcp tool {function_name}: {str(e)}" + logger.error(error_msg) + logger.exception("Full exception details:") + await result_callback(error_msg) + + logger.debug("Starting registration of mcp.run tools") + + async with self._client(self._server_params) as streams: + async with self._session(streams[0], streams[1]) as session: + await session.initialize() + tools_schema = await self._list_tools(session, mcp_tool_wrapper, llm) + return tools_schema + + async def _call_tool(self, session, function_name, arguments, result_callback): + logger.debug(f"Calling mcp tool '{function_name}'") + try: + results = await session.call_tool(function_name, arguments=arguments) + except Exception as e: + error_msg = f"Error calling mcp tool {function_name}: {str(e)}" + logger.error(error_msg) + + response = "" + image_url = None + if hasattr(results, "content") and results.content: + for i, content in enumerate(results.content): + if hasattr(content, "text") and content.text: + logger.debug(f"Tool response chunk {i}: {content.text}") + response += content.text + else: + # logger.debug(f"Non-text result content: '{content}'") + pass + logger.info(f"Tool '{function_name}' completed successfully") + logger.info(f"Final response: {response}") + else: + logger.error(f"Error getting content from {function_name} results.") + + await result_callback(response) + + async def _list_tools(self, session, mcp_tool_wrapper, llm): + available_tools = await session.list_tools() + tool_schemas: List[FunctionSchema] = [] + + try: + logger.debug(f"Found {len(available_tools)} available tools") + except: + pass + + for tool in available_tools.tools: + tool_name = tool.name + logger.debug(f"Processing tool: {tool_name}") + logger.debug(f"Tool description: {tool.description}") + + try: + # Convert the schema + function_schema = self.convert_mcp_schema_to_pipecat( + tool_name, + {"description": tool.description, "input_schema": tool.inputSchema}, + ) + + # Register the wrapped function + logger.debug(f"Registering function handler for '{tool_name}'") + llm.register_function(tool_name, mcp_tool_wrapper) + + # Add to list of schemas + tool_schemas.append(function_schema) + logger.debug(f"Successfully registered tool '{tool_name}'") + + except Exception as e: + logger.error(f"Failed to register tool '{tool_name}': {str(e)}") + logger.exception("Full exception details:") + continue + + logger.info(f"Completed registration of {len(tool_schemas)} tools") + tools_schema = ToolsSchema(standard_tools=tool_schemas) + + return tools_schema From 3384598e070bb014e4e31fbed4e31863ef91d769 Mon Sep 17 00:00:00 2001 From: vipyne Date: Tue, 22 Apr 2025 17:44:25 -0500 Subject: [PATCH 2/6] mcp_service: pr notes --- examples/foundational/39-mcp-stdio.py | 17 ++++++--------- examples/foundational/39a-mcp-run-sse.py | 3 --- src/pipecat/services/mcp_service.py | 27 ++++++++++++------------ 3 files changed, 19 insertions(+), 28 deletions(-) diff --git a/examples/foundational/39-mcp-stdio.py b/examples/foundational/39-mcp-stdio.py index 7543a4922..e32f0c7ff 100644 --- a/examples/foundational/39-mcp-stdio.py +++ b/examples/foundational/39-mcp-stdio.py @@ -4,12 +4,12 @@ # SPDX-License-Identifier: BSD 2-Clause License # +import asyncio import io import os import re import shutil import sys -import time import aiohttp from dotenv import load_dotenv @@ -38,14 +38,11 @@ from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection load_dotenv(override=True) -logger.remove() -logger.add(sys.stderr, level="DEBUG") - class UrlToImageProcessor(FrameProcessor): - def __init__(self, **kwargs): + def __init__(self, aiohttp_session: aiohttp.ClientSession, **kwargs): super().__init__(**kwargs) - self._pipecat_session = kwargs["pipecat_session"] + self._aiohttp_session = aiohttp_session async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) @@ -56,9 +53,7 @@ class UrlToImageProcessor(FrameProcessor): if image_url: await self.run_image_process(image_url) # sometimes we get multiple image urls- process 1 at a time - time.sleep(1) - else: - pass + await asyncio.sleep(1) else: await self.push_frame(frame, direction) @@ -72,7 +67,7 @@ class UrlToImageProcessor(FrameProcessor): async def run_image_process(self, image_url: str): try: logger.debug(f"handling image from url: '{image_url}'") - async with self._pipecat_session.get(image_url) as response: + async with self._aiohttp_session.get(image_url) as response: image_stream = io.BytesIO(await response.content.read()) image = Image.open(image_stream) image = image.convert("RGB") @@ -128,7 +123,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection): logger.error(f"error setting up mcp") logger.exception("error trace:") - mcp_image = UrlToImageProcessor(pipecat_session=session) + mcp_image = UrlToImageProcessor(aiohttp_session=session) tools = await mcp.register_tools(llm) diff --git a/examples/foundational/39a-mcp-run-sse.py b/examples/foundational/39a-mcp-run-sse.py index e313be247..f73bf0918 100644 --- a/examples/foundational/39a-mcp-run-sse.py +++ b/examples/foundational/39a-mcp-run-sse.py @@ -27,9 +27,6 @@ from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection load_dotenv(override=True) -logger.remove() -logger.add(sys.stderr, level="DEBUG") - async def run_bot(webrtc_connection: SmallWebRTCConnection): logger.info(f"Starting bot") diff --git a/src/pipecat/services/mcp_service.py b/src/pipecat/services/mcp_service.py index c0775e564..180b5b2c4 100644 --- a/src/pipecat/services/mcp_service.py +++ b/src/pipecat/services/mcp_service.py @@ -5,6 +5,7 @@ from loguru import logger from pipecat.adapters.schemas.function_schema import FunctionSchema from pipecat.adapters.schemas.tools_schema import ToolsSchema +from pipecat.utils.base_object import BaseObject try: from mcp import ClientSession, StdioServerParameters, types @@ -17,17 +18,15 @@ except ModuleNotFoundError as e: raise Exception(f"Missing module: {e}") -class MCPClient: +class MCPClient(BaseObject): def __init__( self, - server_params: Union[StdioServerParameters, str] = None, - pipecat_session: Optional[any] = None, + server_params: Union[StdioServerParameters, str], **kwargs, ): super().__init__(**kwargs) self._server_params = server_params self._session = ClientSession - self._pipecat_session = pipecat_session if isinstance(server_params, StdioServerParameters): self._client = stdio_client self._register_tools = self._stdio_register_tools @@ -35,15 +34,15 @@ class MCPClient: self._client = sse_client self._register_tools = self._sse_register_tools else: - logger.exception( - f"{self} error - server_params must be either StdioServerParameters or an sse server url string." + raise TypeError( + f"{self} invalid argument type: `server_params` must be either StdioServerParameters or an SSE server url string." ) async def register_tools(self, llm) -> ToolsSchema: tools_schema = await self._register_tools(llm) return tools_schema - def convert_mcp_schema_to_pipecat( + def _convert_mcp_schema_to_pipecat( self, tool_name: str, tool_schema: Dict[str, Any] ) -> FunctionSchema: """Convert an mcp tool schema to Pipecat's FunctionSchema format. @@ -55,7 +54,7 @@ class MCPClient: """ logger.debug(f"Converting schema for tool '{tool_name}'") - logger.debug(f"Original schema: {json.dumps(tool_schema, indent=2)}") + logger.trace(f"Original schema: {json.dumps(tool_schema, indent=2)}") properties = tool_schema["input_schema"].get("properties", {}) required = tool_schema["input_schema"].get("required", []) @@ -67,7 +66,7 @@ class MCPClient: required=required, ) - logger.debug(f"Converted schema: {json.dumps(schema.to_default_dict(), indent=2)}") + logger.trace(f"Converted schema: {json.dumps(schema.to_default_dict(), indent=2)}") return schema @@ -89,7 +88,7 @@ class MCPClient: ) -> None: """Wrapper for mcp.run tool calls to match Pipecat's function call interface.""" logger.debug(f"Executing tool '{function_name}' with call ID: {tool_call_id}") - logger.debug(f"Tool arguments: {json.dumps(arguments, indent=2)}") + logger.trace(f"Tool arguments: {json.dumps(arguments, indent=2)}") try: async with self._client(self._server_params) as (read, write): async with self._session(read, write) as session: @@ -128,7 +127,7 @@ class MCPClient: ) -> None: """Wrapper for mcp.run tool calls to match Pipecat's function call interface.""" logger.debug(f"Executing tool '{function_name}' with call ID: {tool_call_id}") - logger.debug(f"Tool arguments: {json.dumps(arguments, indent=2)}") + logger.trace(f"Tool arguments: {json.dumps(arguments, indent=2)}") try: async with self._client(self._server_params) as streams: async with self._session(streams[0], streams[1]) as session: @@ -167,7 +166,7 @@ class MCPClient: # logger.debug(f"Non-text result content: '{content}'") pass logger.info(f"Tool '{function_name}' completed successfully") - logger.info(f"Final response: {response}") + logger.debug(f"Final response: {response}") else: logger.error(f"Error getting content from {function_name} results.") @@ -189,7 +188,7 @@ class MCPClient: try: # Convert the schema - function_schema = self.convert_mcp_schema_to_pipecat( + function_schema = self._convert_mcp_schema_to_pipecat( tool_name, {"description": tool.description, "input_schema": tool.inputSchema}, ) @@ -207,7 +206,7 @@ class MCPClient: logger.exception("Full exception details:") continue - logger.info(f"Completed registration of {len(tool_schemas)} tools") + logger.debug(f"Completed registration of {len(tool_schemas)} tools") tools_schema = ToolsSchema(standard_tools=tool_schemas) return tools_schema From b7b2a5b7a179a02fe12aa1d28941b8452989fb08 Mon Sep 17 00:00:00 2001 From: vipyne Date: Thu, 24 Apr 2025 17:13:28 -0500 Subject: [PATCH 3/6] mcp service fix and add multiple mcp example --- examples/foundational/39a-mcp-run-sse.py | 2 +- examples/foundational/39b-multiple-mcp.py | 203 ++++++++++++++++++++++ src/pipecat/services/mcp_service.py | 27 +-- 3 files changed, 218 insertions(+), 14 deletions(-) create mode 100644 examples/foundational/39b-multiple-mcp.py diff --git a/examples/foundational/39a-mcp-run-sse.py b/examples/foundational/39a-mcp-run-sse.py index f73bf0918..a624bc7b8 100644 --- a/examples/foundational/39a-mcp-run-sse.py +++ b/examples/foundational/39a-mcp-run-sse.py @@ -55,7 +55,6 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection): try: # https://docs.mcp.run/integrating/tutorials/mcp-run-sse-openai-agents/ - # ie. "https://www.mcp.run/api/mcp/sse?..." mcp = MCPClient(server_params=os.getenv("MCP_RUN_SSE_URL")) except Exception as e: logger.error(f"error setting up mcp") @@ -69,6 +68,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection): You have access to a number of tools provided by mcp.run. Use any and all tools to help users. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way. + When asked for today's date, use 'https://www.datetoday.net/'. Don't overexplain what you are doing. Just respond with short sentences when you are carrying out tool calls. """ diff --git a/examples/foundational/39b-multiple-mcp.py b/examples/foundational/39b-multiple-mcp.py new file mode 100644 index 000000000..473054009 --- /dev/null +++ b/examples/foundational/39b-multiple-mcp.py @@ -0,0 +1,203 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import io +import os +import re +import shutil +import sys + +import aiohttp +from dotenv import load_dotenv +from loguru import logger +from mcp import StdioServerParameters +from PIL import Image + +from pipecat.adapters.schemas.tools_schema import ToolsSchema +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.frames.frames import ( + Frame, + FunctionCallResultFrame, + URLImageRawFrame, +) +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.services.anthropic.llm import AnthropicLLMService +from pipecat.services.cartesia.tts import CartesiaTTSService +from pipecat.services.deepgram.stt import DeepgramSTTService +from pipecat.services.mcp_service import MCPClient +from pipecat.transports.base_transport import TransportParams +from pipecat.transports.network.small_webrtc import SmallWebRTCTransport +from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection + +load_dotenv(override=True) + + +class UrlToImageProcessor(FrameProcessor): + def __init__(self, aiohttp_session: aiohttp.ClientSession, **kwargs): + super().__init__(**kwargs) + self._aiohttp_session = aiohttp_session + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + + if isinstance(frame, FunctionCallResultFrame): + await self.push_frame(frame, direction) + image_url = self.extract_url(frame.result) + if image_url: + await self.run_image_process(image_url) + # sometimes we get multiple image urls- process 1 at a time + await asyncio.sleep(1) + else: + await self.push_frame(frame, direction) + + def extract_url(self, text: str): + pattern = r"!\[[^\]]*\]\((https?://[^)]+\.(png|jpg|jpeg|PNG|JPG|JPEG|gif))\)" + match = re.search(pattern, text) + if match: + return match.group(1) + return None + + async def run_image_process(self, image_url: str): + try: + logger.debug(f"handling image from url: '{image_url}'") + async with self._aiohttp_session.get(image_url) as response: + image_stream = io.BytesIO(await response.content.read()) + image = Image.open(image_stream) + image = image.convert("RGB") + frame = URLImageRawFrame( + url=image_url, image=image.tobytes(), size=image.size, format="RGB" + ) + await self.push_frame(frame) + except Exception as e: + error_msg = f"Error handling image url {image_url}: {str(e)}" + logger.error(error_msg) + + +async def run_bot(webrtc_connection: SmallWebRTCConnection): + logger.info(f"Starting bot") + + transport = SmallWebRTCTransport( + webrtc_connection=webrtc_connection, + params=TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + camera_out_enabled=True, + camera_out_width=1024, + camera_out_height=1024, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + vad_audio_passthrough=True, + ), + ) + + # Create an HTTP session for API calls + async with aiohttp.ClientSession() as session: + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + ) + + llm = AnthropicLLMService( + api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-7-sonnet-latest" + ) + + system = f""" + You are a helpful LLM in a WebRTC call. + Your goal is to demonstrate your capabilities in a succinct way. + You have access to a number of tools provided by NASA MCP. Use any and all tools to help users. + When asked for today's date, use 'https://www.datetoday.net/'. + When asked for the astronomy picture of the day, use 'https://www.datetoday.net/', to get today's date. + Your output will be converted to audio so don't include special characters in your answers. + Respond to what the user said in a creative and helpful way. + Don't overexplain what you are doing. + Just respond with short sentences when you are carrying out tool calls. + """ + + messages = [{"role": "system", "content": system}] + + try: + mcp = MCPClient( + server_params=StdioServerParameters( + command=shutil.which("npx"), + args=["-y", "@programcomputer/nasa-mcp-server@latest"], + # https://api.nasa.gov + env={"NASA_API_KEY": os.getenv("NASA_API_KEY")}, + ) + ) + except Exception as e: + logger.error(f"error setting up nasa mcp") + logger.exception("error trace:") + try: + # https://docs.mcp.run/integrating/tutorials/mcp-run-sse-openai-agents/ + # ie. "https://www.mcp.run/api/mcp/sse?..." + # ensure the profile has a tool or few installed + mcp_run = MCPClient(server_params=os.getenv("MCP_RUN_SSE_URL")) + except Exception as e: + logger.error(f"error setting up mcp.run") + logger.exception("error trace:") + + tools = await mcp.register_tools(llm) + run_tools = await mcp_run.register_tools(llm) + + all_standard_tools = run_tools.standard_tools + tools.standard_tools + all_tools = ToolsSchema(standard_tools=all_standard_tools) + + context = OpenAILLMContext(messages, all_tools) + context_aggregator = llm.create_context_aggregator(context) + mcp_image_processor = UrlToImageProcessor(aiohttp_session=session) + + pipeline = Pipeline( + [ + transport.input(), # Transport user input + stt, + context_aggregator.user(), # User spoken responses + llm, # LLM + tts, # TTS + mcp_image_processor, # URL image -> output + transport.output(), # Transport bot output + context_aggregator.assistant(), # Assistant spoken responses and tool context + ] + ) + + task = PipelineTask( + pipeline, + params=PipelineParams( + allow_interruptions=True, + enable_metrics=True, + ), + ) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected: {client}") + # Kick off the conversation. + await task.queue_frames([context_aggregator.user().get_context_frame()]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + + @transport.event_handler("on_client_closed") + async def on_client_closed(transport, client): + logger.info(f"Client closed connection") + await task.cancel() + + runner = PipelineRunner(handle_sigint=False) + + await runner.run(task) + + +if __name__ == "__main__": + from run import main + + main() diff --git a/src/pipecat/services/mcp_service.py b/src/pipecat/services/mcp_service.py index 180b5b2c4..637170d39 100644 --- a/src/pipecat/services/mcp_service.py +++ b/src/pipecat/services/mcp_service.py @@ -155,20 +155,21 @@ class MCPClient(BaseObject): error_msg = f"Error calling mcp tool {function_name}: {str(e)}" logger.error(error_msg) - response = "" + response = "Sorry, could not call the mcp tool" image_url = None - if hasattr(results, "content") and results.content: - for i, content in enumerate(results.content): - if hasattr(content, "text") and content.text: - logger.debug(f"Tool response chunk {i}: {content.text}") - response += content.text - else: - # logger.debug(f"Non-text result content: '{content}'") - pass - logger.info(f"Tool '{function_name}' completed successfully") - logger.debug(f"Final response: {response}") - else: - logger.error(f"Error getting content from {function_name} results.") + if results: + if hasattr(results, "content") and results.content: + for i, content in enumerate(results.content): + if hasattr(content, "text") and content.text: + logger.debug(f"Tool response chunk {i}: {content.text}") + response += content.text + else: + # logger.debug(f"Non-text result content: '{content}'") + pass + logger.info(f"Tool '{function_name}' completed successfully") + logger.debug(f"Final response: {response}") + else: + logger.error(f"Error getting content from {function_name} results.") await result_callback(response) From b29ffeef294595419ae2bcc4fbef075f57cc24e2 Mon Sep 17 00:00:00 2001 From: vipyne Date: Thu, 24 Apr 2025 17:19:50 -0500 Subject: [PATCH 4/6] update 39* examples as per #1648 --- examples/foundational/39-mcp-stdio.py | 8 +++----- examples/foundational/39a-mcp-run-sse.py | 2 -- examples/foundational/39b-multiple-mcp.py | 8 +++----- 3 files changed, 6 insertions(+), 12 deletions(-) diff --git a/examples/foundational/39-mcp-stdio.py b/examples/foundational/39-mcp-stdio.py index e32f0c7ff..dfde6abba 100644 --- a/examples/foundational/39-mcp-stdio.py +++ b/examples/foundational/39-mcp-stdio.py @@ -88,12 +88,10 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection): params=TransportParams( audio_in_enabled=True, audio_out_enabled=True, - camera_out_enabled=True, - camera_out_width=1024, - camera_out_height=1024, - vad_enabled=True, + video_out_enabled=True, + video_out_width=1024, + video_out_height=1024, vad_analyzer=SileroVADAnalyzer(), - vad_audio_passthrough=True, ), ) diff --git a/examples/foundational/39a-mcp-run-sse.py b/examples/foundational/39a-mcp-run-sse.py index a624bc7b8..be7415931 100644 --- a/examples/foundational/39a-mcp-run-sse.py +++ b/examples/foundational/39a-mcp-run-sse.py @@ -36,9 +36,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection): params=TransportParams( audio_in_enabled=True, audio_out_enabled=True, - vad_enabled=True, vad_analyzer=SileroVADAnalyzer(), - vad_audio_passthrough=True, ), ) diff --git a/examples/foundational/39b-multiple-mcp.py b/examples/foundational/39b-multiple-mcp.py index 473054009..6e55e75a6 100644 --- a/examples/foundational/39b-multiple-mcp.py +++ b/examples/foundational/39b-multiple-mcp.py @@ -89,12 +89,10 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection): params=TransportParams( audio_in_enabled=True, audio_out_enabled=True, - camera_out_enabled=True, - camera_out_width=1024, - camera_out_height=1024, - vad_enabled=True, + video_out_enabled=True, + video_out_width=1024, + video_out_height=1024, vad_analyzer=SileroVADAnalyzer(), - vad_audio_passthrough=True, ), ) From cb7cb381aa9c656065c9d9a440d4042670a0c925 Mon Sep 17 00:00:00 2001 From: vipyne Date: Thu, 24 Apr 2025 18:04:22 -0500 Subject: [PATCH 5/6] Add MCPClient changelog line --- CHANGELOG.md | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bbea043a5..3a44d17a0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,4 @@ + # Changelog All notable changes to **Pipecat** will be documented in this file. @@ -7,12 +8,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Added `MCPClient`; a way to connect to MCP servers and use the MCP servers' tools. + ### Fixed - Fixed an issue where the `SmartTurnMetricsData` was reporting 0ms for inference and processing time when using the `FalSmartTurnAnalyzer`. -## [0.0.65] - 2025-04-23 "Sant Jordi's release" +## [0.0.65] - 2025-04-23 "Sant Jordi's release" 🌹📕 https://en.wikipedia.org/wiki/Saint_George%27s_Day_in_Catalonia From 4cc8a4312c63780e89abb58630ba75da36b1b9a4 Mon Sep 17 00:00:00 2001 From: vipyne Date: Thu, 24 Apr 2025 18:11:35 -0500 Subject: [PATCH 6/6] Revert "update 39* examples as per #1648" This reverts commit b29ffeef294595419ae2bcc4fbef075f57cc24e2. --- examples/foundational/39-mcp-stdio.py | 8 +++++--- examples/foundational/39a-mcp-run-sse.py | 2 ++ examples/foundational/39b-multiple-mcp.py | 8 +++++--- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/examples/foundational/39-mcp-stdio.py b/examples/foundational/39-mcp-stdio.py index dfde6abba..e32f0c7ff 100644 --- a/examples/foundational/39-mcp-stdio.py +++ b/examples/foundational/39-mcp-stdio.py @@ -88,10 +88,12 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection): params=TransportParams( audio_in_enabled=True, audio_out_enabled=True, - video_out_enabled=True, - video_out_width=1024, - video_out_height=1024, + camera_out_enabled=True, + camera_out_width=1024, + camera_out_height=1024, + vad_enabled=True, vad_analyzer=SileroVADAnalyzer(), + vad_audio_passthrough=True, ), ) diff --git a/examples/foundational/39a-mcp-run-sse.py b/examples/foundational/39a-mcp-run-sse.py index be7415931..a624bc7b8 100644 --- a/examples/foundational/39a-mcp-run-sse.py +++ b/examples/foundational/39a-mcp-run-sse.py @@ -36,7 +36,9 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection): params=TransportParams( audio_in_enabled=True, audio_out_enabled=True, + vad_enabled=True, vad_analyzer=SileroVADAnalyzer(), + vad_audio_passthrough=True, ), ) diff --git a/examples/foundational/39b-multiple-mcp.py b/examples/foundational/39b-multiple-mcp.py index 6e55e75a6..473054009 100644 --- a/examples/foundational/39b-multiple-mcp.py +++ b/examples/foundational/39b-multiple-mcp.py @@ -89,10 +89,12 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection): params=TransportParams( audio_in_enabled=True, audio_out_enabled=True, - video_out_enabled=True, - video_out_width=1024, - video_out_height=1024, + camera_out_enabled=True, + camera_out_width=1024, + camera_out_height=1024, + vad_enabled=True, vad_analyzer=SileroVADAnalyzer(), + vad_audio_passthrough=True, ), )