diff --git a/examples/foundational/39-mcp-stdio.py b/examples/foundational/39-mcp-stdio.py index 7543a4922..e32f0c7ff 100644 --- a/examples/foundational/39-mcp-stdio.py +++ b/examples/foundational/39-mcp-stdio.py @@ -4,12 +4,12 @@ # SPDX-License-Identifier: BSD 2-Clause License # +import asyncio import io import os import re import shutil import sys -import time import aiohttp from dotenv import load_dotenv @@ -38,14 +38,11 @@ from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection load_dotenv(override=True) -logger.remove() -logger.add(sys.stderr, level="DEBUG") - class UrlToImageProcessor(FrameProcessor): - def __init__(self, **kwargs): + def __init__(self, aiohttp_session: aiohttp.ClientSession, **kwargs): super().__init__(**kwargs) - self._pipecat_session = kwargs["pipecat_session"] + self._aiohttp_session = aiohttp_session async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) @@ -56,9 +53,7 @@ class UrlToImageProcessor(FrameProcessor): if image_url: await self.run_image_process(image_url) # sometimes we get multiple image urls- process 1 at a time - time.sleep(1) - else: - pass + await asyncio.sleep(1) else: await self.push_frame(frame, direction) @@ -72,7 +67,7 @@ class UrlToImageProcessor(FrameProcessor): async def run_image_process(self, image_url: str): try: logger.debug(f"handling image from url: '{image_url}'") - async with self._pipecat_session.get(image_url) as response: + async with self._aiohttp_session.get(image_url) as response: image_stream = io.BytesIO(await response.content.read()) image = Image.open(image_stream) image = image.convert("RGB") @@ -128,7 +123,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection): logger.error(f"error setting up mcp") logger.exception("error trace:") - mcp_image = UrlToImageProcessor(pipecat_session=session) + mcp_image = UrlToImageProcessor(aiohttp_session=session) tools = await mcp.register_tools(llm) diff --git a/examples/foundational/39a-mcp-run-sse.py b/examples/foundational/39a-mcp-run-sse.py index e313be247..f73bf0918 100644 --- a/examples/foundational/39a-mcp-run-sse.py +++ b/examples/foundational/39a-mcp-run-sse.py @@ -27,9 +27,6 @@ from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection load_dotenv(override=True) -logger.remove() -logger.add(sys.stderr, level="DEBUG") - async def run_bot(webrtc_connection: SmallWebRTCConnection): logger.info(f"Starting bot") diff --git a/src/pipecat/services/mcp_service.py b/src/pipecat/services/mcp_service.py index c0775e564..180b5b2c4 100644 --- a/src/pipecat/services/mcp_service.py +++ b/src/pipecat/services/mcp_service.py @@ -5,6 +5,7 @@ from loguru import logger from pipecat.adapters.schemas.function_schema import FunctionSchema from pipecat.adapters.schemas.tools_schema import ToolsSchema +from pipecat.utils.base_object import BaseObject try: from mcp import ClientSession, StdioServerParameters, types @@ -17,17 +18,15 @@ except ModuleNotFoundError as e: raise Exception(f"Missing module: {e}") -class MCPClient: +class MCPClient(BaseObject): def __init__( self, - server_params: Union[StdioServerParameters, str] = None, - pipecat_session: Optional[any] = None, + server_params: Union[StdioServerParameters, str], **kwargs, ): super().__init__(**kwargs) self._server_params = server_params self._session = ClientSession - self._pipecat_session = pipecat_session if isinstance(server_params, StdioServerParameters): self._client = stdio_client self._register_tools = self._stdio_register_tools @@ -35,15 +34,15 @@ class MCPClient: self._client = sse_client self._register_tools = self._sse_register_tools else: - logger.exception( - f"{self} error - server_params must be either StdioServerParameters or an sse server url string." + raise TypeError( + f"{self} invalid argument type: `server_params` must be either StdioServerParameters or an SSE server url string." ) async def register_tools(self, llm) -> ToolsSchema: tools_schema = await self._register_tools(llm) return tools_schema - def convert_mcp_schema_to_pipecat( + def _convert_mcp_schema_to_pipecat( self, tool_name: str, tool_schema: Dict[str, Any] ) -> FunctionSchema: """Convert an mcp tool schema to Pipecat's FunctionSchema format. @@ -55,7 +54,7 @@ class MCPClient: """ logger.debug(f"Converting schema for tool '{tool_name}'") - logger.debug(f"Original schema: {json.dumps(tool_schema, indent=2)}") + logger.trace(f"Original schema: {json.dumps(tool_schema, indent=2)}") properties = tool_schema["input_schema"].get("properties", {}) required = tool_schema["input_schema"].get("required", []) @@ -67,7 +66,7 @@ class MCPClient: required=required, ) - logger.debug(f"Converted schema: {json.dumps(schema.to_default_dict(), indent=2)}") + logger.trace(f"Converted schema: {json.dumps(schema.to_default_dict(), indent=2)}") return schema @@ -89,7 +88,7 @@ class MCPClient: ) -> None: """Wrapper for mcp.run tool calls to match Pipecat's function call interface.""" logger.debug(f"Executing tool '{function_name}' with call ID: {tool_call_id}") - logger.debug(f"Tool arguments: {json.dumps(arguments, indent=2)}") + logger.trace(f"Tool arguments: {json.dumps(arguments, indent=2)}") try: async with self._client(self._server_params) as (read, write): async with self._session(read, write) as session: @@ -128,7 +127,7 @@ class MCPClient: ) -> None: """Wrapper for mcp.run tool calls to match Pipecat's function call interface.""" logger.debug(f"Executing tool '{function_name}' with call ID: {tool_call_id}") - logger.debug(f"Tool arguments: {json.dumps(arguments, indent=2)}") + logger.trace(f"Tool arguments: {json.dumps(arguments, indent=2)}") try: async with self._client(self._server_params) as streams: async with self._session(streams[0], streams[1]) as session: @@ -167,7 +166,7 @@ class MCPClient: # logger.debug(f"Non-text result content: '{content}'") pass logger.info(f"Tool '{function_name}' completed successfully") - logger.info(f"Final response: {response}") + logger.debug(f"Final response: {response}") else: logger.error(f"Error getting content from {function_name} results.") @@ -189,7 +188,7 @@ class MCPClient: try: # Convert the schema - function_schema = self.convert_mcp_schema_to_pipecat( + function_schema = self._convert_mcp_schema_to_pipecat( tool_name, {"description": tool.description, "input_schema": tool.inputSchema}, ) @@ -207,7 +206,7 @@ class MCPClient: logger.exception("Full exception details:") continue - logger.info(f"Completed registration of {len(tool_schemas)} tools") + logger.debug(f"Completed registration of {len(tool_schemas)} tools") tools_schema = ToolsSchema(standard_tools=tool_schemas) return tools_schema