@@ -1,3 +1,4 @@
|
||||
|
||||
# Changelog
|
||||
|
||||
All notable changes to **Pipecat** will be documented in this file.
|
||||
@@ -7,12 +8,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
### Added
|
||||
|
||||
- Added `MCPClient`; a way to connect to MCP servers and use the MCP servers' tools.
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed an issue where the `SmartTurnMetricsData` was reporting 0ms for
|
||||
inference and processing time when using the `FalSmartTurnAnalyzer`.
|
||||
|
||||
## [0.0.65] - 2025-04-23 "Sant Jordi's release"
|
||||
## [0.0.65] - 2025-04-23 "Sant Jordi's release" 🌹📕
|
||||
|
||||
https://en.wikipedia.org/wiki/Saint_George%27s_Day_in_Catalonia
|
||||
|
||||
|
||||
192
examples/foundational/39-mcp-stdio.py
Normal file
192
examples/foundational/39-mcp-stdio.py
Normal file
@@ -0,0 +1,192 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import io
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import sys
|
||||
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from mcp import StdioServerParameters
|
||||
from PIL import Image
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import (
|
||||
Frame,
|
||||
FunctionCallResultFrame,
|
||||
URLImageRawFrame,
|
||||
)
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.services.anthropic.llm import AnthropicLLMService
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.mcp_service import MCPClient
|
||||
from pipecat.transports.base_transport import TransportParams
|
||||
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
|
||||
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
class UrlToImageProcessor(FrameProcessor):
|
||||
def __init__(self, aiohttp_session: aiohttp.ClientSession, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._aiohttp_session = aiohttp_session
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, FunctionCallResultFrame):
|
||||
await self.push_frame(frame, direction)
|
||||
image_url = self.extract_url(frame.result)
|
||||
if image_url:
|
||||
await self.run_image_process(image_url)
|
||||
# sometimes we get multiple image urls- process 1 at a time
|
||||
await asyncio.sleep(1)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
def extract_url(self, text: str):
|
||||
pattern = r"!\[[^\]]*\]\((https?://[^)]+\.(png|jpg|jpeg|PNG|JPG|JPEG))\)"
|
||||
match = re.search(pattern, text)
|
||||
if match:
|
||||
return match.group(1)
|
||||
return None
|
||||
|
||||
async def run_image_process(self, image_url: str):
|
||||
try:
|
||||
logger.debug(f"handling image from url: '{image_url}'")
|
||||
async with self._aiohttp_session.get(image_url) as response:
|
||||
image_stream = io.BytesIO(await response.content.read())
|
||||
image = Image.open(image_stream)
|
||||
image = image.convert("RGB")
|
||||
frame = URLImageRawFrame(
|
||||
url=image_url, image=image.tobytes(), size=image.size, format="RGB"
|
||||
)
|
||||
await self.push_frame(frame)
|
||||
except Exception as e:
|
||||
error_msg = f"Error handling image url {image_url}: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
|
||||
|
||||
async def run_bot(webrtc_connection: SmallWebRTCConnection):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
transport = SmallWebRTCTransport(
|
||||
webrtc_connection=webrtc_connection,
|
||||
params=TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
camera_out_enabled=True,
|
||||
camera_out_width=1024,
|
||||
camera_out_height=1024,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
vad_audio_passthrough=True,
|
||||
),
|
||||
)
|
||||
|
||||
# Create an HTTP session for API calls
|
||||
async with aiohttp.ClientSession() as session:
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
|
||||
)
|
||||
|
||||
llm = AnthropicLLMService(
|
||||
api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-7-sonnet-latest"
|
||||
)
|
||||
|
||||
try:
|
||||
mcp = MCPClient(
|
||||
server_params=StdioServerParameters(
|
||||
command=shutil.which("npx"),
|
||||
args=["-y", "@programcomputer/nasa-mcp-server@latest"],
|
||||
# https://api.nasa.gov
|
||||
env={"NASA_API_KEY": os.getenv("NASA_API_KEY")},
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"error setting up mcp")
|
||||
logger.exception("error trace:")
|
||||
|
||||
mcp_image = UrlToImageProcessor(aiohttp_session=session)
|
||||
|
||||
tools = await mcp.register_tools(llm)
|
||||
|
||||
system = f"""
|
||||
You are a helpful LLM in a WebRTC call.
|
||||
Your goal is to demonstrate your capabilities in a succinct way.
|
||||
You have access to a number of tools provided by NASA MCP. Use any and all tools to help users.
|
||||
When asked for the astronomy picture of the day, PASS in NO date to the API.
|
||||
This ensures we get the latest picture available. If as specific date is asked for, you
|
||||
can pass in that date to the API.
|
||||
Your output will be converted to audio so don't include special characters in your answers.
|
||||
Respond to what the user said in a creative and helpful way.
|
||||
Don't overexplain what you are doing.
|
||||
Just respond with short sentences when you are carrying out tool calls.
|
||||
"""
|
||||
|
||||
messages = [{"role": "system", "content": system}]
|
||||
|
||||
context = OpenAILLMContext(messages, tools)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
stt,
|
||||
context_aggregator.user(), # User spoken responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
mcp_image, # URL image -> output
|
||||
transport.output(), # Transport bot output
|
||||
context_aggregator.assistant(), # Assistant spoken responses and tool context
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected: {client}")
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
|
||||
@transport.event_handler("on_client_closed")
|
||||
async def on_client_closed(transport, client):
|
||||
logger.info(f"Client closed connection")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=False)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from run import main
|
||||
|
||||
main()
|
||||
124
examples/foundational/39a-mcp-run-sse.py
Normal file
124
examples/foundational/39a-mcp-run-sse.py
Normal file
@@ -0,0 +1,124 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.services.anthropic.llm import AnthropicLLMService
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.mcp_service import MCPClient
|
||||
from pipecat.transports.base_transport import TransportParams
|
||||
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
|
||||
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
async def run_bot(webrtc_connection: SmallWebRTCConnection):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
transport = SmallWebRTCTransport(
|
||||
webrtc_connection=webrtc_connection,
|
||||
params=TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
vad_audio_passthrough=True,
|
||||
),
|
||||
)
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
|
||||
)
|
||||
|
||||
llm = AnthropicLLMService(
|
||||
api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-7-sonnet-latest"
|
||||
)
|
||||
|
||||
try:
|
||||
# https://docs.mcp.run/integrating/tutorials/mcp-run-sse-openai-agents/
|
||||
mcp = MCPClient(server_params=os.getenv("MCP_RUN_SSE_URL"))
|
||||
except Exception as e:
|
||||
logger.error(f"error setting up mcp")
|
||||
logger.exception("error trace:")
|
||||
|
||||
tools = await mcp.register_tools(llm)
|
||||
|
||||
system = f"""
|
||||
You are a helpful LLM in a WebRTC call.
|
||||
Your goal is to demonstrate your capabilities in a succinct way.
|
||||
You have access to a number of tools provided by mcp.run. Use any and all tools to help users.
|
||||
Your output will be converted to audio so don't include special characters in your answers.
|
||||
Respond to what the user said in a creative and helpful way.
|
||||
When asked for today's date, use 'https://www.datetoday.net/'.
|
||||
Don't overexplain what you are doing.
|
||||
Just respond with short sentences when you are carrying out tool calls.
|
||||
"""
|
||||
|
||||
messages = [{"role": "system", "content": system}]
|
||||
|
||||
context = OpenAILLMContext(messages, tools)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
stt,
|
||||
context_aggregator.user(), # User spoken responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
transport.output(), # Transport bot output
|
||||
context_aggregator.assistant(), # Assistant spoken responses and tool context
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected: {client}")
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
|
||||
@transport.event_handler("on_client_closed")
|
||||
async def on_client_closed(transport, client):
|
||||
logger.info(f"Client closed connection")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=False)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from run import main
|
||||
|
||||
main()
|
||||
203
examples/foundational/39b-multiple-mcp.py
Normal file
203
examples/foundational/39b-multiple-mcp.py
Normal file
@@ -0,0 +1,203 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import io
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import sys
|
||||
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from mcp import StdioServerParameters
|
||||
from PIL import Image
|
||||
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import (
|
||||
Frame,
|
||||
FunctionCallResultFrame,
|
||||
URLImageRawFrame,
|
||||
)
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.services.anthropic.llm import AnthropicLLMService
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.mcp_service import MCPClient
|
||||
from pipecat.transports.base_transport import TransportParams
|
||||
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
|
||||
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
class UrlToImageProcessor(FrameProcessor):
|
||||
def __init__(self, aiohttp_session: aiohttp.ClientSession, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._aiohttp_session = aiohttp_session
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, FunctionCallResultFrame):
|
||||
await self.push_frame(frame, direction)
|
||||
image_url = self.extract_url(frame.result)
|
||||
if image_url:
|
||||
await self.run_image_process(image_url)
|
||||
# sometimes we get multiple image urls- process 1 at a time
|
||||
await asyncio.sleep(1)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
def extract_url(self, text: str):
|
||||
pattern = r"!\[[^\]]*\]\((https?://[^)]+\.(png|jpg|jpeg|PNG|JPG|JPEG|gif))\)"
|
||||
match = re.search(pattern, text)
|
||||
if match:
|
||||
return match.group(1)
|
||||
return None
|
||||
|
||||
async def run_image_process(self, image_url: str):
|
||||
try:
|
||||
logger.debug(f"handling image from url: '{image_url}'")
|
||||
async with self._aiohttp_session.get(image_url) as response:
|
||||
image_stream = io.BytesIO(await response.content.read())
|
||||
image = Image.open(image_stream)
|
||||
image = image.convert("RGB")
|
||||
frame = URLImageRawFrame(
|
||||
url=image_url, image=image.tobytes(), size=image.size, format="RGB"
|
||||
)
|
||||
await self.push_frame(frame)
|
||||
except Exception as e:
|
||||
error_msg = f"Error handling image url {image_url}: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
|
||||
|
||||
async def run_bot(webrtc_connection: SmallWebRTCConnection):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
transport = SmallWebRTCTransport(
|
||||
webrtc_connection=webrtc_connection,
|
||||
params=TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
camera_out_enabled=True,
|
||||
camera_out_width=1024,
|
||||
camera_out_height=1024,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
vad_audio_passthrough=True,
|
||||
),
|
||||
)
|
||||
|
||||
# Create an HTTP session for API calls
|
||||
async with aiohttp.ClientSession() as session:
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
|
||||
)
|
||||
|
||||
llm = AnthropicLLMService(
|
||||
api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-7-sonnet-latest"
|
||||
)
|
||||
|
||||
system = f"""
|
||||
You are a helpful LLM in a WebRTC call.
|
||||
Your goal is to demonstrate your capabilities in a succinct way.
|
||||
You have access to a number of tools provided by NASA MCP. Use any and all tools to help users.
|
||||
When asked for today's date, use 'https://www.datetoday.net/'.
|
||||
When asked for the astronomy picture of the day, use 'https://www.datetoday.net/', to get today's date.
|
||||
Your output will be converted to audio so don't include special characters in your answers.
|
||||
Respond to what the user said in a creative and helpful way.
|
||||
Don't overexplain what you are doing.
|
||||
Just respond with short sentences when you are carrying out tool calls.
|
||||
"""
|
||||
|
||||
messages = [{"role": "system", "content": system}]
|
||||
|
||||
try:
|
||||
mcp = MCPClient(
|
||||
server_params=StdioServerParameters(
|
||||
command=shutil.which("npx"),
|
||||
args=["-y", "@programcomputer/nasa-mcp-server@latest"],
|
||||
# https://api.nasa.gov
|
||||
env={"NASA_API_KEY": os.getenv("NASA_API_KEY")},
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"error setting up nasa mcp")
|
||||
logger.exception("error trace:")
|
||||
try:
|
||||
# https://docs.mcp.run/integrating/tutorials/mcp-run-sse-openai-agents/
|
||||
# ie. "https://www.mcp.run/api/mcp/sse?..."
|
||||
# ensure the profile has a tool or few installed
|
||||
mcp_run = MCPClient(server_params=os.getenv("MCP_RUN_SSE_URL"))
|
||||
except Exception as e:
|
||||
logger.error(f"error setting up mcp.run")
|
||||
logger.exception("error trace:")
|
||||
|
||||
tools = await mcp.register_tools(llm)
|
||||
run_tools = await mcp_run.register_tools(llm)
|
||||
|
||||
all_standard_tools = run_tools.standard_tools + tools.standard_tools
|
||||
all_tools = ToolsSchema(standard_tools=all_standard_tools)
|
||||
|
||||
context = OpenAILLMContext(messages, all_tools)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
mcp_image_processor = UrlToImageProcessor(aiohttp_session=session)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
stt,
|
||||
context_aggregator.user(), # User spoken responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
mcp_image_processor, # URL image -> output
|
||||
transport.output(), # Transport bot output
|
||||
context_aggregator.assistant(), # Assistant spoken responses and tool context
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected: {client}")
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
|
||||
@transport.event_handler("on_client_closed")
|
||||
async def on_client_closed(transport, client):
|
||||
logger.info(f"Client closed connection")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=False)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from run import main
|
||||
|
||||
main()
|
||||
@@ -64,6 +64,7 @@ langchain = [ "langchain~=0.3.20", "langchain-community~=0.3.20", "langchain-ope
|
||||
livekit = [ "livekit~=0.22.0", "livekit-api~=0.8.2", "tenacity~=9.0.0" ]
|
||||
lmnt = [ "websockets~=13.1" ]
|
||||
local = [ "pyaudio~=0.2.14" ]
|
||||
mcp = [ "mcp[cli]~=1.6.0" ]
|
||||
mem0 = [ "mem0ai~=0.1.76" ]
|
||||
mlx-whisper = [ "mlx-whisper~=0.4.2" ]
|
||||
moondream = [ "einops~=0.8.0", "timm~=1.0.13", "transformers~=4.48.0" ]
|
||||
|
||||
213
src/pipecat/services/mcp_service.py
Normal file
213
src/pipecat/services/mcp_service.py
Normal file
@@ -0,0 +1,213 @@
|
||||
import json
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.adapters.schemas.function_schema import FunctionSchema
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.utils.base_object import BaseObject
|
||||
|
||||
try:
|
||||
from mcp import ClientSession, StdioServerParameters, types
|
||||
from mcp.client.session import ClientSession
|
||||
from mcp.client.sse import sse_client
|
||||
from mcp.client.stdio import stdio_client
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error("In order to use an MCP client, you need to `pip install pipecat-ai[mcp]`.")
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
class MCPClient(BaseObject):
|
||||
def __init__(
|
||||
self,
|
||||
server_params: Union[StdioServerParameters, str],
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
self._server_params = server_params
|
||||
self._session = ClientSession
|
||||
if isinstance(server_params, StdioServerParameters):
|
||||
self._client = stdio_client
|
||||
self._register_tools = self._stdio_register_tools
|
||||
elif isinstance(server_params, str):
|
||||
self._client = sse_client
|
||||
self._register_tools = self._sse_register_tools
|
||||
else:
|
||||
raise TypeError(
|
||||
f"{self} invalid argument type: `server_params` must be either StdioServerParameters or an SSE server url string."
|
||||
)
|
||||
|
||||
async def register_tools(self, llm) -> ToolsSchema:
|
||||
tools_schema = await self._register_tools(llm)
|
||||
return tools_schema
|
||||
|
||||
def _convert_mcp_schema_to_pipecat(
|
||||
self, tool_name: str, tool_schema: Dict[str, Any]
|
||||
) -> FunctionSchema:
|
||||
"""Convert an mcp tool schema to Pipecat's FunctionSchema format.
|
||||
Args:
|
||||
tool_name: The name of the tool
|
||||
tool_schema: The mcp tool schema
|
||||
Returns:
|
||||
A FunctionSchema instance
|
||||
"""
|
||||
|
||||
logger.debug(f"Converting schema for tool '{tool_name}'")
|
||||
logger.trace(f"Original schema: {json.dumps(tool_schema, indent=2)}")
|
||||
|
||||
properties = tool_schema["input_schema"].get("properties", {})
|
||||
required = tool_schema["input_schema"].get("required", [])
|
||||
|
||||
schema = FunctionSchema(
|
||||
name=tool_name,
|
||||
description=tool_schema["description"],
|
||||
properties=properties,
|
||||
required=required,
|
||||
)
|
||||
|
||||
logger.trace(f"Converted schema: {json.dumps(schema.to_default_dict(), indent=2)}")
|
||||
|
||||
return schema
|
||||
|
||||
async def _sse_register_tools(self, llm) -> ToolsSchema:
|
||||
"""Register all available mcp.run tools with the LLM service.
|
||||
Args:
|
||||
llm: The Pipecat LLM service to register tools with
|
||||
Returns:
|
||||
A ToolsSchema containing all registered tools
|
||||
"""
|
||||
|
||||
async def mcp_tool_wrapper(
|
||||
function_name: str,
|
||||
tool_call_id: str,
|
||||
arguments: Dict[str, Any],
|
||||
llm: any,
|
||||
context: any,
|
||||
result_callback: any,
|
||||
) -> None:
|
||||
"""Wrapper for mcp.run tool calls to match Pipecat's function call interface."""
|
||||
logger.debug(f"Executing tool '{function_name}' with call ID: {tool_call_id}")
|
||||
logger.trace(f"Tool arguments: {json.dumps(arguments, indent=2)}")
|
||||
try:
|
||||
async with self._client(self._server_params) as (read, write):
|
||||
async with self._session(read, write) as session:
|
||||
await session.initialize()
|
||||
await self._call_tool(session, function_name, arguments, result_callback)
|
||||
except Exception as e:
|
||||
error_msg = f"Error calling mcp tool {function_name}: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
logger.exception("Full exception details:")
|
||||
await result_callback(error_msg)
|
||||
|
||||
logger.debug("Starting registration of mcp.run tools")
|
||||
tool_schemas: List[FunctionSchema] = []
|
||||
|
||||
async with self._client(self._server_params) as (read, write):
|
||||
async with self._session(read, write) as session:
|
||||
await session.initialize()
|
||||
tools_schema = await self._list_tools(session, mcp_tool_wrapper, llm)
|
||||
return tools_schema
|
||||
|
||||
async def _stdio_register_tools(self, llm) -> ToolsSchema:
|
||||
"""Register all available mcp.run tools with the LLM service.
|
||||
Args:
|
||||
llm: The Pipecat LLM service to register tools with
|
||||
Returns:
|
||||
A ToolsSchema containing all registered tools
|
||||
"""
|
||||
|
||||
async def mcp_tool_wrapper(
|
||||
function_name: str,
|
||||
tool_call_id: str,
|
||||
arguments: Dict[str, Any],
|
||||
llm: any,
|
||||
context: any,
|
||||
result_callback: any,
|
||||
) -> None:
|
||||
"""Wrapper for mcp.run tool calls to match Pipecat's function call interface."""
|
||||
logger.debug(f"Executing tool '{function_name}' with call ID: {tool_call_id}")
|
||||
logger.trace(f"Tool arguments: {json.dumps(arguments, indent=2)}")
|
||||
try:
|
||||
async with self._client(self._server_params) as streams:
|
||||
async with self._session(streams[0], streams[1]) as session:
|
||||
await session.initialize()
|
||||
await self._call_tool(session, function_name, arguments, result_callback)
|
||||
except Exception as e:
|
||||
error_msg = f"Error calling mcp tool {function_name}: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
logger.exception("Full exception details:")
|
||||
await result_callback(error_msg)
|
||||
|
||||
logger.debug("Starting registration of mcp.run tools")
|
||||
|
||||
async with self._client(self._server_params) as streams:
|
||||
async with self._session(streams[0], streams[1]) as session:
|
||||
await session.initialize()
|
||||
tools_schema = await self._list_tools(session, mcp_tool_wrapper, llm)
|
||||
return tools_schema
|
||||
|
||||
async def _call_tool(self, session, function_name, arguments, result_callback):
|
||||
logger.debug(f"Calling mcp tool '{function_name}'")
|
||||
try:
|
||||
results = await session.call_tool(function_name, arguments=arguments)
|
||||
except Exception as e:
|
||||
error_msg = f"Error calling mcp tool {function_name}: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
|
||||
response = "Sorry, could not call the mcp tool"
|
||||
image_url = None
|
||||
if results:
|
||||
if hasattr(results, "content") and results.content:
|
||||
for i, content in enumerate(results.content):
|
||||
if hasattr(content, "text") and content.text:
|
||||
logger.debug(f"Tool response chunk {i}: {content.text}")
|
||||
response += content.text
|
||||
else:
|
||||
# logger.debug(f"Non-text result content: '{content}'")
|
||||
pass
|
||||
logger.info(f"Tool '{function_name}' completed successfully")
|
||||
logger.debug(f"Final response: {response}")
|
||||
else:
|
||||
logger.error(f"Error getting content from {function_name} results.")
|
||||
|
||||
await result_callback(response)
|
||||
|
||||
async def _list_tools(self, session, mcp_tool_wrapper, llm):
|
||||
available_tools = await session.list_tools()
|
||||
tool_schemas: List[FunctionSchema] = []
|
||||
|
||||
try:
|
||||
logger.debug(f"Found {len(available_tools)} available tools")
|
||||
except:
|
||||
pass
|
||||
|
||||
for tool in available_tools.tools:
|
||||
tool_name = tool.name
|
||||
logger.debug(f"Processing tool: {tool_name}")
|
||||
logger.debug(f"Tool description: {tool.description}")
|
||||
|
||||
try:
|
||||
# Convert the schema
|
||||
function_schema = self._convert_mcp_schema_to_pipecat(
|
||||
tool_name,
|
||||
{"description": tool.description, "input_schema": tool.inputSchema},
|
||||
)
|
||||
|
||||
# Register the wrapped function
|
||||
logger.debug(f"Registering function handler for '{tool_name}'")
|
||||
llm.register_function(tool_name, mcp_tool_wrapper)
|
||||
|
||||
# Add to list of schemas
|
||||
tool_schemas.append(function_schema)
|
||||
logger.debug(f"Successfully registered tool '{tool_name}'")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to register tool '{tool_name}': {str(e)}")
|
||||
logger.exception("Full exception details:")
|
||||
continue
|
||||
|
||||
logger.debug(f"Completed registration of {len(tool_schemas)} tools")
|
||||
tools_schema = ToolsSchema(standard_tools=tool_schemas)
|
||||
|
||||
return tools_schema
|
||||
Reference in New Issue
Block a user