searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

GO 调用链使用

2025-10-09 10:05:46
5
0

Go语言中的调用链实现

1. 核心概念

  • Span: 代表调用链中的一个操作单元

  • Trace: 一次完整的请求跟踪,包含多个Span

  • Context: 在服务间传递跟踪信息

2. 常用库介绍

OpenTelemetry

OpenTelemetry是目前最主流的调用链解决方案:

package main

import (
    "context"
    "fmt"
    "log"
    
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/exporters/jaeger"
    "go.opentelemetry.io/otel/sdk/resource"
    sdktrace "go.opentelemetry.io/otel/sdk/trace"
    semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
    "go.opentelemetry.io/otel/trace"
)

// 初始化Tracer Provider
func initTracer(url string) (*sdktrace.TracerProvider, error) {
    exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url)))
    if err != nil {
        return nil, err
    }
    
    tp := sdktrace.NewTracerProvider(
        sdktrace.WithBatcher(exp),
        sdktrace.WithResource(resource.NewWithAttributes(
            semconv.SchemaURL,
            semconv.ServiceNameKey.String("user-service"),
            attribute.String("environment", "production"),
        )),
    )
    
    otel.SetTracerProvider(tp)
    return tp, nil
}

// 业务函数示例
func getUserInfo(ctx context.Context, userID string) (string, error) {
    tracer := otel.Tracer("user-service")
    
    // 创建新的span
    ctx, span := tracer.Start(ctx, "getUserInfo", 
        trace.WithAttributes(attribute.String("user.id", userID)))
    defer span.End()
    
    // 模拟业务逻辑
    if userID == "123" {
        span.SetAttributes(attribute.Bool("user.found", true))
        return "John Doe", nil
    }
    
    span.SetAttributes(attribute.Bool("user.found", false))
    return "", fmt.Errorf("user not found")
}

// HTTP处理函数
func userHandler(w http.ResponseWriter, r *http.Request) {
    tracer := otel.Tracer("user-service")
    
    ctx, span := tracer.Start(r.Context(), "userHandler")
    defer span.End()
    
    userID := r.URL.Query().Get("id")
    userName, err := getUserInfo(ctx, userID)
    
    if err != nil {
        span.RecordError(err)
        http.Error(w, err.Error(), http.StatusNotFound)
        return
    }
    
    fmt.Fprintf(w, "User: %s", userName)
}

自定义简易调用链实现

对于小型项目,可以基于context实现简单的调用链:

package trace

import (
    "context"
    "encoding/json"
    "fmt"
    "time"
)

type Span struct {
    TraceID    string                 `json:"trace_id"`
    SpanID     string                 `json:"span_id"`
    ParentID   string                 `json:"parent_id,omitempty"`
    Name       string                 `json:"name"`
    StartTime  time.Time              `json:"start_time"`
    EndTime    time.Time              `json:"end_time,omitempty"`
    Attributes map[string]interface{} `json:"attributes,omitempty"`
    Error      error                  `json:"error,omitempty"`
}

type Trace struct {
    Spans []*Span `json:"spans"`
}

type traceKey struct{}

// 开始新的Span
func StartSpan(ctx context.Context, name string) (context.Context, *Span) {
    span := &Span{
        SpanID:     generateID(),
        Name:       name,
        StartTime:  time.Now(),
        Attributes: make(map[string]interface{}),
    }
    
    // 从父context获取trace信息
    if parentSpan, ok := ctx.Value(traceKey{}).(*Span); ok {
        span.TraceID = parentSpan.TraceID
        span.ParentID = parentSpan.SpanID
    } else {
        span.TraceID = generateID()
    }
    
    // 设置属性
    span.SetAttribute("service.name", "user-service")
    
    return context.WithValue(ctx, traceKey{}, span), span
}

// 结束Span
func (s *Span) End() {
    s.EndTime = time.Now()
}

// 设置属性
func (s *Span) SetAttribute(key string, value interface{}) {
    s.Attributes[key] = value
}

// 记录错误
func (s *Span) RecordError(err error) {
    s.Error = err
    s.SetAttribute("error", true)
    s.SetAttribute("error.message", err.Error())
}

// 从context获取当前Span
func SpanFromContext(ctx context.Context) *Span {
    if span, ok := ctx.Value(traceKey{}).(*Span); ok {
        return span
    }
    return nil
}

3. 中间件集成

HTTP服务器中间件

func TracingMiddleware(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        tracer := otel.Tracer("http-server")
        
        // 从header中提取trace context
        ctx := otel.GetTextMapPropagator().Extract(r.Context(), 
            propagation.HeaderCarrier(r.Header))
        
        // 开始新的span
        ctx, span := tracer.Start(ctx, "http.request",
            trace.WithAttributes(
                attribute.String("http.method", r.Method),
                attribute.String("http.url", r.URL.String()),
                attribute.String("http.user_agent", r.UserAgent()),
            ))
        defer span.End()
        
        // 记录响应状态码
        rw := &responseWriter{ResponseWriter: w, statusCode: 200}
        
        // 处理请求
        next.ServeHTTP(rw, r.WithContext(ctx))
        
        span.SetAttributes(attribute.Int("http.status_code", rw.statusCode))
    })
}

type responseWriter struct {
    http.ResponseWriter
    statusCode int
}

func (rw *responseWriter) WriteHeader(code int) {
    rw.statusCode = code
    rw.ResponseWriter.WriteHeader(code)
}

数据库调用追踪

type tracedDB struct {
    db     *sql.DB
    tracer trace.Tracer
}

func (t *tracedDB) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
    ctx, span := t.tracer.Start(ctx, "db.query",
        trace.WithAttributes(
            attribute.String("db.statement", query),
            attribute.String("db.system", "mysql"),
        ))
    defer span.End()
    
    start := time.Now()
    rows, err := t.db.QueryContext(ctx, query, args...)
    duration := time.Since(start)
    
    span.SetAttributes(attribute.String("db.duration", duration.String()))
    if err != nil {
        span.RecordError(err)
    }
    
    return rows, err
}

4. 配置和部署

Docker Compose部署Jaeger

version: '3.8'
services:
  jaeger:
    image: jaegertracing/all-in-one:1.35
    ports:
      - "16686:16686"  # UI
      - "14268:14268"  # Collector
    environment:
      - COLLECTOR_OTLP_ENABLED=true

主程序配置

package main

import (
    "context"
    "log"
    "net/http"
    "os"
    "os/signal"
    "syscall"
    "time"
    
    "go.opentelemetry.io/otel"
)

func main() {
    // 初始化调用链
    tp, err := initTracer("http://localhost:14268/api/traces")
    if err != nil {
        log.Fatal(err)
    }
    defer func() {
        if err := tp.Shutdown(context.Background()); err != nil {
            log.Printf("Error shutting down tracer provider: %v", err)
        }
    }()
    
    // 设置路由
    mux := http.NewServeMux()
    mux.HandleFunc("/user", userHandler)
    
    // 添加中间件
    handler := TracingMiddleware(mux)
    
    server := &http.Server{
        Addr:    ":8080",
        Handler: handler,
    }
    
    // 启动服务器
    go func() {
        log.Println("Starting server on :8080")
        if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
            log.Fatalf("Server error: %v", err)
        }
    }()
    
    // 优雅关闭
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    <-quit
    
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    
    if err := server.Shutdown(ctx); err != nil {
        log.Fatalf("Server shutdown error: %v", err)
    }
}

 

0条评论
作者已关闭评论
c****k
8文章数
0粉丝数
c****k
8 文章 | 0 粉丝
原创

GO 调用链使用

2025-10-09 10:05:46
5
0

Go语言中的调用链实现

1. 核心概念

  • Span: 代表调用链中的一个操作单元

  • Trace: 一次完整的请求跟踪,包含多个Span

  • Context: 在服务间传递跟踪信息

2. 常用库介绍

OpenTelemetry

OpenTelemetry是目前最主流的调用链解决方案:

package main

import (
    "context"
    "fmt"
    "log"
    
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/exporters/jaeger"
    "go.opentelemetry.io/otel/sdk/resource"
    sdktrace "go.opentelemetry.io/otel/sdk/trace"
    semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
    "go.opentelemetry.io/otel/trace"
)

// 初始化Tracer Provider
func initTracer(url string) (*sdktrace.TracerProvider, error) {
    exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url)))
    if err != nil {
        return nil, err
    }
    
    tp := sdktrace.NewTracerProvider(
        sdktrace.WithBatcher(exp),
        sdktrace.WithResource(resource.NewWithAttributes(
            semconv.SchemaURL,
            semconv.ServiceNameKey.String("user-service"),
            attribute.String("environment", "production"),
        )),
    )
    
    otel.SetTracerProvider(tp)
    return tp, nil
}

// 业务函数示例
func getUserInfo(ctx context.Context, userID string) (string, error) {
    tracer := otel.Tracer("user-service")
    
    // 创建新的span
    ctx, span := tracer.Start(ctx, "getUserInfo", 
        trace.WithAttributes(attribute.String("user.id", userID)))
    defer span.End()
    
    // 模拟业务逻辑
    if userID == "123" {
        span.SetAttributes(attribute.Bool("user.found", true))
        return "John Doe", nil
    }
    
    span.SetAttributes(attribute.Bool("user.found", false))
    return "", fmt.Errorf("user not found")
}

// HTTP处理函数
func userHandler(w http.ResponseWriter, r *http.Request) {
    tracer := otel.Tracer("user-service")
    
    ctx, span := tracer.Start(r.Context(), "userHandler")
    defer span.End()
    
    userID := r.URL.Query().Get("id")
    userName, err := getUserInfo(ctx, userID)
    
    if err != nil {
        span.RecordError(err)
        http.Error(w, err.Error(), http.StatusNotFound)
        return
    }
    
    fmt.Fprintf(w, "User: %s", userName)
}

自定义简易调用链实现

对于小型项目,可以基于context实现简单的调用链:

package trace

import (
    "context"
    "encoding/json"
    "fmt"
    "time"
)

type Span struct {
    TraceID    string                 `json:"trace_id"`
    SpanID     string                 `json:"span_id"`
    ParentID   string                 `json:"parent_id,omitempty"`
    Name       string                 `json:"name"`
    StartTime  time.Time              `json:"start_time"`
    EndTime    time.Time              `json:"end_time,omitempty"`
    Attributes map[string]interface{} `json:"attributes,omitempty"`
    Error      error                  `json:"error,omitempty"`
}

type Trace struct {
    Spans []*Span `json:"spans"`
}

type traceKey struct{}

// 开始新的Span
func StartSpan(ctx context.Context, name string) (context.Context, *Span) {
    span := &Span{
        SpanID:     generateID(),
        Name:       name,
        StartTime:  time.Now(),
        Attributes: make(map[string]interface{}),
    }
    
    // 从父context获取trace信息
    if parentSpan, ok := ctx.Value(traceKey{}).(*Span); ok {
        span.TraceID = parentSpan.TraceID
        span.ParentID = parentSpan.SpanID
    } else {
        span.TraceID = generateID()
    }
    
    // 设置属性
    span.SetAttribute("service.name", "user-service")
    
    return context.WithValue(ctx, traceKey{}, span), span
}

// 结束Span
func (s *Span) End() {
    s.EndTime = time.Now()
}

// 设置属性
func (s *Span) SetAttribute(key string, value interface{}) {
    s.Attributes[key] = value
}

// 记录错误
func (s *Span) RecordError(err error) {
    s.Error = err
    s.SetAttribute("error", true)
    s.SetAttribute("error.message", err.Error())
}

// 从context获取当前Span
func SpanFromContext(ctx context.Context) *Span {
    if span, ok := ctx.Value(traceKey{}).(*Span); ok {
        return span
    }
    return nil
}

3. 中间件集成

HTTP服务器中间件

func TracingMiddleware(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        tracer := otel.Tracer("http-server")
        
        // 从header中提取trace context
        ctx := otel.GetTextMapPropagator().Extract(r.Context(), 
            propagation.HeaderCarrier(r.Header))
        
        // 开始新的span
        ctx, span := tracer.Start(ctx, "http.request",
            trace.WithAttributes(
                attribute.String("http.method", r.Method),
                attribute.String("http.url", r.URL.String()),
                attribute.String("http.user_agent", r.UserAgent()),
            ))
        defer span.End()
        
        // 记录响应状态码
        rw := &responseWriter{ResponseWriter: w, statusCode: 200}
        
        // 处理请求
        next.ServeHTTP(rw, r.WithContext(ctx))
        
        span.SetAttributes(attribute.Int("http.status_code", rw.statusCode))
    })
}

type responseWriter struct {
    http.ResponseWriter
    statusCode int
}

func (rw *responseWriter) WriteHeader(code int) {
    rw.statusCode = code
    rw.ResponseWriter.WriteHeader(code)
}

数据库调用追踪

type tracedDB struct {
    db     *sql.DB
    tracer trace.Tracer
}

func (t *tracedDB) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
    ctx, span := t.tracer.Start(ctx, "db.query",
        trace.WithAttributes(
            attribute.String("db.statement", query),
            attribute.String("db.system", "mysql"),
        ))
    defer span.End()
    
    start := time.Now()
    rows, err := t.db.QueryContext(ctx, query, args...)
    duration := time.Since(start)
    
    span.SetAttributes(attribute.String("db.duration", duration.String()))
    if err != nil {
        span.RecordError(err)
    }
    
    return rows, err
}

4. 配置和部署

Docker Compose部署Jaeger

version: '3.8'
services:
  jaeger:
    image: jaegertracing/all-in-one:1.35
    ports:
      - "16686:16686"  # UI
      - "14268:14268"  # Collector
    environment:
      - COLLECTOR_OTLP_ENABLED=true

主程序配置

package main

import (
    "context"
    "log"
    "net/http"
    "os"
    "os/signal"
    "syscall"
    "time"
    
    "go.opentelemetry.io/otel"
)

func main() {
    // 初始化调用链
    tp, err := initTracer("http://localhost:14268/api/traces")
    if err != nil {
        log.Fatal(err)
    }
    defer func() {
        if err := tp.Shutdown(context.Background()); err != nil {
            log.Printf("Error shutting down tracer provider: %v", err)
        }
    }()
    
    // 设置路由
    mux := http.NewServeMux()
    mux.HandleFunc("/user", userHandler)
    
    // 添加中间件
    handler := TracingMiddleware(mux)
    
    server := &http.Server{
        Addr:    ":8080",
        Handler: handler,
    }
    
    // 启动服务器
    go func() {
        log.Println("Starting server on :8080")
        if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
            log.Fatalf("Server error: %v", err)
        }
    }()
    
    // 优雅关闭
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    <-quit
    
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    
    if err := server.Shutdown(ctx); err != nil {
        log.Fatalf("Server shutdown error: %v", err)
    }
}

 

文章来自个人专栏
文章 | 订阅
0条评论
作者已关闭评论
作者已关闭评论
0
0