diff --git a/CHANGELOG.md b/CHANGELOG.md index bbea043a5..3a44d17a0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,4 @@ + # Changelog All notable changes to **Pipecat** will be documented in this file. @@ -7,12 +8,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Added `MCPClient`; a way to connect to MCP servers and use the MCP servers' tools. + ### Fixed - Fixed an issue where the `SmartTurnMetricsData` was reporting 0ms for inference and processing time when using the `FalSmartTurnAnalyzer`. -## [0.0.65] - 2025-04-23 "Sant Jordi's release" +## [0.0.65] - 2025-04-23 "Sant Jordi's release" 🌹📕 https://en.wikipedia.org/wiki/Saint_George%27s_Day_in_Catalonia diff --git a/examples/foundational/39-mcp-stdio.py b/examples/foundational/39-mcp-stdio.py new file mode 100644 index 000000000..e32f0c7ff --- /dev/null +++ b/examples/foundational/39-mcp-stdio.py @@ -0,0 +1,192 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import io +import os +import re +import shutil +import sys + +import aiohttp +from dotenv import load_dotenv +from loguru import logger +from mcp import StdioServerParameters +from PIL import Image + +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.frames.frames import ( + Frame, + FunctionCallResultFrame, + URLImageRawFrame, +) +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.services.anthropic.llm import AnthropicLLMService +from pipecat.services.cartesia.tts import CartesiaTTSService +from pipecat.services.deepgram.stt import DeepgramSTTService +from pipecat.services.mcp_service import MCPClient +from pipecat.transports.base_transport import TransportParams +from pipecat.transports.network.small_webrtc import SmallWebRTCTransport +from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection + +load_dotenv(override=True) + + +class UrlToImageProcessor(FrameProcessor): + def __init__(self, aiohttp_session: aiohttp.ClientSession, **kwargs): + super().__init__(**kwargs) + self._aiohttp_session = aiohttp_session + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + + if isinstance(frame, FunctionCallResultFrame): + await self.push_frame(frame, direction) + image_url = self.extract_url(frame.result) + if image_url: + await self.run_image_process(image_url) + # sometimes we get multiple image urls- process 1 at a time + await asyncio.sleep(1) + else: + await self.push_frame(frame, direction) + + def extract_url(self, text: str): + pattern = r"!\[[^\]]*\]\((https?://[^)]+\.(png|jpg|jpeg|PNG|JPG|JPEG))\)" + match = re.search(pattern, text) + if match: + return match.group(1) + return None + + async def run_image_process(self, image_url: str): + try: + logger.debug(f"handling image from url: '{image_url}'") + async with self._aiohttp_session.get(image_url) as response: + image_stream = io.BytesIO(await response.content.read()) + image = Image.open(image_stream) + image = image.convert("RGB") + frame = URLImageRawFrame( + url=image_url, image=image.tobytes(), size=image.size, format="RGB" + ) + await self.push_frame(frame) + except Exception as e: + error_msg = f"Error handling image url {image_url}: {str(e)}" + logger.error(error_msg) + + +async def run_bot(webrtc_connection: SmallWebRTCConnection): + logger.info(f"Starting bot") + + transport = SmallWebRTCTransport( + webrtc_connection=webrtc_connection, + params=TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + camera_out_enabled=True, + camera_out_width=1024, + camera_out_height=1024, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + vad_audio_passthrough=True, + ), + ) + + # Create an HTTP session for API calls + async with aiohttp.ClientSession() as session: + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + ) + + llm = AnthropicLLMService( + api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-7-sonnet-latest" + ) + + try: + mcp = MCPClient( + server_params=StdioServerParameters( + command=shutil.which("npx"), + args=["-y", "@programcomputer/nasa-mcp-server@latest"], + # https://api.nasa.gov + env={"NASA_API_KEY": os.getenv("NASA_API_KEY")}, + ) + ) + except Exception as e: + logger.error(f"error setting up mcp") + logger.exception("error trace:") + + mcp_image = UrlToImageProcessor(aiohttp_session=session) + + tools = await mcp.register_tools(llm) + + system = f""" + You are a helpful LLM in a WebRTC call. + Your goal is to demonstrate your capabilities in a succinct way. + You have access to a number of tools provided by NASA MCP. Use any and all tools to help users. + When asked for the astronomy picture of the day, PASS in NO date to the API. + This ensures we get the latest picture available. If as specific date is asked for, you + can pass in that date to the API. + Your output will be converted to audio so don't include special characters in your answers. + Respond to what the user said in a creative and helpful way. + Don't overexplain what you are doing. + Just respond with short sentences when you are carrying out tool calls. + """ + + messages = [{"role": "system", "content": system}] + + context = OpenAILLMContext(messages, tools) + context_aggregator = llm.create_context_aggregator(context) + + pipeline = Pipeline( + [ + transport.input(), # Transport user input + stt, + context_aggregator.user(), # User spoken responses + llm, # LLM + tts, # TTS + mcp_image, # URL image -> output + transport.output(), # Transport bot output + context_aggregator.assistant(), # Assistant spoken responses and tool context + ] + ) + + task = PipelineTask( + pipeline, + params=PipelineParams( + allow_interruptions=True, + enable_metrics=True, + ), + ) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected: {client}") + # Kick off the conversation. + await task.queue_frames([context_aggregator.user().get_context_frame()]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + + @transport.event_handler("on_client_closed") + async def on_client_closed(transport, client): + logger.info(f"Client closed connection") + await task.cancel() + + runner = PipelineRunner(handle_sigint=False) + + await runner.run(task) + + +if __name__ == "__main__": + from run import main + + main() diff --git a/examples/foundational/39a-mcp-run-sse.py b/examples/foundational/39a-mcp-run-sse.py new file mode 100644 index 000000000..a624bc7b8 --- /dev/null +++ b/examples/foundational/39a-mcp-run-sse.py @@ -0,0 +1,124 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import os +import sys + +import aiohttp +from dotenv import load_dotenv +from loguru import logger + +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.services.anthropic.llm import AnthropicLLMService +from pipecat.services.cartesia.tts import CartesiaTTSService +from pipecat.services.deepgram.stt import DeepgramSTTService +from pipecat.services.mcp_service import MCPClient +from pipecat.transports.base_transport import TransportParams +from pipecat.transports.network.small_webrtc import SmallWebRTCTransport +from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection + +load_dotenv(override=True) + + +async def run_bot(webrtc_connection: SmallWebRTCConnection): + logger.info(f"Starting bot") + + transport = SmallWebRTCTransport( + webrtc_connection=webrtc_connection, + params=TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + vad_audio_passthrough=True, + ), + ) + + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + ) + + llm = AnthropicLLMService( + api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-7-sonnet-latest" + ) + + try: + # https://docs.mcp.run/integrating/tutorials/mcp-run-sse-openai-agents/ + mcp = MCPClient(server_params=os.getenv("MCP_RUN_SSE_URL")) + except Exception as e: + logger.error(f"error setting up mcp") + logger.exception("error trace:") + + tools = await mcp.register_tools(llm) + + system = f""" + You are a helpful LLM in a WebRTC call. + Your goal is to demonstrate your capabilities in a succinct way. + You have access to a number of tools provided by mcp.run. Use any and all tools to help users. + Your output will be converted to audio so don't include special characters in your answers. + Respond to what the user said in a creative and helpful way. + When asked for today's date, use 'https://www.datetoday.net/'. + Don't overexplain what you are doing. + Just respond with short sentences when you are carrying out tool calls. + """ + + messages = [{"role": "system", "content": system}] + + context = OpenAILLMContext(messages, tools) + context_aggregator = llm.create_context_aggregator(context) + + pipeline = Pipeline( + [ + transport.input(), # Transport user input + stt, + context_aggregator.user(), # User spoken responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + context_aggregator.assistant(), # Assistant spoken responses and tool context + ] + ) + + task = PipelineTask( + pipeline, + params=PipelineParams( + allow_interruptions=True, + enable_metrics=True, + ), + ) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected: {client}") + # Kick off the conversation. + await task.queue_frames([context_aggregator.user().get_context_frame()]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + + @transport.event_handler("on_client_closed") + async def on_client_closed(transport, client): + logger.info(f"Client closed connection") + await task.cancel() + + runner = PipelineRunner(handle_sigint=False) + + await runner.run(task) + + +if __name__ == "__main__": + from run import main + + main() diff --git a/examples/foundational/39b-multiple-mcp.py b/examples/foundational/39b-multiple-mcp.py new file mode 100644 index 000000000..473054009 --- /dev/null +++ b/examples/foundational/39b-multiple-mcp.py @@ -0,0 +1,203 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import io +import os +import re +import shutil +import sys + +import aiohttp +from dotenv import load_dotenv +from loguru import logger +from mcp import StdioServerParameters +from PIL import Image + +from pipecat.adapters.schemas.tools_schema import ToolsSchema +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.frames.frames import ( + Frame, + FunctionCallResultFrame, + URLImageRawFrame, +) +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.services.anthropic.llm import AnthropicLLMService +from pipecat.services.cartesia.tts import CartesiaTTSService +from pipecat.services.deepgram.stt import DeepgramSTTService +from pipecat.services.mcp_service import MCPClient +from pipecat.transports.base_transport import TransportParams +from pipecat.transports.network.small_webrtc import SmallWebRTCTransport +from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection + +load_dotenv(override=True) + + +class UrlToImageProcessor(FrameProcessor): + def __init__(self, aiohttp_session: aiohttp.ClientSession, **kwargs): + super().__init__(**kwargs) + self._aiohttp_session = aiohttp_session + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + + if isinstance(frame, FunctionCallResultFrame): + await self.push_frame(frame, direction) + image_url = self.extract_url(frame.result) + if image_url: + await self.run_image_process(image_url) + # sometimes we get multiple image urls- process 1 at a time + await asyncio.sleep(1) + else: + await self.push_frame(frame, direction) + + def extract_url(self, text: str): + pattern = r"!\[[^\]]*\]\((https?://[^)]+\.(png|jpg|jpeg|PNG|JPG|JPEG|gif))\)" + match = re.search(pattern, text) + if match: + return match.group(1) + return None + + async def run_image_process(self, image_url: str): + try: + logger.debug(f"handling image from url: '{image_url}'") + async with self._aiohttp_session.get(image_url) as response: + image_stream = io.BytesIO(await response.content.read()) + image = Image.open(image_stream) + image = image.convert("RGB") + frame = URLImageRawFrame( + url=image_url, image=image.tobytes(), size=image.size, format="RGB" + ) + await self.push_frame(frame) + except Exception as e: + error_msg = f"Error handling image url {image_url}: {str(e)}" + logger.error(error_msg) + + +async def run_bot(webrtc_connection: SmallWebRTCConnection): + logger.info(f"Starting bot") + + transport = SmallWebRTCTransport( + webrtc_connection=webrtc_connection, + params=TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + camera_out_enabled=True, + camera_out_width=1024, + camera_out_height=1024, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer(), + vad_audio_passthrough=True, + ), + ) + + # Create an HTTP session for API calls + async with aiohttp.ClientSession() as session: + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + ) + + llm = AnthropicLLMService( + api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-7-sonnet-latest" + ) + + system = f""" + You are a helpful LLM in a WebRTC call. + Your goal is to demonstrate your capabilities in a succinct way. + You have access to a number of tools provided by NASA MCP. Use any and all tools to help users. + When asked for today's date, use 'https://www.datetoday.net/'. + When asked for the astronomy picture of the day, use 'https://www.datetoday.net/', to get today's date. + Your output will be converted to audio so don't include special characters in your answers. + Respond to what the user said in a creative and helpful way. + Don't overexplain what you are doing. + Just respond with short sentences when you are carrying out tool calls. + """ + + messages = [{"role": "system", "content": system}] + + try: + mcp = MCPClient( + server_params=StdioServerParameters( + command=shutil.which("npx"), + args=["-y", "@programcomputer/nasa-mcp-server@latest"], + # https://api.nasa.gov + env={"NASA_API_KEY": os.getenv("NASA_API_KEY")}, + ) + ) + except Exception as e: + logger.error(f"error setting up nasa mcp") + logger.exception("error trace:") + try: + # https://docs.mcp.run/integrating/tutorials/mcp-run-sse-openai-agents/ + # ie. "https://www.mcp.run/api/mcp/sse?..." + # ensure the profile has a tool or few installed + mcp_run = MCPClient(server_params=os.getenv("MCP_RUN_SSE_URL")) + except Exception as e: + logger.error(f"error setting up mcp.run") + logger.exception("error trace:") + + tools = await mcp.register_tools(llm) + run_tools = await mcp_run.register_tools(llm) + + all_standard_tools = run_tools.standard_tools + tools.standard_tools + all_tools = ToolsSchema(standard_tools=all_standard_tools) + + context = OpenAILLMContext(messages, all_tools) + context_aggregator = llm.create_context_aggregator(context) + mcp_image_processor = UrlToImageProcessor(aiohttp_session=session) + + pipeline = Pipeline( + [ + transport.input(), # Transport user input + stt, + context_aggregator.user(), # User spoken responses + llm, # LLM + tts, # TTS + mcp_image_processor, # URL image -> output + transport.output(), # Transport bot output + context_aggregator.assistant(), # Assistant spoken responses and tool context + ] + ) + + task = PipelineTask( + pipeline, + params=PipelineParams( + allow_interruptions=True, + enable_metrics=True, + ), + ) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected: {client}") + # Kick off the conversation. + await task.queue_frames([context_aggregator.user().get_context_frame()]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + + @transport.event_handler("on_client_closed") + async def on_client_closed(transport, client): + logger.info(f"Client closed connection") + await task.cancel() + + runner = PipelineRunner(handle_sigint=False) + + await runner.run(task) + + +if __name__ == "__main__": + from run import main + + main() diff --git a/pyproject.toml b/pyproject.toml index e57a2c5e5..58d2097ab 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -64,6 +64,7 @@ langchain = [ "langchain~=0.3.20", "langchain-community~=0.3.20", "langchain-ope livekit = [ "livekit~=0.22.0", "livekit-api~=0.8.2", "tenacity~=9.0.0" ] lmnt = [ "websockets~=13.1" ] local = [ "pyaudio~=0.2.14" ] +mcp = [ "mcp[cli]~=1.6.0" ] mem0 = [ "mem0ai~=0.1.76" ] mlx-whisper = [ "mlx-whisper~=0.4.2" ] moondream = [ "einops~=0.8.0", "timm~=1.0.13", "transformers~=4.48.0" ] diff --git a/src/pipecat/services/mcp_service.py b/src/pipecat/services/mcp_service.py new file mode 100644 index 000000000..637170d39 --- /dev/null +++ b/src/pipecat/services/mcp_service.py @@ -0,0 +1,213 @@ +import json +from typing import Any, Dict, List, Optional, Union + +from loguru import logger + +from pipecat.adapters.schemas.function_schema import FunctionSchema +from pipecat.adapters.schemas.tools_schema import ToolsSchema +from pipecat.utils.base_object import BaseObject + +try: + from mcp import ClientSession, StdioServerParameters, types + from mcp.client.session import ClientSession + from mcp.client.sse import sse_client + from mcp.client.stdio import stdio_client +except ModuleNotFoundError as e: + logger.error(f"Exception: {e}") + logger.error("In order to use an MCP client, you need to `pip install pipecat-ai[mcp]`.") + raise Exception(f"Missing module: {e}") + + +class MCPClient(BaseObject): + def __init__( + self, + server_params: Union[StdioServerParameters, str], + **kwargs, + ): + super().__init__(**kwargs) + self._server_params = server_params + self._session = ClientSession + if isinstance(server_params, StdioServerParameters): + self._client = stdio_client + self._register_tools = self._stdio_register_tools + elif isinstance(server_params, str): + self._client = sse_client + self._register_tools = self._sse_register_tools + else: + raise TypeError( + f"{self} invalid argument type: `server_params` must be either StdioServerParameters or an SSE server url string." + ) + + async def register_tools(self, llm) -> ToolsSchema: + tools_schema = await self._register_tools(llm) + return tools_schema + + def _convert_mcp_schema_to_pipecat( + self, tool_name: str, tool_schema: Dict[str, Any] + ) -> FunctionSchema: + """Convert an mcp tool schema to Pipecat's FunctionSchema format. + Args: + tool_name: The name of the tool + tool_schema: The mcp tool schema + Returns: + A FunctionSchema instance + """ + + logger.debug(f"Converting schema for tool '{tool_name}'") + logger.trace(f"Original schema: {json.dumps(tool_schema, indent=2)}") + + properties = tool_schema["input_schema"].get("properties", {}) + required = tool_schema["input_schema"].get("required", []) + + schema = FunctionSchema( + name=tool_name, + description=tool_schema["description"], + properties=properties, + required=required, + ) + + logger.trace(f"Converted schema: {json.dumps(schema.to_default_dict(), indent=2)}") + + return schema + + async def _sse_register_tools(self, llm) -> ToolsSchema: + """Register all available mcp.run tools with the LLM service. + Args: + llm: The Pipecat LLM service to register tools with + Returns: + A ToolsSchema containing all registered tools + """ + + async def mcp_tool_wrapper( + function_name: str, + tool_call_id: str, + arguments: Dict[str, Any], + llm: any, + context: any, + result_callback: any, + ) -> None: + """Wrapper for mcp.run tool calls to match Pipecat's function call interface.""" + logger.debug(f"Executing tool '{function_name}' with call ID: {tool_call_id}") + logger.trace(f"Tool arguments: {json.dumps(arguments, indent=2)}") + try: + async with self._client(self._server_params) as (read, write): + async with self._session(read, write) as session: + await session.initialize() + await self._call_tool(session, function_name, arguments, result_callback) + except Exception as e: + error_msg = f"Error calling mcp tool {function_name}: {str(e)}" + logger.error(error_msg) + logger.exception("Full exception details:") + await result_callback(error_msg) + + logger.debug("Starting registration of mcp.run tools") + tool_schemas: List[FunctionSchema] = [] + + async with self._client(self._server_params) as (read, write): + async with self._session(read, write) as session: + await session.initialize() + tools_schema = await self._list_tools(session, mcp_tool_wrapper, llm) + return tools_schema + + async def _stdio_register_tools(self, llm) -> ToolsSchema: + """Register all available mcp.run tools with the LLM service. + Args: + llm: The Pipecat LLM service to register tools with + Returns: + A ToolsSchema containing all registered tools + """ + + async def mcp_tool_wrapper( + function_name: str, + tool_call_id: str, + arguments: Dict[str, Any], + llm: any, + context: any, + result_callback: any, + ) -> None: + """Wrapper for mcp.run tool calls to match Pipecat's function call interface.""" + logger.debug(f"Executing tool '{function_name}' with call ID: {tool_call_id}") + logger.trace(f"Tool arguments: {json.dumps(arguments, indent=2)}") + try: + async with self._client(self._server_params) as streams: + async with self._session(streams[0], streams[1]) as session: + await session.initialize() + await self._call_tool(session, function_name, arguments, result_callback) + except Exception as e: + error_msg = f"Error calling mcp tool {function_name}: {str(e)}" + logger.error(error_msg) + logger.exception("Full exception details:") + await result_callback(error_msg) + + logger.debug("Starting registration of mcp.run tools") + + async with self._client(self._server_params) as streams: + async with self._session(streams[0], streams[1]) as session: + await session.initialize() + tools_schema = await self._list_tools(session, mcp_tool_wrapper, llm) + return tools_schema + + async def _call_tool(self, session, function_name, arguments, result_callback): + logger.debug(f"Calling mcp tool '{function_name}'") + try: + results = await session.call_tool(function_name, arguments=arguments) + except Exception as e: + error_msg = f"Error calling mcp tool {function_name}: {str(e)}" + logger.error(error_msg) + + response = "Sorry, could not call the mcp tool" + image_url = None + if results: + if hasattr(results, "content") and results.content: + for i, content in enumerate(results.content): + if hasattr(content, "text") and content.text: + logger.debug(f"Tool response chunk {i}: {content.text}") + response += content.text + else: + # logger.debug(f"Non-text result content: '{content}'") + pass + logger.info(f"Tool '{function_name}' completed successfully") + logger.debug(f"Final response: {response}") + else: + logger.error(f"Error getting content from {function_name} results.") + + await result_callback(response) + + async def _list_tools(self, session, mcp_tool_wrapper, llm): + available_tools = await session.list_tools() + tool_schemas: List[FunctionSchema] = [] + + try: + logger.debug(f"Found {len(available_tools)} available tools") + except: + pass + + for tool in available_tools.tools: + tool_name = tool.name + logger.debug(f"Processing tool: {tool_name}") + logger.debug(f"Tool description: {tool.description}") + + try: + # Convert the schema + function_schema = self._convert_mcp_schema_to_pipecat( + tool_name, + {"description": tool.description, "input_schema": tool.inputSchema}, + ) + + # Register the wrapped function + logger.debug(f"Registering function handler for '{tool_name}'") + llm.register_function(tool_name, mcp_tool_wrapper) + + # Add to list of schemas + tool_schemas.append(function_schema) + logger.debug(f"Successfully registered tool '{tool_name}'") + + except Exception as e: + logger.error(f"Failed to register tool '{tool_name}': {str(e)}") + logger.exception("Full exception details:") + continue + + logger.debug(f"Completed registration of {len(tool_schemas)} tools") + tools_schema = ToolsSchema(standard_tools=tool_schemas) + + return tools_schema