Preface
The previous posts — [SpringAI MCP技术试用], [【01】JSON-RPC2.0协议], and [【02】SSE和StreamableHttp技术整理] — roughly explained why MCP came about, along with some of its technical details. But MCP mainly exists to serve LLMs, so how do MCP and an LLM actually interact? This post gives a brief summary of the interaction flow.
Overall Flow
Some notes:
1) The user asks the AI Agent "Compute 3*111 for me". Here the AI Agent is the MCP Client: it first pulls the metadata of every MCP Server registered with the AI Agent, then sends the user query together with the information on all MCP Servers and MCP Tools to the LLM.
2) The LLM takes this and starts reasoning: based on the user query and the MCP Server information, it picks the MCP Server and MCP Tool best suited to the user's problem and returns that choice to the AI Agent (MCP Client). What the LLM tells the AI Agent here is essentially: "Use the multiply MCP Tool in the Calculate_McpServer MCP Server; it can solve the user's problem" (see the sketch after this list).
3) The AI Agent (MCP Client) now knows which MCP Tool on which MCP Server to use and calls it directly: the multiply MCP Tool on the Calculate_McpServer MCP Server.
4) Calculate_McpServer returns the result (the computed product) to the AI Agent (MCP Client).
5) The AI Agent (MCP Client) hands the user's question together with the result obtained from Calculate_McpServer back to the LLM, so the LLM can judge from the question and the answer whether another round of the loop above is needed; if not, it tidies up the content for the reply.
6) The LLM returns the tidied-up content to the AI Agent (MCP Client), and the AI Agent (MCP Client) finally returns the consolidated content to the user.
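With an OpenAI-compatible FunctionCall API, the "reply" in step 2 is not free text but a tool_calls entry on the assistant message, which the MCP Client then maps back to the corresponding MCP Tool. A rough sketch of such a message, shown as a Python dict (field values are illustrative):
# Sketch of the assistant message the LLM returns in step 2 (OpenAI-style tool call);
# the MCP Client looks up function.name among the registered MCP Tools and calls it.
assistant_message = {
    "role": "assistant",
    "content": None,
    "tool_calls": [
        {
            "id": "call_0",  # illustrative id
            "type": "function",
            "function": {
                "name": "multiply",                 # the chosen MCP Tool
                "arguments": '{"a": 3, "b": 111}',  # JSON-encoded arguments
            },
        }
    ],
}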
Note that not every model supports the FunctionCall approach. A general-purpose alternative is to go through the system prompt: the overall idea is much the same as FunctionCall, except that the system prompt must instruct the LLM to return the MCP Server name, the Tool, and the args in a prescribed format, for example as shown below.
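Concretely, the system prompt can require the model to answer with a single parseable line whenever a tool is needed, for instance (this is exactly the convention the LLMChat2MCP_SystemPrompt demo below uses):
TOOL_CALL: {"tool": "multiply", "arguments": {"a": 3, "b": 111}}
The client then extracts the JSON itself, performs the MCP Tool call, and feeds the result back to the model.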
Demo
Below, the Calculate_McpServer from the sequence diagram above is implemented as a Python demo (the same can be done in Java or Golang), and an LLM is asked to use the MCP Tools to carry out basic arithmetic. Since HTTP+SSE has been officially deprecated, the demos below use only the streamable_http transport.
MCPServer
from mcp.server.fastmcp import FastMCP
import logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
MCP_SERVER_NAME = "Calculate_McpServer"
logger = logging.getLogger(MCP_SERVER_NAME)
# Initialize the FastMCP server
mcp = FastMCP(name=MCP_SERVER_NAME, instructions="Basic arithmetic (add, subtract, multiply, divide)")
@mcp.tool(name="add", description="对两个数字进行加法")
def add(a: float, b: float) -> float:
"""
Add two numbers.
Parameters:
- a (float): First number (required)
- b (float): Second number (required)
Returns:
- float: The result of a + b
"""
return a + b
@mcp.tool(name="subtract", description="对两个数字进行减法")
def subtract(a: float, b: float) -> float:
"""
Subtract two numbers.
Parameters:
- a (float): The number to subtract from (required)
- b (float): The number to subtract (required)
Returns:
- float: The result of a - b
"""
return a - b
@mcp.tool(name="multiply", description="对两个数字进行乘法")
def multiply(a: float, b: float) -> float:
"""
Multiply two numbers.
Parameters:
- a (float): First number (required)
- b (float): Second number (required)
Returns:
- float: The result of a * b
"""
return a * b
@mcp.tool(name="divide", description="对两个数字进行除法")
def divide(a: float, b: float) -> float:
"""
Divide two numbers.
Parameters:
- a (float): Numerator (required)
- b (float): Denominator (required, must not be zero)
Returns:
- float: The result of a / b
"""
if b == 0:
raise ValueError("Division by zero is not allowed")
return a / b
if __name__ == "__main__":
mcp.settings.host = "0.0.0.0"
mcp.settings.port = 8000
mcp.settings.log_level = "INFO"
    # stateless_http and json_response both default to False.
    # stateless_http:
    #   controls whether the SSE channel is opened and whether client sessions are managed
    # json_response:
    #   controls whether POST responses come back as plain JSON or as an SSE event
    #   stream (only the SSE event data format, not the SSE channel itself)
# mcp.settings.json_response = True
# mcp.settings.stateless_http = True
    # Initialize and run the server
print("Starting MCPServer...")
# mcp.run(transport='sse')
mcp.run(transport="streamable-http")
MCPClient
import asyncio
from contextlib import AsyncExitStack
from mcp.client.streamable_http import streamablehttp_client
from mcp import ClientSession, Tool
from typing import Optional, List
from utils import logger
"""
官方MCP client demo:github.com/modelcontextprotocol/quickstart-resources/blob/main/mcp-client-python/client.py
"""
# Configure logging
logger = logger.setup_logging()
def convert_tools(tools: List[Tool]):
"""
    Convert the tool list returned by the MCP Server's list_tools into a tool list usable by the OpenAI API.
"""
ret = []
for tool in tools:
parameters = {
"type": "object",
"properties": {},
"required": (
tool.inputSchema["required"] if "required" in tool.inputSchema else []
),
}
properties = tool.inputSchema["properties"]
for param_name in properties:
if "type" in properties[param_name]:
param_type = properties[param_name]["type"]
elif (
"anyOf" in properties[param_name]
and len(properties[param_name]["anyOf"]) > 0
):
param_type = properties[param_name]["anyOf"][0]["type"]
else:
param_type = "string"
parameters["properties"][param_name] = {
"type": param_type,
"description": properties[param_name].get("description", ""),
}
ret.append(
{
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": parameters,
},
}
)
return ret
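# For the multiply tool from the server above, convert_tools yields an entry
# shaped roughly like this (a sketch; the exact schema strings depend on what
# FastMCP generates from the Python type hints):
# {
#     "type": "function",
#     "function": {
#         "name": "multiply",
#         "description": "Multiply two numbers",
#         "parameters": {
#             "type": "object",
#             "properties": {
#                 "a": {"type": "number", "description": ""},
#                 "b": {"type": "number", "description": ""},
#             },
#             "required": ["a", "b"],
#         },
#     },
# }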
class MCPClient:
def __init__(self):
        # Initialize session and transport handles
self.session: Optional[ClientSession] = None
self.exit_stack = AsyncExitStack()
        self.stream_client = None
async def connect_to_server(self, url):
"""Connect to an MCP server using Streamable HTTP.
Args:
            url: the Streamable HTTP endpoint URL
"""
mcp_timeout = 300
logger.info(f"mcp_timeout: {mcp_timeout}")
        # Use streamablehttp_client instead of sse_client
        http_transport = await self.exit_stack.enter_async_context(
            streamablehttp_client(url=url, timeout=mcp_timeout)
        )
self.streamable_http, self.write, _ = http_transport
self.session = await self.exit_stack.enter_async_context(
ClientSession(self.streamable_http, self.write)
)
logger.info(f"connect_to_server successful, streamable http url: {url}")
await self.session.initialize()
async def cleanup(self):
await self.exit_stack.aclose()
@classmethod
def list_tools(cls, mcp_server_url, reqid):
"""
列出指定MCP Server上可用的工具
Args:
mcp_server_url: MCP Server的URL
Returns:
可用的工具列表
"""
async def async_task():
client = cls()
await client.connect_to_server(mcp_server_url)
response = await client.session.list_tools()
mcp_tools = response.tools
available_tools = convert_tools(mcp_tools)
logger.info(f"reqid:{reqid},tool size:{len(available_tools)},tool name:{",".join([tool["function"]["name"] for tool in available_tools])}")
# logger.info([tool["function"]["name"] for tool in available_tools])
await client.cleanup()
return available_tools
available_tools = asyncio.run(async_task())
return available_tools
@classmethod
def call_tool(cls, mcp_server_url: str, tool_name: str, tool_args: dict, reqid):
"""
调用指定MCP Server上的工具
Args:
mcp_server_url: MCP Server的URL
tool_name: 工具名称
tool_args: 工具参数
Returns:
工具调用结果
"""
async def async_task():
client = cls()
await client.connect_to_server(mcp_server_url)
logger.info(f"reqid;{reqid}, mcp client, tool_name: {tool_name}, tool_args: {tool_args}")
result = await client.session.call_tool(tool_name, tool_args)
logger.info(f"reqid;{reqid}, mcp client, tool_args: {tool_args}, result: {result}")
await client.cleanup()
return result
result = asyncio.run(async_task())
return result
if __name__ == "__main__":
mcpserver_url = "127.0.0.1:8000/mcp" # 修改为stream端点
# MCPClient.list_tools(mcpserver_url, "mock reqid")
MCPClient.call_tool(mcpserver_url,"multiply",{"a": 3, "b": 111},"mock reqid")
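Note that list_tools and call_tool each open and tear down their own connection, which keeps the class easy to call from synchronous code but pays an extra handshake per request. A minimal sketch of doing both over a single session with the same SDK APIs (illustrative, not part of the demo):
# Sketch: reuse one Streamable HTTP session for list_tools + call_tool.
import asyncio
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client

async def one_session_demo(url: str):
    async with streamablehttp_client(url=url) as (read, write, _):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = (await session.list_tools()).tools
            print([t.name for t in tools])
            result = await session.call_tool("multiply", {"a": 3, "b": 111})
            print(result.content[0].text)

# asyncio.run(one_session_demo("http://127.0.0.1:8000/mcp"))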
LLMChat2MCP_FunctionCall
from openai import OpenAI
from utils import logger
from dotenv import load_dotenv
import os
from streamablehttp_mcpclient import MCPClient
import json
import uuid
import argparse
import traceback
logger = logger.setup_logging()
# load environment variables from .env
load_dotenv()
API_KEY = os.environ["API_KEY"]
BASE_URL = os.environ["BASE_URL"]
MODEL_NAME = os.environ["MODEL_NAME"]
def chat_loop(mcp_server_url):
"""Run an interactive chat loop"""
logger.info("\nMCP Client Started!")
logger.info("Type your queries or 'quit' to exit.")
while True:
try:
query = input("\nQuery: ").strip()
            if not query:  # skip empty input
continue
if query.lower() == "quit":
break
random_uuid = uuid.uuid4()
response = process_query(mcp_server_url, random_uuid, query)
logger.info(response)
except Exception as e:
logger.error(f"\nchat_loop Error: {e}\n{traceback.format_exc()}")
def process_query(mcp_server_url, reqid, query):
"""
处理查询,使用OpenAI API和MCP工具
Args:
mcp_server_url: MCP Server的URL
reqid: 请求ID
query: 用户查询
Returns:
处理结果
"""
available_tools = MCPClient.list_tools(mcp_server_url, reqid)
openai = OpenAI(api_key=API_KEY, base_url=BASE_URL)
messages = [{"role": "user", "content": query}]
try:
current_response = openai.chat.completions.create(
model=MODEL_NAME,
messages=messages,
tools=available_tools,
stream=False,
)
final_text = []
if current_response.choices[0].message.content:
final_result = current_response.choices[0].message.content
final_text.append(final_result)
# logger.info("AI:" + final_result)
        # Process the returned content
content = current_response.choices[0]
# logger.info(
# "OpenAI Response JSON:\n%s",
# json.dumps(current_response.model_dump(), indent=4),
# )
if content.finish_reason == "tool_calls":
            # A tool call is requested; parse it
tool_call = content.message.tool_calls[0]
tool_name = tool_call.function.name
tool_args = json.loads(tool_call.function.arguments)
callInfoStr = f"[Calling tool {tool_name} with args {tool_args}]"
logger.info(callInfoStr)
            # Execute the tool
            result = MCPClient.call_tool(mcp_server_url, tool_name, tool_args, reqid)
final_text.append(callInfoStr)
            # Store the result in the message history;
            # check that result and result.content exist first
tool_response = ""
if result and hasattr(result, "content") and result.content:
tool_response = result.content[0].text
else:
tool_response = "Tool returned empty or invalid response"
messages.append(content.message.model_dump())
messages.append(
{
"role": "tool",
"content": tool_response,
"tool_call_id": tool_call.id,
}
)
            # Feed the result back to the LLM to generate the final response
current_response = openai.chat.completions.create(
model=MODEL_NAME,
messages=messages,
tools=available_tools,
stream=False,
)
final_result = current_response.choices[0].message.content
final_text.append(final_result)
return "\n".join(final_text)
except Exception as e:
logger.error(
f"process_query Error processing query: {e}\n{traceback.format_exc()}"
)
return None
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="mcp client call tool")
parser.add_argument(
"--url", required=True, help="Full URL, e.g. localhost:8114/sse"
)
args = parser.parse_args()
# query = "你是谁?"
# result = process_query("localhost:8114/sse", "mock reqid", query)
# logger.info(f"Query result: {result}")
chat_loop(args.url)
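Both chat demos read their model credentials from a .env file via load_dotenv(). A minimal sketch of that file (all three values are placeholders; any OpenAI-compatible endpoint and model name will do):
API_KEY=sk-your-key-here
BASE_URL=https://your-openai-compatible-endpoint/v1
MODEL_NAME=your-model-name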
LLMChat2MCP_SystemPrompt
from openai import OpenAI
from utils import logger
from dotenv import load_dotenv
import os
from streamablehttp_mcpclient import MCPClient
import json
import uuid
import argparse
import traceback
import re
logger = logger.setup_logging()
# Load environment variables
load_dotenv()
API_KEY = os.environ["API_KEY"]
BASE_URL = os.environ["BASE_URL"]
MODEL_NAME = os.environ["MODEL_NAME"]
MAX_TOOL_CALLS = 3  # cap on tool calls to prevent infinite loops
def build_system_prompt(available_tools):
"""构建系统提示词,描述所有可用的MCP工具"""
tools_desc = []
    # Walk the tool list and generate a description for each tool
for tool in available_tools:
func = tool["function"]
name = func["name"]
desc = func["description"]
params = func["parameters"]["properties"]
        # Generate the parameter descriptions
param_desc = []
for param_name, param_info in params.items():
param_desc.append(
f"{param_name} ({param_info.get('type', 'string')}): "
f"{param_info.get('description', 'No description')}"
)
        tools_desc.append(
            f"Tool name: {name}\n"
            f"Description: {desc}\n"
            f"Parameters: {', '.join(param_desc)}\n"
            "---"
        )
    # Assemble the full system prompt
    return (
        "You are an intelligent assistant with access to the tools below. "
        "When the user's request requires a tool, respond strictly in this format:\n"
        'TOOL_CALL: {"tool": "tool_name", "arguments": {"param1": "value1", ...}}\n\n'
        "Available tools:\n" + "\n".join(tools_desc) + "\n\n" + "Important rules:\n"
        "1. Only call a tool when it is needed\n"
        "2. A tool call response must carry the TOOL_CALL: prefix\n"
        "3. Do not explain the tool call, output only the JSON\n"
        "4. If no tool is needed, answer the user directly"
    )
def extract_tool_call(response_content):
"""从LLM响应中提取工具调用信息"""
# 使用正则表达式匹配TOOL_CALL: {...} 格式
pattern = r'TOOL_CALL:\s*(\{.*\})'
match = re.search(pattern, response_content, re.DOTALL)
if not match:
return None
    jsonStr = match.group(1)
try:
tool_call = json.loads(jsonStr)
return tool_call
except json.JSONDecodeError:
logger.error(f"JSON解析错误: {jsonStr}")
return None
def process_query(mcp_server_url, reqid, query):
"""
    Process a query using the OpenAI API and MCP tools (system prompt approach).
"""
    # Fetch the available tool list
available_tools = MCPClient.list_tools(mcp_server_url, reqid)
    # Build the system prompt
system_prompt = build_system_prompt(available_tools)
openai = OpenAI(api_key=API_KEY, base_url=BASE_URL)
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": query},
]
tool_calls_count = 0
final_response = None
while tool_calls_count < MAX_TOOL_CALLS:
try:
            # Call the OpenAI API
response = openai.chat.completions.create(
model=MODEL_NAME,
messages=messages,
stream=False,
)
            # Get the response content
response_content = response.choices[0].message.content
# logger.info(f"LLM原始响应: {response_content}")
            # Check whether a tool call is requested
tool_call = extract_tool_call(response_content)
if tool_call:
                # Handle the tool call
tool_name = tool_call["tool"]
tool_args = tool_call["arguments"]
call_info = f"[调用工具 {tool_name} 参数: {tool_args}]"
logger.info(call_info)
                # Execute the tool call
result = MCPClient.call_tool(
mcp_server_url, tool_name, tool_args, reqid
)
                # Handle the tool response
tool_response = ""
if result and hasattr(result, "content") and result.content:
tool_response = result.content[0].text
else:
tool_response = "工具返回空响应或无效响应"
logger.info(f"工具响应: {tool_response}")
                # Append to the message history
messages.append({"role": "assistant", "content": response_content})
messages.append(
{
"role": "user",
"content": f"工具调用结果: {tool_response}\n\n请根据此结果回答用户问题",
}
)
tool_calls_count += 1
else:
                # No tool call; return the content directly
final_response = response_content
break
except Exception as e:
logger.error(f"处理错误: {e}\n{traceback.format_exc()}")
final_response = "处理查询时出错"
break
    # The maximum number of tool calls was reached without a final response
if not final_response:
final_response = "已达到最大工具调用次数。" "最后响应: " + response_content
return final_response
def chat_loop(mcp_server_url):
"""Run an interactive chat loop"""
logger.info("\nMCP Client Started!")
logger.info("Type your queries or 'quit' to exit.")
while True:
try:
query = input("\nQuery: ").strip()
            if not query:  # skip empty input
continue
if query.lower() == "quit":
break
random_uuid = uuid.uuid4()
response = process_query(mcp_server_url, random_uuid, query)
logger.info(response)
except Exception as e:
logger.error(f"\nchat_loop Error: {e}\n{traceback.format_exc()}")
# chat_loop and the __main__ section are the same as in the FunctionCall demo above
if __name__ == "__main__":
# tool_call= extract_tool_call('TOOL_CALL: {"tool": "multiply", "arguments": {"a": 3, "b": 77}}')
# logger.info(tool_call)
parser = argparse.ArgumentParser(description="mcp client call tool")
parser.add_argument(
"--url", required=True, help="Full URL, e.g. localhost:8114/sse"
)
args = parser.parse_args()
# query = "你是谁?"
# result = process_query("localhost:8114/sse", "mock reqid", query)
# logger.info(f"Query result: {result}")
chat_loop(args.url)
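For reference, with the multiply tool from the server above, build_system_prompt renders a block roughly like the following into the system prompt (the per-parameter descriptions come out empty because convert_tools defaults them to an empty string):
Tool name: multiply
Description: Multiply two numbers
Parameters: a (number): , b (number):
---
A compliant model reply to "compute 3*111" is then the single line TOOL_CALL: {"tool": "multiply", "arguments": {"a": 3, "b": 111}}, which extract_tool_call parses.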
Summary
1. The MCP HTTP+SSE transport has been officially deprecated; StreamableHTTP is the transport to use going forward.
2. MCP StreamableHTTP provides two important settings, stateless_http and json_response, to fine-tune how different AI scenarios are handled.
3. Not every model supports the FunctionCall approach; a general-purpose alternative is the system prompt, whose overall idea is much the same as FunctionCall, except that the system prompt requires the LLM to return the MCP Server name, the Tool, and the args in a prescribed format.
4. As the code above shows, throughout the AI Agent's call flow the LLM only does the reasoning; the actual tool call is made by the AI Agent. The LLM returns the MCP Server, the Tool, and the arguments to call to the AI Agent; the AI Agent then uses the MCP Client to call the MCP Server and feeds the returned result back to the LLM for the next round of reasoning, repeating this process until the LLM decides the task is complete.