前言
上文[【MCP-03】一次完整的MCP和LLM交互流程]使用Python演示了一次完整的MCP和LLM交互流程,下文使用Golang编写一个基于MCP StreamableHTTP协议的Demo。
Demo
Golang版本只实现了走FunctionCall的方式;走系统提示词(system prompt)的方式思路差不多,参考[Python版本]即可。Golang的MCPServer参考了mcp-go的[examples]。
ShMcpServer
package main
import (
"context"
"fmt"
"log"
"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
)
// NewMCPHttpServer builds the demo MCP server and registers its single
// "calculate" tool, which applies one of four arithmetic operations (add,
// subtract, multiply, divide) to two numbers and returns the result as text
// formatted with two decimal places.
//
// The returned server is not started here; the caller is expected to wrap it
// in a transport and start that.
func NewMCPHttpServer() *server.MCPServer {
	// Create a new MCP server with the tool-capability and logging options.
	s := server.NewMCPServer(
		"Calculate_McpServer",
		"1.0.0",
		server.WithToolCapabilities(true),
		server.WithLogging(),
	)

	// Declare the calculator tool and the schema of its three required inputs.
	calculatorTool := mcp.NewTool("calculate",
		mcp.WithDescription("数学四则计算"),
		mcp.WithString("operation",
			mcp.Required(),
			mcp.Description("The operation to perform (add, subtract, multiply, divide)"),
			mcp.Enum("add", "subtract", "multiply", "divide"),
		),
		mcp.WithNumber("x",
			mcp.Required(),
			mcp.Description("First number"),
		),
		mcp.WithNumber("y",
			mcp.Required(),
			mcp.Description("Second number"),
		),
	)

	// Register the handler. Invalid input is reported as a tool-level error
	// result (with a nil Go error) so the caller receives a readable message.
	s.AddTool(calculatorTool, func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
		// Use the helper accessors for type-checked argument access.
		op, err := request.RequireString("operation")
		if err != nil {
			return mcp.NewToolResultError(err.Error()), nil
		}
		x, err := request.RequireFloat("x")
		if err != nil {
			return mcp.NewToolResultError(err.Error()), nil
		}
		y, err := request.RequireFloat("y")
		if err != nil {
			return mcp.NewToolResultError(err.Error()), nil
		}

		var result float64
		switch op {
		case "add":
			result = x + y
		case "subtract":
			result = x - y
		case "multiply":
			result = x * y
		case "divide":
			if y == 0 {
				return mcp.NewToolResultError("cannot divide by zero"), nil
			}
			result = x / y
		default:
			// The schema declares an enum, but nothing in this handler
			// enforces it. Reject anything else explicitly instead of
			// reporting the zero value "0.00" as if it were a real answer.
			return mcp.NewToolResultError(fmt.Sprintf("unsupported operation %q", op)), nil
		}

		valR := fmt.Sprintf("%.2f", result)
		log.Printf("calculator x:%f op:%s y:%f = result:%f\n", x, op, y, result)
		return mcp.NewToolResultText(valR), nil
	})

	return s
}
// main builds the calculator MCP server and serves it over the StreamableHTTP
// transport on the loopback address.
func main() {
	// One constant for both the log line and the listener, so the logged
	// address always matches the address actually bound.
	const addr = "127.0.0.1:8000"

	s := NewMCPHttpServer()

	log.Printf("Starting StreamableHTTP server on %s", addr)
	httpServer := server.NewStreamableHTTPServer(s)
	if err := httpServer.Start(addr); err != nil {
		// Exit with a non-zero status so a failed start (for example a port
		// that is already in use) is visible to the shell or a supervisor.
		log.Fatalf("Server error: %s", err)
	}
}
ShMCPClient
package main
import (
	"bufio"
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"strings"
	"time"

	"github.com/joho/godotenv"
	"github.com/mark3labs/mcp-go/client"
	"github.com/mark3labs/mcp-go/client/transport"
	"github.com/mark3labs/mcp-go/mcp"
	"github.com/openai/openai-go"
	"github.com/openai/openai-go/option"
	"github.com/openai/openai-go/shared"
)
// StreamableMCP bundles an MCP client that uses the streamable-HTTP transport
// with the result of its initialize handshake.
type StreamableMCP struct {
// client is the underlying mcp-go client used for every request.
client *client.Client
// serverInfo is the InitializeResult returned by the server's Initialize call.
serverInfo *mcp.InitializeResult
}
// NewStreamableMCPServer creates an MCP client for the streamable-HTTP
// endpoint at serverURL, starts it, installs a notification handler that logs
// every server notification, and performs the initialize handshake.
//
// Despite its name it constructs a client, not a server; the name is kept so
// existing callers keep compiling.
//
// ctx is passed to both Start and Initialize. options are forwarded unchanged
// to the transport constructor.
//
// On success it returns a StreamableMCP holding the started client and the
// server's InitializeResult. On failure it returns nil and an error wrapping
// the underlying cause. If the handshake fails after the client was started,
// the client is closed before returning so the connection is not leaked.
func NewStreamableMCPServer(ctx context.Context, serverURL string, options ...transport.StreamableHTTPCOption) (*StreamableMCP, error) {
	// Create the streamable-HTTP transport client.
	mcpClient, err := client.NewStreamableHttpClient(serverURL, options...)
	if err != nil {
		return nil, fmt.Errorf("创建streamable HTTP客户端失败: %w", err)
	}

	// Start the client.
	if err := mcpClient.Start(ctx); err != nil {
		return nil, fmt.Errorf("启动客户端失败: %w", err)
	}

	// Log server notifications. The parameters are printed with %v rather
	// than %s so they render sensibly whatever their concrete type is.
	mcpClient.OnNotification(func(notification mcp.JSONRPCNotification) {
		log.Printf("收到服务器通知 - 方法: %s, 参数: %v",
			notification.Method, notification.Params)
	})

	// Perform the initialize handshake.
	initRequest := mcp.InitializeRequest{
		Params: mcp.InitializeParams{
			ProtocolVersion: mcp.LATEST_PROTOCOL_VERSION,
			ClientInfo: mcp.Implementation{
				Name:    "StreamableMCPClientDemo",
				Version: "1.0.0",
			},
			Capabilities: mcp.ClientCapabilities{},
		},
	}
	serverInfo, err := mcpClient.Initialize(ctx, initRequest)
	if err != nil {
		// The client is already started; release it, because the caller only
		// receives an error and has nothing to close.
		if closeErr := mcpClient.Close(); closeErr != nil {
			log.Printf("closing client after failed initialize: %v", closeErr)
		}
		return nil, fmt.Errorf("初始化失败: %w", err)
	}

	return &StreamableMCP{
		client:     mcpClient,
		serverInfo: serverInfo,
	}, nil
}
// Convert2Tool converts the JSON encoding of an MCP tool listing into OpenAI
// chat-completion tool definitions.
//
// jsonStr must be a JSON document whose top-level "tools" array holds objects
// with "name", "description" and "inputSchema" (itself carrying "type",
// "properties" and "required"). Other keys, at any level, are ignored. A
// document without tools (including the literal null) yields a nil slice.
//
// Each tool's parameters are emitted as a JSON Schema object:
//
//	{"type": "object", "properties": {...}, "required": [...]}
//
// "type" defaults to "object" when the input schema omits it, "properties"
// is always present (empty when the tool takes no arguments), and "required"
// is omitted when no field is required.
//
// It returns an error, and a nil slice, when jsonStr is not valid JSON or
// does not match the expected shape.
func Convert2Tool(jsonStr string) ([]openai.ChatCompletionToolParam, error) {
	type inputSchema struct {
		Properties map[string]interface{} `json:"properties"`
		Required   []string               `json:"required"`
		Type       string                 `json:"type"`
	}
	type toolDef struct {
		Name        string      `json:"name"`
		Description string      `json:"description"`
		InputSchema inputSchema `json:"inputSchema"`
	}
	// Decode only the "tools" key. Decoding the whole document into a
	// map[string][]T would fail as soon as any other top-level key held a
	// non-array value.
	var data struct {
		Tools []toolDef `json:"tools"`
	}
	if err := json.Unmarshal([]byte(jsonStr), &data); err != nil {
		return nil, fmt.Errorf("decoding MCP tool list: %w", err)
	}

	// Left nil when there are no tools, so the result matches what callers
	// received before for an empty listing.
	var tools []openai.ChatCompletionToolParam
	for _, t := range data.Tools {
		schemaType := t.InputSchema.Type
		if schemaType == "" {
			schemaType = "object"
		}
		properties := t.InputSchema.Properties
		if properties == nil {
			properties = map[string]interface{}{}
		}

		// The argument definitions belong under "properties". Spreading them
		// over the top level of the schema loses that structure and lets an
		// argument named "type" or "required" overwrite the schema keywords.
		parameters := shared.FunctionParameters{
			"type":       schemaType,
			"properties": properties,
		}
		if len(t.InputSchema.Required) > 0 {
			parameters["required"] = t.InputSchema.Required
		}

		tools = append(tools, openai.ChatCompletionToolParam{
			Function: shared.FunctionDefinitionParam{
				Name:        t.Name,
				Description: openai.String(t.Description),
				Parameters:  parameters,
			},
		})
	}
	return tools, nil
}
// processQuery sends query to an OpenAI-compatible chat-completions endpoint,
// executes any MCP tool calls the model asks for, and returns the text that
// should be shown to the user.
//
// Configuration is read from the environment on every call: OPENAI_API_KEY,
// OPENAI_MODEL_NAME and OPENAI_BASE_URL. toolsJsonStr is the JSON tool listing
// that Convert2Tool turns into the tool definitions offered to the model; c is
// the MCP client used to run the tools.
//
// Exactly one round of tool calls is handled. If the first completion
// finishes with "tool_calls", every requested tool is invoked through
// CallTool, each result is sent back as a tool message, and a second
// completion is requested. Tool calls in that second completion are not
// executed.
//
// The returned string joins, with newlines, the model's first message (if not
// empty), one "[Calling tool ...]" line per tool call, and the content of the
// follow-up completion. An error is returned when a completion request fails,
// when the endpoint returns no choices, or when tool arguments are not valid
// JSON.
func processQuery(c *client.Client, ctx context.Context, query string, toolsJsonStr string) (string, error) {
	key := os.Getenv("OPENAI_API_KEY")
	modelName := os.Getenv("OPENAI_MODEL_NAME")
	baseURL := os.Getenv("OPENAI_BASE_URL")
	log.Printf("[EnvInfo baseURL: %v modelName:%v]", baseURL, modelName)

	// Named llm so it does not shadow the imported client package.
	llm := openai.NewClient(
		option.WithAPIKey(key),
		option.WithBaseURL(baseURL),
	)

	availableTools, err := Convert2Tool(toolsJsonStr)
	if err != nil {
		// Keep answering without tools, but make the degradation visible
		// instead of silently discarding the error.
		log.Printf("tool list unusable, continuing without tools: %v", err)
		availableTools = nil
	}

	messages := []openai.ChatCompletionMessageParamUnion{
		openai.UserMessage(query),
	}
	params := openai.ChatCompletionNewParams{
		Messages: messages,
		Model:    modelName,
		Tools:    availableTools,
	}
	log.Println(params)

	completion, err := llm.Chat.Completions.New(ctx, params)
	if err != nil {
		return "", fmt.Errorf("requesting completion: %w", err)
	}
	if len(completion.Choices) == 0 {
		return "", fmt.Errorf("requesting completion: response contains no choices")
	}
	choice := completion.Choices[0]

	var finalText []string
	if choice.Message.Content != "" {
		finalText = append(finalText, choice.Message.Content)
		log.Printf("AI: %s", choice.Message.Content)
	}

	if choice.FinishReason != "tool_calls" || len(choice.Message.ToolCalls) == 0 {
		return strings.Join(finalText, "\n"), nil
	}

	// Echo the assistant message that carries the tool calls, then answer
	// every call with its own tool message so no call is left unanswered in
	// the follow-up request.
	messages = append(messages, choice.Message.ToParam())
	for _, toolCall := range choice.Message.ToolCalls {
		toolName := toolCall.Function.Name

		var toolArgs map[string]interface{}
		// An empty argument string is treated as "no arguments".
		if raw := toolCall.Function.Arguments; raw != "" {
			if err := json.Unmarshal([]byte(raw), &toolArgs); err != nil {
				return "", fmt.Errorf("decoding arguments for tool %q: %w", toolName, err)
			}
		}

		callInfoStr := fmt.Sprintf("[Calling tool %s with args %v]", toolName, toolArgs)
		// log.Print, not log.Printf: the text embeds model-supplied values
		// and must not be interpreted as a format string.
		log.Print(callInfoStr)
		finalText = append(finalText, callInfoStr)

		result := CallTool(c, ctx, toolName, toolArgs)
		messages = append(messages, openai.ToolMessage(toolResultText(result), toolCall.ID))
	}

	params.Messages = messages
	completion, err = llm.Chat.Completions.New(ctx, params)
	if err != nil {
		return "", fmt.Errorf("requesting completion after tool call: %w", err)
	}
	if len(completion.Choices) == 0 {
		return "", fmt.Errorf("requesting completion after tool call: response contains no choices")
	}
	finalText = append(finalText, completion.Choices[0].Message.Content)

	return strings.Join(finalText, "\n"), nil
}

// toolResultText flattens an MCP tool result into the string sent back to the
// model: the text items joined with newlines, or, when the result holds no
// text item, the JSON encoding of its content. A nil result gives "".
func toolResultText(result *mcp.CallToolResult) string {
	if result == nil {
		return ""
	}
	var parts []string
	for _, item := range result.Content {
		if text, ok := item.(mcp.TextContent); ok {
			parts = append(parts, text.Text)
		}
	}
	if len(parts) > 0 {
		return strings.Join(parts, "\n")
	}
	raw, err := json.Marshal(result.Content)
	if err != nil {
		return ""
	}
	return string(raw)
}
// CallTool invokes the MCP tool named toolName with the arguments toolArgs
// through c and returns the tool's result.
//
// A failed call does not terminate the program. The error is logged and
// turned into an error result with mcp.NewToolResultError, so the caller
// always receives a non-nil result and an interactive session survives a
// single failing tool.
func CallTool(c *client.Client, ctx context.Context, toolName string, toolArgs map[string]interface{}) *mcp.CallToolResult {
	result, err := c.CallTool(ctx, mcp.CallToolRequest{Params: mcp.CallToolParams{
		Name:      toolName,
		Arguments: toolArgs,
	}})
	if err != nil {
		log.Printf("calling tool %q: %v", toolName, err)
		return mcp.NewToolResultError(err.Error())
	}
	// %+v rather than %s: result is a pointer to a struct, not a string.
	log.Printf("Tool result: %+v\n", result)
	return result
}
// Health check and connection management.

// Ping checks the connection by delegating to the underlying client's Ping
// with ctx, and returns whatever error that call reports.
func (s *StreamableMCP) Ping(ctx context.Context) error {
return s.client.Ping(ctx)
}
// Close closes the underlying client and returns the error, if any, reported
// by the client's Close.
func (s *StreamableMCP) Close() error {
return s.client.Close()
}
// ChatLoop runs the interactive prompt: it reads one query per line from
// standard input, passes it to processQuery together with c, ctx and
// toolsJsonStr, and logs the response or the error.
//
// The loop ends when the user enters "quit" (case-insensitive), when standard
// input reaches end-of-file, or when reading from it fails. Blank lines are
// ignored.
func ChatLoop(c *client.Client, ctx context.Context, toolsJsonStr string) {
	log.Println("\nMCP Client Started!")
	log.Println("Type your queries or 'quit' to exit.")

	// Read whole lines. fmt.Scanln stops at the first space, so a query such
	// as "what is 1 + 2" would be split into several separate queries.
	stdin := bufio.NewScanner(os.Stdin)
	for {
		fmt.Print("\nQuery: ")
		if !stdin.Scan() {
			// End of input or a read error: stop instead of looping forever
			// on empty queries.
			if err := stdin.Err(); err != nil {
				log.Printf("\nchat_loop Error: %v", err)
			}
			return
		}

		query := strings.TrimSpace(stdin.Text())
		if query == "" {
			continue
		}
		if strings.EqualFold(query, "quit") {
			return
		}

		response, err := processQuery(c, ctx, query, toolsJsonStr)
		if err != nil {
			log.Printf("\nchat_loop Error: %v", err)
			continue
		}
		log.Println("\n" + response)
	}
}
// main wires the client demo together: it loads environment variables from a
// .env file, connects to the MCP server over streamable HTTP, logs the
// server's identity and tool list, and then hands control to ChatLoop.
func main() {
	// Load the .env file. A failure is only logged, because the variables
	// read later by processQuery may already be set in the environment.
	if err := godotenv.Load(); err != nil {
		log.Println("Error loading .env file:", err)
	}

	const (
		// An HTTP endpoint needs an absolute URL including the scheme; a bare
		// "host:port/path" string does not parse as one.
		mcpServerURL = "http://127.0.0.1:8000/mcp"
		// Timeout applied to the HTTP transport.
		mcpClientTimeout = 300 * time.Second
	)

	// The context spans the whole interactive session, so it carries no
	// deadline. With a 300s deadline here, every request made after five
	// minutes of chatting would fail with "context deadline exceeded".
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Client options: continuous listening for server notifications, plus the
	// HTTP timeout.
	options := []transport.StreamableHTTPCOption{
		transport.WithContinuousListening(),
		transport.WithHTTPTimeout(mcpClientTimeout),
	}

	// Create, start and initialize the client.
	mcpClient, err := NewStreamableMCPServer(ctx, mcpServerURL, options...)
	if err != nil {
		log.Fatalf("客户端初始化失败: %v", err)
	}
	defer mcpClient.Close()

	// Log the server information.
	log.Printf("已连接到MCP服务器:\n")
	log.Printf(" 名称: %s\n", mcpClient.serverInfo.ServerInfo.Name)
	log.Printf(" 版本: %s\n", mcpClient.serverInfo.ServerInfo.Version)
	log.Printf(" 协议版本: %s\n", mcpClient.serverInfo.ProtocolVersion)

	if mcpClient.serverInfo.Capabilities.Tools == nil {
		log.Println("no available tools")
		return
	}

	toolsResult, err := mcpClient.client.ListTools(ctx, mcp.ListToolsRequest{})
	if err != nil {
		// Without a tool list there is nothing useful to offer the model, so
		// stop here. Returning (not log.Fatal) lets the deferred Close run.
		log.Printf("Failed to list tools: %v", err)
		return
	}
	log.Printf("Server has %d tools available\n", len(toolsResult.Tools))
	for i, tool := range toolsResult.Tools {
		log.Printf(" %d. %s - %s\n", i+1, tool.Name, tool.Description)
	}

	// ChatLoop takes the tool list as a JSON string.
	jsonData, err := json.Marshal(toolsResult)
	if err != nil {
		log.Println("JSON 编码错误:", err)
		return
	}

	ChatLoop(mcpClient.client, ctx, string(jsonData))
}
这里使用的是[mcp-go]。官方推荐的是[go-sdk],大体思路差不多,业内还有字节开源的[eino]。不过相对于官方[PythonSDK]而言,它们没有提供stateless_http和json_response这两个参数,因而无法像PythonSDK那样通过两者的组合支持4种不同的AI业务场景。不过通过阅读源码可以看到,官方的go-sdk提供了Last-Event-ID机制,mcp-go提供了一个stateLess参数。但总体而言,MCPServer开发还是推荐使用[PythonSDK]。
总结
0,因为所有语言的SDK均参考官方规范文档编写,从源码来看,大体思路和使用方式都差不多,不过均以PythonSDK为准。
1,上文使用mcp-go和OpenAI官方提供的openai-go实现了一个简单的LLM与MCP StreamableHTTP交互的Demo,需要大模型支持FunctionCall。
2,官方推荐的是go-sdk,大体思路差不多,业内还有字节开源的eino。
3,相对于官方PythonSDK而言,Go的SDK没有提供stateless_http和json_response这两个参数;不过通过阅读源码可以看到,官方的go-sdk提供了Last-Event-ID机制,mcp-go提供了一个stateLess参数。
4,为了保证能用上最新的版本、支持最新的协议,并获得更好的生态支持,还是推荐使用PythonSDK。