Go语言中的调用链实现
1. 核心概念
- Span：代表调用链中的一个操作单元
- Trace：一次完整的请求跟踪，包含多个Span
- Context：在服务间传递跟踪信息
2. 常用库介绍
OpenTelemetry
OpenTelemetry是目前最主流的调用链解决方案:
package main
import (
	"context"
	"fmt"
	"log"
	"net/http"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/exporters/jaeger"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
	"go.opentelemetry.io/otel/trace"
)
// 初始化Tracer Provider
// initTracer creates a TracerProvider that batches spans to the Jaeger
// collector endpoint at url and tags every span with the "user-service"
// service name and a "production" environment attribute. The provider is
// installed as the global one (otel.SetTracerProvider) and also returned so
// the caller can Shutdown it.
//
// NOTE(review): upstream OpenTelemetry-Go has deprecated the jaeger exporter
// in favour of OTLP; consider migrating.
func initTracer(url string) (*sdktrace.TracerProvider, error) {
	endpoint := jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url))
	exporter, err := jaeger.New(endpoint)
	if err != nil {
		return nil, err
	}

	serviceResource := resource.NewWithAttributes(
		semconv.SchemaURL,
		semconv.ServiceNameKey.String("user-service"),
		attribute.String("environment", "production"),
	)
	provider := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exporter),
		sdktrace.WithResource(serviceResource),
	)

	otel.SetTracerProvider(provider)
	return provider, nil
}
// 业务函数示例
// getUserInfo looks up the display name for userID inside its own
// "getUserInfo" span. The span carries the user.id attribute and a
// user.found flag. On a miss, the returned error is also recorded on this
// span, so the failure shows up on the operation that produced it and not
// only on the caller's span.
//
// The user store is simulated: only the ID "123" exists.
func getUserInfo(ctx context.Context, userID string) (string, error) {
	tracer := otel.Tracer("user-service")
	// The returned ctx carries the new span; real lookups should pass it on
	// so their spans become children of getUserInfo.
	ctx, span := tracer.Start(ctx, "getUserInfo",
		trace.WithAttributes(attribute.String("user.id", userID)))
	defer span.End()

	// Simulated business logic.
	if userID == "123" {
		span.SetAttributes(attribute.Bool("user.found", true))
		return "John Doe", nil
	}

	span.SetAttributes(attribute.Bool("user.found", false))
	err := fmt.Errorf("user not found")
	span.RecordError(err)
	return "", err
}
// HTTP处理函数
// userHandler serves /user?id=<id>. It opens a "userHandler" span as a child
// of whatever span is already in the request context, then looks the user up
// via getUserInfo. On success it writes "User: <name>" as plain text.
//
// A missing id is rejected with 400 before any lookup. A failed lookup is
// recorded on the span and answered with 404.
func userHandler(w http.ResponseWriter, r *http.Request) {
	tracer := otel.Tracer("user-service")
	ctx, span := tracer.Start(r.Context(), "userHandler")
	defer span.End()

	userID := r.URL.Query().Get("id")
	if userID == "" {
		// An absent parameter is a client error, not a "user not found".
		http.Error(w, "missing id parameter", http.StatusBadRequest)
		return
	}

	userName, err := getUserInfo(ctx, userID)
	if err != nil {
		span.RecordError(err)
		http.Error(w, err.Error(), http.StatusNotFound)
		return
	}

	// Set the type explicitly so stored names are never content-sniffed as HTML.
	w.Header().Set("Content-Type", "text/plain; charset=utf-8")
	fmt.Fprintf(w, "User: %s", userName)
}
自定义简易调用链实现
对于小型项目，可以基于context实现简单的调用链（示例中的generateID需要自行实现，例如用crypto/rand生成随机的十六进制ID）：
package trace
import (
"context"
"encoding/json"
"fmt"
"time"
)
// Span is one timed operation within a trace. Spans that share a TraceID
// belong to the same request, and ParentID links a span to the span that was
// active when it started.
type Span struct {
	TraceID   string    `json:"trace_id"`
	SpanID    string    `json:"span_id"`
	ParentID  string    `json:"parent_id,omitempty"` // empty for the root span of a trace
	Name      string    `json:"name"`
	StartTime time.Time `json:"start_time"`
	// omitempty has no effect on struct types such as time.Time: a span that
	// has not ended serialises EndTime as "0001-01-01T00:00:00Z".
	EndTime    time.Time              `json:"end_time,omitempty"`
	Attributes map[string]interface{} `json:"attributes,omitempty"`
	// Error is excluded from JSON. encoding/json encodes an error interface
	// by its concrete type, and common error types (e.g. from errors.New)
	// have no exported fields, so it would serialise as "{}". RecordError
	// also stores the message in Attributes["error.message"], which is
	// serialised.
	Error error `json:"-"`
}
type (
	// Trace groups the spans recorded for one request.
	Trace struct {
		Spans []*Span `json:"spans"`
	}

	// traceKey is the context key under which the active *Span is stored.
	// Being an unexported type, it cannot collide with keys set by other
	// packages.
	traceKey struct{}
)
// 开始新的Span
// StartSpan begins a span called name.
//
// If ctx already carries a span, the new span joins that span's trace and
// records it as its parent. Otherwise a fresh trace ID is generated and the
// new span becomes the root. The span is tagged with service.name and
// returned together with a derived context that carries it.
func StartSpan(ctx context.Context, name string) (context.Context, *Span) {
	spanID := generateID()
	started := time.Now()

	var traceID, parentID string
	parent, inherited := ctx.Value(traceKey{}).(*Span)
	if inherited {
		traceID, parentID = parent.TraceID, parent.SpanID
	} else {
		// No span in ctx: this span starts a brand-new trace.
		traceID = generateID()
	}

	s := &Span{
		TraceID:    traceID,
		SpanID:     spanID,
		ParentID:   parentID,
		Name:       name,
		StartTime:  started,
		Attributes: map[string]interface{}{},
	}
	s.SetAttribute("service.name", "user-service")

	return context.WithValue(ctx, traceKey{}, s), s
}
// 结束Span
// End marks the span as finished by recording the current time in EndTime.
//
// Only the first call has an effect, so a deferred End after an explicit one
// keeps the earlier timestamp. End is a no-op on a nil *Span, which lets
// callers use the result of SpanFromContext without a nil check.
func (s *Span) End() {
	if s == nil || !s.EndTime.IsZero() {
		return
	}
	s.EndTime = time.Now()
}
// 设置属性
// SetAttribute stores value under key in the span's Attributes, replacing
// any previous value for that key. It is a no-op on a nil *Span.
func (s *Span) SetAttribute(key string, value interface{}) {
	if s == nil {
		return
	}
	if s.Attributes == nil {
		// A Span built as a literal rather than via StartSpan has no map yet,
		// and assigning into a nil map panics.
		s.Attributes = make(map[string]interface{})
	}
	s.Attributes[key] = value
}
// 记录错误
// RecordError attaches err to the span. It stores err in Error and sets the
// attributes "error" (true) and "error.message" (err.Error()).
//
// A nil err or a nil *Span is ignored; without this guard, err.Error() would
// panic on a nil error.
func (s *Span) RecordError(err error) {
	if s == nil || err == nil {
		return
	}
	s.Error = err
	s.SetAttribute("error", true)
	s.SetAttribute("error.message", err.Error())
}
// 从context获取当前Span
// SpanFromContext returns the *Span stored in ctx by StartSpan, or nil when
// ctx carries no span.
func SpanFromContext(ctx context.Context) *Span {
	// A failed type assertion leaves current as the nil *Span.
	current, _ := ctx.Value(traceKey{}).(*Span)
	return current
}
3. 中间件集成
HTTP服务器中间件
func TracingMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
tracer := otel.Tracer("http-server")
// 从header中提取trace context
ctx := otel.GetTextMapPropagator().Extract(r.Context(),
propagation.HeaderCarrier(r.Header))
// 开始新的span
ctx, span := tracer.Start(ctx, "http.request",
trace.WithAttributes(
attribute.String("http.method", r.Method),
attribute.String("http.url", r.URL.String()),
attribute.String("http.user_agent", r.UserAgent()),
))
defer span.End()
// 记录响应状态码
rw := &responseWriter{ResponseWriter: w, statusCode: 200}
// 处理请求
next.ServeHTTP(rw, r.WithContext(ctx))
span.SetAttributes(attribute.Int("http.status_code", rw.statusCode))
})
}
// responseWriter wraps an http.ResponseWriter to capture the status code
// actually sent to the client, so the tracing middleware can attach it to
// the request span. statusCode should start at the status implied when a
// handler writes a body without calling WriteHeader (200).
type responseWriter struct {
	http.ResponseWriter
	statusCode  int
	wroteHeader bool // true once a final (non-1xx) status has been sent
}

// WriteHeader records code and forwards it to the wrapped writer.
//
// Only the first final status is recorded. net/http ignores later
// WriteHeader calls (it logs "superfluous response.WriteHeader"), so
// recording them would misreport the status the client received.
// Informational 1xx codes other than 101 may be followed by a final status,
// so they do not lock the value.
func (rw *responseWriter) WriteHeader(code int) {
	if !rw.wroteHeader {
		rw.statusCode = code
		informational := code >= 100 && code <= 199 && code != http.StatusSwitchingProtocols
		rw.wroteHeader = !informational
	}
	rw.ResponseWriter.WriteHeader(code)
}

// Write forwards b to the wrapped writer. A Write before any final
// WriteHeader makes net/http send an implicit 200 OK, so that status is
// recorded and locked here.
func (rw *responseWriter) Write(b []byte) (int, error) {
	if !rw.wroteHeader {
		rw.statusCode = http.StatusOK
		rw.wroteHeader = true
	}
	return rw.ResponseWriter.Write(b)
}

// Unwrap returns the wrapped writer. http.ResponseController (Go 1.20+) uses
// it to reach optional capabilities such as flushing through this wrapper.
func (rw *responseWriter) Unwrap() http.ResponseWriter {
	return rw.ResponseWriter
}
数据库调用追踪
// tracedDB wraps a *sql.DB so that queries are recorded as spans created by
// tracer.
type tracedDB struct {
	db     *sql.DB
	tracer trace.Tracer
}

// QueryContext runs query against the wrapped database inside a "db.query"
// span.
//
// The span records the SQL text as db.statement (args are not recorded),
// db.system "mysql", the wall-clock time of the QueryContext call as a
// db.duration string, and any error via RecordError. The span ends when this
// method returns, i.e. before the caller iterates the returned rows.
func (t *tracedDB) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
	spanAttrs := []attribute.KeyValue{
		attribute.String("db.statement", query),
		attribute.String("db.system", "mysql"),
	}
	ctx, span := t.tracer.Start(ctx, "db.query", trace.WithAttributes(spanAttrs...))
	defer span.End()

	began := time.Now()
	rows, err := t.db.QueryContext(ctx, query, args...)
	span.SetAttributes(attribute.String("db.duration", time.Since(began).String()))

	if err != nil {
		span.RecordError(err)
	}
	return rows, err
}
4. 配置和部署
Docker Compose部署Jaeger
version: '3.8'
services:
  jaeger:
    image: jaegertracing/all-in-one:1.35
    ports:
      - "16686:16686" # UI
      - "14268:14268" # Collector (Jaeger Thrift over HTTP; used by initTracer's endpoint)
      - "4317:4317"   # OTLP gRPC (only reachable because COLLECTOR_OTLP_ENABLED=true)
      - "4318:4318"   # OTLP HTTP
    environment:
      - COLLECTOR_OTLP_ENABLED=true
主程序配置
package main
import (
"context"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"go.opentelemetry.io/otel"
)
// main runs the demo user service and exits with the code returned by run.
// All setup and teardown lives in run, so its deferred cleanup (notably
// flushing buffered spans via tp.Shutdown) executes before os.Exit. Calling
// log.Fatal inside the serving logic would call os.Exit directly and skip
// those defers.
func main() {
	os.Exit(run())
}

// run sets up tracing, serves /user (wrapped in TracingMiddleware) on :8080
// and blocks until SIGINT/SIGTERM arrives or the server fails. It then shuts
// the server down gracefully.
//
// It returns the process exit code: 0 on a clean shutdown, 1 on any error.
func run() int {
	tp, err := initTracer("http://localhost:14268/api/traces")
	if err != nil {
		log.Print(err)
		return 1
	}
	defer func() {
		// Bound the final flush so an unreachable collector cannot hang exit.
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		if err := tp.Shutdown(ctx); err != nil {
			log.Printf("Error shutting down tracer provider: %v", err)
		}
	}()

	mux := http.NewServeMux()
	mux.HandleFunc("/user", userHandler)

	server := &http.Server{
		Addr:    ":8080",
		Handler: TracingMiddleware(mux),
		// Without a header read timeout, a slow client can hold a connection
		// open indefinitely.
		ReadHeaderTimeout: 10 * time.Second,
	}

	// Buffered so the serving goroutine can report a failure and exit even
	// if run has already returned via the signal path.
	serveErr := make(chan error, 1)
	go func() {
		log.Println("Starting server on :8080")
		if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			serveErr <- err
		}
	}()

	// Wait for a shutdown signal or a fatal server error.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	select {
	case err := <-serveErr:
		log.Printf("Server error: %v", err)
		return 1
	case <-quit:
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	if err := server.Shutdown(ctx); err != nil {
		log.Printf("Server shutdown error: %v", err)
		return 1
	}
	return 0
}