【MCP-03】A Complete MCP and LLM Interaction Flow

2025-08-19 10:32:01

Preface

The previous posts, [SpringAI MCP hands-on], [【01】The JSON-RPC 2.0 Protocol], and [【02】Notes on SSE and Streamable HTTP], covered roughly why MCP came about and touched on some of the technical details. But MCP mainly exists to serve LLMs, so how do MCP and an LLM actually interact? This post gives a brief summary of the interaction flow.

Overall Flow

[Sequence diagrams: the end-to-end interaction between the user, AI Agent (MCP Client), LLM, and MCP Server]

Some notes:
1) The user asks the AI Agent "compute 3*111 for me". Here the AI Agent acts as the MCP Client: it first pulls the metadata of every MCP Server registered with it, then sends the user query together with all the MCP Server and MCP Tool information to the LLM.
2) The LLM reasons over the query and the MCP Server information, picks the MCP Server and MCP Tool best suited to the user's problem, and returns that choice to the AI Agent (MCP Client). In this example the LLM effectively tells the AI Agent: "use the multiply MCP Tool in the Calculate_McpServer MCP Server; it can solve the user's problem."
3) The AI Agent (MCP Client) now knows which MCP Tool on which MCP Server to use, and calls it directly to get the result: the multiply MCP Tool on the Calculate_McpServer MCP Server.
4) Calculate_McpServer returns the result (the product) to the AI Agent (MCP Client).
5) The AI Agent (MCP Client) sends the user's question plus the result from Calculate_McpServer back to the LLM, so the LLM can decide, given the question and the answer, whether another round of the loop above is needed; if not, it tidies up the content and returns it.
6) The LLM returns the polished content to the AI Agent (MCP Client), which finally returns the assembled answer to the user.
Note that not every LLM supports function calling. A generic alternative is the system prompt: the overall idea is much like function calling, except the system prompt must instruct the LLM to return the MCP Server name, the Tool, and the args in a prescribed format (see the sketch below).
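Schematically, the "use this tool" reply in the function-calling variant is an assistant message carrying a tool_calls entry rather than a text answer. This is an illustrative shape, not a verbatim trace; the id is hypothetical:

assistant_message = {
    "role": "assistant",
    "content": None,
    "tool_calls": [{
        "id": "call_001",  # hypothetical id assigned by the model
        "type": "function",
        "function": {
            "name": "multiply",                 # the MCP Tool to invoke
            "arguments": '{"a": 3, "b": 111}',  # JSON-encoded arguments
        },
    }],
}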

Demo

Below is a Python demo of the Calculate_McpServer from the sequence diagram above, letting an LLM use MCP Tools to perform basic arithmetic. Since HTTP+SSE has been officially deprecated, the demo uses only the streamable_http transport.
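Note: the code below uses the official MCP Python SDK (the mcp package on PyPI) plus the openai and python-dotenv packages; utils.logger is a small local logging helper whose code is not shown here.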

MCPServer

from mcp.server.fastmcp import FastMCP
import logging
 
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
 
MCP_SERVER_NAME = "Calculate_McpServer"
 
logger = logging.getLogger(MCP_SERVER_NAME)
# Initialize the FastMCP server
mcp = FastMCP(name=MCP_SERVER_NAME, instructions="Basic arithmetic operations (add/subtract/multiply/divide)")
 
@mcp.tool(name="add", description="对两个数字进行加法")
def add(a: float, b: float) -> float:
    """
    Add two numbers.
 
    Parameters:
    - a (float): First number (required)
    - b (float): Second number (required)
 
    Returns:
    - float: The result of a + b
    """
    return a + b
 
@mcp.tool(name="subtract", description="对两个数字进行减法")
def subtract(a: float, b: float) -> float:
    """
    Subtract two numbers.
 
    Parameters:
    - a (float): The number to subtract from (required)
    - b (float): The number to subtract (required)
 
    Returns:
    - float: The result of a - b
    """
    return a - b
 
@mcp.tool(name="multiply", description="对两个数字进行乘法")
def multiply(a: float, b: float) -> float:
    """
    Multiply two numbers.
 
    Parameters:
    - a (float): First number (required)
    - b (float): Second number (required)
 
    Returns:
    - float: The result of a * b
    """
    return a * b
 
@mcp.tool(name="divide", description="对两个数字进行除法")
def divide(a: float, b: float) -> float:
    """
    Divide two numbers.
 
    Parameters:
    - a (float): Numerator (required)
    - b (float): Denominator (required, must not be zero)
 
    Returns:
    - float: The result of a / b
    """
    if b == 0:
        raise ValueError("Division by zero is not allowed")
    return a / b
 
if __name__ == "__main__":
    mcp.settings.host = "0.0.0.0"
    mcp.settings.port = 8000
    mcp.settings.log_level = "INFO"
 
    # stateless_http and json_response both default to False
    # stateless_http
    # controls whether the SSE channel is opened and whether client sessions are managed
    # json_response
    # controls whether a POST response body is plain JSON or an SSE event stream
    # (it does not use the SSE channel, only the SSE event data format)
    # mcp.settings.json_response = True
    # mcp.settings.stateless_http = True

    # Initialize and run the server
    print("Starting MCPServer...")
    # mcp.run(transport='sse')
    mcp.run(transport="streamable-http")
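
For context, under Streamable HTTP every call the client makes is a JSON-RPC 2.0 message POSTed to the /mcp endpoint (see the earlier JSON-RPC post). A rough sketch of the tools/call payload for our multiply tool:

# JSON-RPC 2.0 body the client POSTs to http://host:8000/mcp (sketch):
tools_call_request = {
    "jsonrpc": "2.0",
    "id": 2,  # arbitrary request id
    "method": "tools/call",
    "params": {"name": "multiply", "arguments": {"a": 3, "b": 111}},
}
# The server's result wraps the tool output as content items, roughly:
# {"content": [{"type": "text", "text": "333"}], "isError": false}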

MCPClient

from typing import Optional, List
import asyncio
from contextlib import AsyncExitStack
from mcp.client.streamable_http import streamablehttp_client
from mcp import ClientSession, Tool
from utils import logger
 
"""
官方MCP client demo:github.com/modelcontextprotocol/quickstart-resources/blob/main/mcp-client-python/client.py
 """
 
# 配置日志记录
logger = logger.setup_logging()
 
 
def convert_tools(tools: List[Tool]):
    """
    将MCP Server的list tools获取到的工具列表,转换为OpenAI API的可用工具列表
    """
    ret = []
    for tool in tools:
        parameters = {
            "type": "object",
            "properties": {},
            "required": (
                tool.inputSchema["required"] if "required" in tool.inputSchema else []
            ),
        }
        properties = tool.inputSchema["properties"]
        for param_name in properties:
            if "type" in properties[param_name]:
                param_type = properties[param_name]["type"]
            elif (
                "anyOf" in properties[param_name]
                and len(properties[param_name]["anyOf"]) > 0
            ):
                param_type = properties[param_name]["anyOf"][0]["type"]
            else:
                param_type = "string"
            parameters["properties"][param_name] = {
                "type": param_type,
                "description": properties[param_name].get("description", ""),
            }
 
        ret.append(
            {
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": parameters,
                },
            }
        )
    return ret
 
class MCPClient:
    def __init__(self):
        # Initialize the session and client objects
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        self.stream_client = None  # generic name for the transport client
 
    async def connect_to_server(self, url):
        """Connect to an MCP server using Streamable HTTP.
        Args:
            url: Streamable HTTP 地址
        """
        mcp_timeout = 300
        logger.info(f"mcp_timeout: {mcp_timeout}")
 
        # 使用 streamable_http_client 替代 sse_client
        http_transport = await self.exit_stack.enter_async_context(
            streamablehttp_client(url=url, timeout=mcp_timeout)  # 修改为 streamable_http_client
        )
        self.streamable_http, self.write, _ = http_transport
        self.session = await self.exit_stack.enter_async_context(
            ClientSession(self.streamable_http, self.write)
        )
 
        logger.info(f"connect_to_server successful, streamable http url: {url}")
        await self.session.initialize()
 
    async def cleanup(self):
        await self.exit_stack.aclose()
 
    @classmethod
    def list_tools(cls, mcp_server_url, reqid):
        """
        列出指定MCP Server上可用的工具
        Args:
            mcp_server_url: MCP Server的URL
        Returns:
            可用的工具列表
        """
        async def async_task():
            client = cls()
            await client.connect_to_server(mcp_server_url)
            response = await client.session.list_tools()
            mcp_tools = response.tools
            available_tools = convert_tools(mcp_tools)
            logger.info(f"reqid:{reqid},tool size:{len(available_tools)},tool name:{",".join([tool["function"]["name"] for tool in available_tools])}")
            # logger.info([tool["function"]["name"] for tool in available_tools])
            await client.cleanup()
            return available_tools
 
        available_tools = asyncio.run(async_task())
        return available_tools
 
    @classmethod
    def call_tool(cls, mcp_server_url: str, tool_name: str, tool_args: dict, reqid):
        """
        调用指定MCP Server上的工具
        Args:
            mcp_server_url: MCP Server的URL
            tool_name: 工具名称
            tool_args: 工具参数
        Returns:
            工具调用结果
        """
        async def async_task():
            client = cls()
            await client.connect_to_server(mcp_server_url)
            logger.info(f"reqid;{reqid}, mcp client, tool_name: {tool_name}, tool_args: {tool_args}")
            result = await client.session.call_tool(tool_name, tool_args)
            logger.info(f"reqid;{reqid}, mcp client, tool_args: {tool_args}, result: {result}")
            await client.cleanup()
            return result
 
        result = asyncio.run(async_task())
        return result
 
if __name__ == "__main__":
    mcpserver_url = "127.0.0.1:8000/mcp"  # 修改为stream端点
    # MCPClient.list_tools(mcpserver_url, "mock reqid")
    MCPClient.call_tool(mcpserver_url,"multiply",{"a": 3, "b": 111},"mock reqid")
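
As a quick usage sketch (assuming the server above is running locally), call_tool returns the SDK's CallToolResult, whose content list carries the tool output that the LLM code below reads via result.content[0].text:

# Sketch: invoking the multiply tool through MCPClient
result = MCPClient.call_tool(
    "http://127.0.0.1:8000/mcp", "multiply", {"a": 3, "b": 111}, "mock reqid"
)
# For these calculator tools the result is a single text content item,
# so result.content[0].text should be "333".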

LLMChat2MCP_FunctionCall

from openai import OpenAI
from utils import logger
from dotenv import load_dotenv
import os
from streamablehttp_mcpclient import MCPClient
import json
import uuid
import argparse
import traceback
 
logger = logger.setup_logging()
 
# load environment variables from .env
load_dotenv()
 
API_KEY = os.environ["API_KEY"]
BASE_URL = os.environ["BASE_URL"]
MODEL_NAME = os.environ["MODEL_NAME"]
 
 
def chat_loop(mcp_server_url):
    """Run an interactive chat loop"""
    logger.info("\nMCP Client Started!")
    logger.info("Type your queries or 'quit' to exit.")
 
    while True:
        try:
            query = input("\nQuery: ").strip()
            if not query:  # skip empty input
                continue
            if query.lower() == "quit":
                break
            random_uuid = uuid.uuid4()
            response = process_query(mcp_server_url, random_uuid, query)
            logger.info(response)
        except Exception as e:
            logger.error(f"\nchat_loop Error: {e}\n{traceback.format_exc()}")
 
 
def process_query(mcp_server_url, reqid, query):
    """
    处理查询,使用OpenAI API和MCP工具
    Args:
        mcp_server_url: MCP Server的URL
        reqid: 请求ID
        query: 用户查询
    Returns:
        处理结果
    """
    available_tools = MCPClient.list_tools(mcp_server_url, reqid)
    openai = OpenAI(api_key=API_KEY, base_url=BASE_URL)
 
    messages = [{"role": "user", "content": query}]
 
    try:
        current_response = openai.chat.completions.create(
            model=MODEL_NAME,
            messages=messages,
            tools=available_tools,
            stream=False,
        )
 
        final_text = []
 
        if current_response.choices[0].message.content:
            final_result = current_response.choices[0].message.content
            final_text.append(final_result)
            # logger.info("AI:" + final_result)
 
        # Handle the returned content
        content = current_response.choices[0]
        # logger.info(
        #     "OpenAI Response JSON:\n%s",
        #     json.dumps(current_response.model_dump(), indent=4),
        # )
        if content.finish_reason == "tool_calls":
            # If a tool is needed, parse the tool call
            tool_call = content.message.tool_calls[0]
            tool_name = tool_call.function.name
            tool_args = json.loads(tool_call.function.arguments)
            callInfoStr = f"[Calling tool {tool_name} with args {tool_args}]"
            logger.info(callInfoStr)
            # Execute the tool
            result = MCPClient.call_tool(mcp_server_url, tool_name, tool_args, reqid)
            final_text.append(callInfoStr)
 
            # Store the result in the message history
            # Check that result and result.content exist
            tool_response = ""
            if result and hasattr(result, "content") and result.content:
                tool_response = result.content[0].text
            else:
                tool_response = "Tool returned empty or invalid response"
 
            messages.append(content.message.model_dump())
            messages.append(
                {
                    "role": "tool",
                    "content": tool_response,
                    "tool_call_id": tool_call.id,
                }
            )
 
            # Feed the result back to the LLM to generate the final response
            current_response = openai.chat.completions.create(
                model=MODEL_NAME,
                messages=messages,
                tools=available_tools,
                stream=False,
            )
            final_result = current_response.choices[0].message.content
            final_text.append(final_result)
        return "\n".join(final_text)
    except Exception as e:
        logger.error(
            f"process_query Error processing query: {e}\n{traceback.format_exc()}"
        )
        return None
 
 
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="mcp client call tool")
    parser.add_argument(
        "--url", required=True, help="Full URL, e.g. localhost:8114/sse"
    )
    args = parser.parse_args()
    # query = "你是谁?"
    # result = process_query("localhost:8114/sse", "mock reqid", query)
    # logger.info(f"Query result: {result}")
    chat_loop(args.url)
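
For reference, after one tool round the messages list sent for the second completion has roughly this shape (schematic; the tool_call id is whatever the model returned):

messages = [
    {"role": "user", "content": "Compute 3*111 for me"},
    # the assistant turn that requested the tool (content.message.model_dump())
    {"role": "assistant", "content": None, "tool_calls": [...]},
    # the tool result, linked back to the request via tool_call_id
    {"role": "tool", "tool_call_id": "call_001", "content": "333"},
]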

LLMChat2MCP_SystemPrompt

from openai import OpenAI
from utils import logger
from dotenv import load_dotenv
import os
from streamablehttp_mcpclient import MCPClient
import json
import uuid
import argparse
import traceback
import re
 
logger = logger.setup_logging()
 
# Load environment variables
load_dotenv()
 
API_KEY = os.environ["API_KEY"]
BASE_URL = os.environ["BASE_URL"]
MODEL_NAME = os.environ["MODEL_NAME"]
MAX_TOOL_CALLS = 3  # maximum number of tool calls, to prevent infinite loops
 
 
def build_system_prompt(available_tools):
    """构建系统提示词,描述所有可用的MCP工具"""
    tools_desc = []
 
    # Parse the tool list and generate descriptions
    for tool in available_tools:
        func = tool["function"]
        name = func["name"]
        desc = func["description"]
        params = func["parameters"]["properties"]
 
        # Generate parameter descriptions
        param_desc = []
        for param_name, param_info in params.items():
            param_desc.append(
                f"{param_name} ({param_info.get('type', 'string')}): "
                f"{param_info.get('description', 'No description')}"
            )
 
        tools_desc.append(
            f"Tool name: {name}\n"
            f"Description: {desc}\n"
            f"Parameters: {', '.join(param_desc)}\n"
            "---"
        )
 
    # Build the full system prompt
    return (
        "You are an intelligent assistant that can use the tools below to solve problems. "
        "When the user's request requires a tool, respond strictly in this format:\n"
        'TOOL_CALL: {"tool": "tool name", "arguments": {"param1": "value1", ...}}\n\n'
        "Available tools:\n" + "\n".join(tools_desc) + "\n\n" + "Important rules:\n"
        "1. Only call a tool when necessary\n"
        "2. The response must include the TOOL_CALL: prefix\n"
        "3. Do not explain the tool call, just output the JSON\n"
        "4. If no tool is needed, answer the user directly"
    )
 
 
def extract_tool_call(response_content):
    """从LLM响应中提取工具调用信息"""
    # 使用正则表达式匹配TOOL_CALL: {...} 格式
    pattern = r'TOOL_CALL:\s*(\{.*\})'
    match = re.search(pattern, response_content, re.DOTALL)
    if not match:
        return None
    jsonStr=match.group(1)
    try:
        tool_call = json.loads(jsonStr)
        return tool_call
    except json.JSONDecodeError:
        logger.error(f"JSON解析错误: {jsonStr}")
        return None
 
 
def process_query(mcp_server_url, reqid, query):
    """
    处理查询,使用OpenAI API和MCP工具(系统提示词方式)
    """
    # 获取可用工具列表
    available_tools = MCPClient.list_tools(mcp_server_url, reqid)
 
    # 构建系统提示词
    system_prompt = build_system_prompt(available_tools)
 
    openai = OpenAI(api_key=API_KEY, base_url=BASE_URL)
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": query},
    ]
 
    tool_calls_count = 0
    final_response = None
 
    while tool_calls_count < MAX_TOOL_CALLS:
        try:
            # Call the OpenAI API
            response = openai.chat.completions.create(
                model=MODEL_NAME,
                messages=messages,
                stream=False,
            )
 
            # Get the response content
            response_content = response.choices[0].message.content
            # logger.info(f"Raw LLM response: {response_content}")
 
            # Check whether a tool call is needed
            tool_call = extract_tool_call(response_content)
 
            if tool_call:
                # Handle the tool call
                tool_name = tool_call["tool"]
                tool_args = tool_call["arguments"]

                call_info = f"[Calling tool {tool_name} with args: {tool_args}]"
                logger.info(call_info)
 
                # Execute the tool call
                result = MCPClient.call_tool(
                    mcp_server_url, tool_name, tool_args, reqid
                )
 
                # Handle the tool response
                tool_response = ""
                if result and hasattr(result, "content") and result.content:
                    tool_response = result.content[0].text
                else:
                    tool_response = "Tool returned an empty or invalid response"
 
                logger.info(f"工具响应: {tool_response}")
 
                # Append to the message history
                messages.append({"role": "assistant", "content": response_content})
                messages.append(
                    {
                        "role": "user",
                        "content": f"Tool call result: {tool_response}\n\nPlease answer the user's question based on this result",
                    }
                )
 
                tool_calls_count += 1
            else:
                # No tool call; return the result directly
                final_response = response_content
                break
 
        except Exception as e:
            logger.error(f"处理错误: {e}\n{traceback.format_exc()}")
            final_response = "处理查询时出错"
            break
 
    # If the max call count is reached without a final response
    if not final_response:
        final_response = "Reached the maximum number of tool calls. Last response: " + response_content
 
    return final_response
 
 
def chat_loop(mcp_server_url):
    """Run an interactive chat loop"""
    logger.info("\nMCP Client Started!")
    logger.info("Type your queries or 'quit' to exit.")
 
    while True:
        try:
            query = input("\nQuery: ").strip()
            if not query:  # skip empty input
                continue
            if query.lower() == "quit":
                break
            random_uuid = uuid.uuid4()
            response = process_query(mcp_server_url, random_uuid, query)
            logger.info(response)
        except Exception as e:
            logger.error(f"\nchat_loop Error: {e}\n{traceback.format_exc()}")
 
 
# chat_loop and the __main__ section are the same as in the function-call version
if __name__ == "__main__":
#    tool_call= extract_tool_call('TOOL_CALL: {"tool": "multiply", "arguments": {"a": 3, "b": 77}}')
#    logger.info(tool_call)
    parser = argparse.ArgumentParser(description="mcp client call tool")
    parser.add_argument(
        "--url", required=True, help="Full URL, e.g. localhost:8114/sse"
    )
    args = parser.parse_args()
    # query = "你是谁?"
    # result = process_query("localhost:8114/sse", "mock reqid", query)
    # logger.info(f"Query result: {result}")
    chat_loop(args.url)
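
A quick sanity check of the system-prompt format (the commented-out test in __main__ does the same thing): given the model's raw text, extract_tool_call returns the parsed dict:

raw = 'TOOL_CALL: {"tool": "multiply", "arguments": {"a": 3, "b": 111}}'
print(extract_tool_call(raw))
# -> {'tool': 'multiply', 'arguments': {'a': 3, 'b': 111}}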

[Screenshots: sample runs of the function-call and system-prompt demos]

Summary

1. The HTTP+SSE transport for MCP has been officially deprecated; Streamable HTTP is the transport to use going forward.
2. MCP's Streamable HTTP exposes two important settings, stateless_http and json_response, for fine-tuning how different AI scenarios are handled.
3. Not every LLM supports function calling. A generic alternative is the system prompt: the overall idea is much like function calling, except the system prompt must instruct the LLM to return the MCP Server name, the Tool, and the args in a prescribed format.
4. As the code above shows, throughout the AI Agent's call flow the LLM only does reasoning; the actual tool call is made by the AI Agent. The LLM tells the AI Agent which MCP Server, which Tool, and which arguments to use; the AI Agent then calls the MCP Server through the MCP Client, feeds the result back to the LLM for the next round of reasoning, and the loop repeats until the LLM decides the task is done.
