Compare commits

...

2 Commits

Author SHA1 Message Date
vipyne
115c45b5fc mcp_run POC 2025-04-10 15:21:29 -05:00
vipyne
1908e7cd97 WIP getting mcp.run to work 2025-04-10 13:51:00 -05:00
2 changed files with 251 additions and 0 deletions

View File

@@ -0,0 +1,110 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import asyncio
import os
import sys
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.services.mcp_run.mcp_run import MCPRun
from pipecat.services.anthropic.llm import AnthropicLLMService
from pipecat.services.google.llm import GoogleLLMService
from pipecat.services.openai.llm import OpenAILLMService
load_dotenv(override=True)
logger.remove()
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Bot with MCP tools",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = AnthropicLLMService(api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-7-sonnet-latest")
# llm = GoogleLLMService(api_key=os.getenv("GOOGLE_API_KEY"), model="gemini-2.0-flash-001")
# llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
mcp_run = MCPRun(llm)
tools = mcp_run.register_mcp_tools(llm)
system = """
You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities
in a succinct way. You have access to various tools provided by mcp.run that you can use to help users.
Your output will be converted to audio so don't include special characters in your answers.
Respond to what the user said in a creative and helpful way. Don't overexplain what you are doing.
Just respond with short sentences when you are carrying out tool calls.
"""
messages = [{"role": "system","content": system}]
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User spoken responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses and tool context
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
logger.info("First participant joined: {}", participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,141 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
import json
from typing import Any, Dict, List, Mapping, Optional, Union
from loguru import logger
from pipecat.services.llm_service import LLMService
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from mcp_run import Client
try:
from anthropic import NOT_GIVEN, AsyncAnthropic, NotGiven
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use mcp.run, you need to `pip install pipecat-ai[mcp_run]`. "
+ "Also, set `MCP_RUN_SESSION_ID` environment variable."
)
raise Exception(f"Missing module: {e}")
class MCPRun(Client):
def __init__(
self,
llm: LLMService,
mcp_run_session_id: Optional[str] = None,
**kwargs,
):
super().__init__(**kwargs)
self._client = Client()
self._mcp_run_session_id = mcp_run_session_id or os.getenv("MCP_RUN_SESSION_ID")
def convert_mcp_schema_to_pipecat(self, tool_name: str, tool_schema: dict[str, any]) -> FunctionSchema:
"""Convert an mcp.run tool schema to Pipecat's FunctionSchema format.
Args:
tool_name: The name of the tool
tool_schema: The mcp.run tool schema
Returns:
A FunctionSchema instance
"""
logger.debug(f"Converting schema for tool '{tool_name}'")
logger.debug(f"Original schema: {json.dumps(tool_schema, indent=2)}")
# Extract properties and required fields from the mcp.run schema
properties = tool_schema["input_schema"].get("properties", {})
required = tool_schema["input_schema"].get("required", [])
schema = FunctionSchema(
name=tool_name,
description=tool_schema["description"],
properties=properties,
required=required
)
logger.debug(f"Converted schema: {json.dumps(schema.to_default_dict(), indent=2)}")
return schema
def register_mcp_tools(self, llm) -> ToolsSchema:
"""Register all available mcp.run tools with the LLM service.
Args:
llm: The Pipecat LLM service to register tools with
Returns:
A ToolsSchema containing all registered tools
"""
async def mcp_tool_wrapper(function_name: str, tool_call_id: str, arguments: dict[str, any],
llm: any, context: any, result_callback: any) -> None:
"""Wrapper for mcp.run tool calls to match Pipecat's function call interface.
"""
logger.debug(f"Executing tool '{function_name}' with call ID: {tool_call_id}")
logger.debug(f"Tool arguments: {json.dumps(arguments, indent=2)}")
try:
# Call the mcp.run tool
logger.debug(f"Calling mcp.run tool '{function_name}'")
results = self._client.call_tool(function_name, params=arguments)
# Combine all content into a single response
response = ""
for i, content in enumerate(results.content):
logger.debug(f"Tool response chunk {i}: {content.text}")
response += content.text
logger.info(f"Tool '{function_name}' completed successfully")
logger.info(f"Final response: {response}")
# Send result back through callback
await result_callback(response)
except Exception as e:
error_msg = f"Error calling mcp.run tool {function_name}: {str(e)}"
logger.error(error_msg)
logger.exception("Full exception details:")
await result_callback(error_msg)
logger.debug("Starting registration of mcp.run tools")
tool_schemas: List[FunctionSchema] = []
# Get all available tools from mcp.run
available_tools = self._client.tools
logger.debug(f"Found {len(available_tools)} available tools")
for tool_name, tool in available_tools.items():
logger.debug(f"Processing tool: {tool_name}")
logger.debug(f"Tool description: {tool.description}")
try:
# Convert the schema
function_schema = self.convert_mcp_schema_to_pipecat(tool_name, {
"description": tool.description,
"input_schema": tool.input_schema
})
# Register the wrapped function
logger.debug(f"Registering function handler for '{tool_name}'")
llm.register_function(tool_name, mcp_tool_wrapper)
# Add to our list of schemas
tool_schemas.append(function_schema)
logger.debug(f"Successfully registered tool '{tool_name}'")
except Exception as e:
logger.error(f"Failed to register tool '{tool_name}': {str(e)}")
logger.exception("Full exception details:")
continue
logger.info(f"Completed registration of {len(tool_schemas)} tools")
tools_schema = ToolsSchema(standard_tools=tool_schemas)
return tools_schema