searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

opentelemetry简介及二次开发实操

2024-04-29 02:33:12
122
0
## 简介
OpenTelemetry 是 CNCF 的一个可观测性项目,旨在提供可观测性领域的标准化方案,解决观测数据的数据模型、采集、处理、导出等的标准化问题。所观测的数据类型目前就包含trace、metrics、logs
 
![null](otel工作范围.png)
如上图所示,涵盖了opentelemetry要做的几件事情,统一格式的数据模型(包含监控、日志和链路),标准的采集器,标准的处理器,标准的导出器。
 
 
 
## 怎么用
 
opentelemetry-collector是一个实现了上面四种标准化问题的可执行文件。里面包含了官方提供的采集器,处理器和导出器。各个厂商也对其进行扩展,扩展出适应自身产品的采集器与导出器。
 
![null](otel-整体架构.png)
 
它运行起来可以是按上面的模式。他既可以作为Agent运行在应用的Host上,统一收集应用的数据上报到服务端;也可以作为Server在服务端接收数据,然后持久化。
 
由于在OpenTelemetry中数据模型是标准化的,而且实现这它的厂商也比较多,因此它也可以作为一种适配器而存在,来对两个不同厂商的数据进行转换。如以skywalking格式接收链路数据,再以实现opentracing标准的Jaeger输出数据。
 
下面则通过两个具体例子介绍collector如何配置使用
 
### 监控
 
监控业界最常用的开源组件则是prometheus。在某些监控方案中,会通过prometheus做指标采集,然后持久化到clickhouse中。下面例子则以prometheus作为receiver,logging作为exporter,当采集指标成功则会在日志输出。
 
配置如下
 
```
#config.yaml
 
    receivers:
      prometheus:
        config:
          scrape_configs:
          - job_name: kubernetes-nodes-cadvisor
            honor_timestamps: true
            scrape_interval: 15s
            scrape_timeout: 10s
            metrics_path: /metrics
            scheme: https
            authorization:
              type: Bearer
              credentials_file: /var/run/secrets/kubernetes.io/serviceaccount/token
            tls_config:
              ca_file: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
              insecure_skip_verify: true
            follow_redirects: true
            relabel_configs:
            - separator: ;
              regex: __meta_kubernetes_node_label_(.+)
              replacement: $$1
              action: labelmap
            - separator: ;
              regex: (.*)
              target_label: __address__
              replacement: kubernetes.default.svc:443
              action: replace
            - source_labels: [__meta_kubernetes_node_name]
              separator: ;
              regex: (.+)
              target_label: __metrics_path__
              replacement: /api/v1/nodes/$$1/proxy/metrics/cadvisor
              action: replace
            kubernetes_sd_configs:
            - role: node
              follow_redirects: true
    processors:
    exporters:
      logging:
    service:
      pipelines:
        metrics:
          receivers: [prometheus]
          processors: []
          exporters: [logging]
```
一般collector的配置都会包含receivers,receivers,exporters和service几个模块。
 
前三个就是上面提到的接收器,处理器和导出器。当前collecotr要用到都在对应位置添加上其配置,不同组件的配置和行为都不一样。像prometheus的只需要填上其采集配置则可,他就会跟prometheus一样向各个应用中刮取指标。像其他的receiver可能是暴露一个端口监听,接收其他应用请求上报过来的数据。
>>ps: 在porometheus原本的一些美元符号`$`,放到collector处会有转义,因此要保证其按照`$`原样传递给receiver,要将`$`写成`$$`。如上面的`$1`写成`$$1`
 
serivce下面包含了pipelines,pipelines下就是包含了三种数据(监控日志追踪)所使用组件的组合,如上面例子则是监控数据,则用metrics,它的receiver是prometheus,无需使用process,exporter是logging。
 
最终通过命令运行
```
./otelcol  --config=config.yaml
```
或者通过容器方式运行。
 
>>ps: 以容器方式运行的时候,注意pod的sa所具备的权限要与prometheus一致,否则会有权限问题
 
 
运行后能看到采集指标成功的日志
```
2022-10-23T17:53:46.501Z INFO loggingexporter/logging_exporter.go:361 MetricsExporter {"#metrics": 104}
2022-10-23T17:54:46.501Z INFO loggingexporter/logging_exporter.go:361 MetricsExporter {"#metrics": 104}
2022-10-23T17:55:46.501Z INFO loggingexporter/logging_exporter.go:361 MetricsExporter {"#metrics": 104}
```
 
### 链路追踪
 
链路追踪的例子则是从istio中收集链路数据,通过otlp上传到上游的collector,同时同样以日志输出。
 
顺带提一下应用相关的,istio的应用是经典demo  bookinfo,由网格暴露出来的链路数据则是opencensus协议的,网格中的应用在发生互调时需要透传X-B3的头部和request-id,网格的配置中需要加入下面的内容
```
  meshConfig:
    defaultConfig:
      tracing:
        openCensusAgent:
          address: {collector’s svc name or ip}:55678
          context:
          - W3C_TRACE_CONTEXT
          - B3
        sampling: 100
    enableTracing: true
```
加完配置后记得需要把相关网格应用重启下,配置才会生效
 
 
collector的配置如下
```
    receivers:
      opencensus:
    processors:
    exporters:
      logging:
      otlp:
        endpoint: "172.16.142.33:9817"
        insecure: true  ## 18版本这样设置可以
        tls:
          insecure: true   ##在61版本只能这样设置
    service:
      pipelines:
        traces:
          receivers: [opencensus]
          processors: []
          exporters: [logging,otlp]
```
receivers,processors,exporters和service这几个组件跟监控的类似,必不可少。由于是链路追踪,service的pipline就是traces,同理如果遇到日志场景,则是logs了。traces中与监控的类似,把用到的组件集合都填写上去。
 
运行起来后,这个opencensus的receiver有别与prometheus。大多数receiver与opencensus类似,是被动接收;而prometheus是主动去拉。程序运行起来会看见opencensus receiver运行起来的日志,但是并不会显示它监听的端口,上面的55678是默认的
```
2022-10-26T00:33:45.406Z info pipelines/pipelines.go:102 Receiver is starting... {"kind": "receiver", "name": "opencensus", "pipeline": "traces"}
2022-10-26T00:33:45.407Z info pipelines/pipelines.go:106 Receiver started. {"kind": "receiver", "name": "opencensus", "pipeline": "traces"}
```
>>如需修改端口,在opencensus中加入grpc的配置,在endpoint字段通过{ip}:{port}格式指定
 
当请求bookinfo的productpage后,会看到logging exporter输出的日志
```
2021-01-22T13:26:49.320Z INFO loggingexporter/logging_exporter.go:313 TracesExporter {"#spans": 2}
2021-01-22T13:26:49.430Z INFO loggingexporter/logging_exporter.go:313 TracesExporter {"#spans": 1}
2021-01-22T13:26:50.422Z INFO loggingexporter/logging_exporter.go:313 TracesExporter {"#spans": 4}
2021-01-22T13:26:50.571Z INFO loggingexporter/logging_exporter.go:313 TracesExporter {"#spans": 1}
```
 
## 扩展
 
目前用collector收集数据后上报至上游collector,而上游的collector前面有增加了一个网关,用作鉴权之用,上报数据需要带上license。因此现有的exporter已经不够用了,需要自己扩展,不过网关方面有提供SDK,上报数据仅需要给网关的地址,license,调用SDK则可。
 
### 环境准备
 
按照官网的说法,扩展任意一个组件,其实都要完整构建一次collector。collector是用一个ocb的脚手架来生成代码以及构件可执行文件
 
到github中下载合适自己操作系统和CPU架构的版本`github.com/open-telemetry/opentelemetry-collector/releases`
 
下载完毕将其重命名并赋予可执行权限
```
mv ocb_0.53.0_darwin_amd64 ocb
chmod 777 ocb
```
 
创建一个 builder manifest文件,这个文件就描述了当前这个自定义的collector名称,版本,描述,输出路径,以及包含哪些receiver,processor,exporter组件,样例如下
```
builder-config.yaml
dist:
  name: otelcol-dev  ##collector的名称
  description: "Basic OTel Collector distribution for Developers" ##描述信息
  output_path: ./otelcol-dev/src/go.opentelemetry.io/collector/cmd/builder ##生成的代码输出路径
  otelcol_version: "0.61.0"  ##collector是基于主干的哪个版本
  go: "/usr/local/go/bin/go"
 
## 下面这些就是当前collector包含的组件,其实是通过gomod把依赖添加进来
 
exporters:
  - gomod:
      "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/jaegerexporter
      v0.61.0"
  - import: go.opentelemetry.io/collector/exporter/loggingexporter
    gomod: go.opentelemetry.io/collector v0.61.0
  - import: go.opentelemetry.io/collector/exporter/otlpexporter
    gomod: go.opentelemetry.io/collector v0.61.0
 
receivers:
  - import: go.opentelemetry.io/collector/receiver/otlpreceiver
    gomod: go.opentelemetry.io/collector v0.61.0
  - gomod:
      "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/opencensusreceiver
      v0.61.0"
  - gomod:
      "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver
      v0.61.0"
 
processors:
  - import: go.opentelemetry.io/collector/processor/batchprocessor
    gomod: go.opentelemetry.io/collector v0.61.0
 
```
上面的列表就表明当前collector只包含这些组件,如果实际应用时要用其他的组件,运行的时候就会报错,只需要把组件的gomod添加进来则可,第三方的可以在github上opentelemetry-collector-contrib这个项目下找,里面放的都是第三方扩展出来的组件。 
 
执行下面命令会按照上面配置生成代码,下载依赖,构件二进制文件
```
./ocb --config builder-config.yaml
2022-10-26T10:56:57.951+0800 INFO internal/command.go:121 OpenTelemetry Collector Builder {"version": "0.61.0", "date": "2022-09-28T20:12:59Z"}
2022-10-26T10:56:57.954+0800 INFO internal/command.go:154 Using config file {"path": "builder-config.yaml"}
2022-10-26T10:56:58.111+0800 INFO builder/config.go:103 Using go {"go-executable": "/usr/local/go/bin/go"}
2022-10-26T10:56:58.112+0800 INFO builder/main.go:76 Sources created{"path": "./otelcol-dev/src/go.opentelemetry.io/collector/cmd/builder"}
2022-10-26T10:56:58.394+0800 INFO builder/main.go:118 Getting go modules
2022-10-26T10:56:58.492+0800 INFO builder/main.go:87 Compiling
2022-10-26T10:57:04.101+0800 INFO builder/main.go:99 Compiled {"binary": "./otelcol-dev/src/go.opentelemetry.io/collector/cmd/builder/otelcol-dev"}
```
项目包依赖方式默认是使用gomod模式,当然也可以用回go path。但是ocb执行的时候虽然下好了依赖包,还是会提示需要执行go mod vendor把依赖添加好。再次执行即可构建成功。
 
生成好的collector配上前文介绍的collector的配置即可运行
 
```
./otelcol-dev --conifg=config.yaml
```
 
进入生成好的代码,会包含下列内容
```
|--compoents.go
|--main.go
|--main_others.go
|--main_windows.go
```
这些代码都是由脚手架生成的,不要手动修改,否则再次执行ocb构件时就会被抹掉。进入compoents.go,会看到各个组件的factory,每个组件都是传入其factory,最终collector调用factory构造出组件的实例
```
func components() (component.Factories, error) {
var err error
factories := component.Factories{}
 
factories.Extensions, err = component.MakeExtensionFactoryMap(
)
if err != nil {
return component.Factories{}, err
}
 
factories.Receivers, err = component.MakeReceiverFactoryMap(
otlpreceiver.NewFactory(),
opencensusreceiver.NewFactory(),
prometheusreceiver.NewFactory(),
)
if err != nil {
return component.Factories{}, err
}
 
factories.Exporters, err = component.MakeExporterFactoryMap(
jaegerexporter.NewFactory(),
loggingexporter.NewFactory(),
otlpexporter.NewFactory(),
)
if err != nil {
return component.Factories{}, err
}
 
factories.Processors, err = component.MakeProcessorFactoryMap(
batchprocessor.NewFactory(),
)
if err != nil {
return component.Factories{}, err
}
 
return factories, nil
}
```
 
### 扩展自定义的exporter
 
在项目目录下创建一个目录 ctgarmsexporter,用来存放exporter的代码,目录中创建以下几个文件
```
ctgarmsexporter
|--cofig.go   ##包含这个组件配置的结构体
|--ctgarms-exporter.go  ##包含组件的实际逻辑
|--factory.go    ## 组件的factory,供传递给上面的介绍的components,用于构造组件
```
 
config.go的定义形如下面所示
```
type Config struct {
config.ExporterSettings `mapstructure:",squash"`
ArmsGateway             string `mapstructure:"arms_gateway"`
AppId                   string `mapstructure:"app_id"`
License                 string `mapstructure:"license"`
 
}
 
// Validate checks if the exporter configuration is valid
func (cfg *Config) Validate() error {
 
 
return nil
}
 
```
结构的第一个成员是必选,同理receiver是ReceiverSetting,Validate()方法是继承了某些结构时所需的
 
再看factory.go
```
func NewFactory() component.ExporterFactory {
return component.NewExporterFactory(
typeStr,
createDefaultConfig,
component.WithMetricsExporter(createCtgArmsMetricExporter, stability),
component.WithTracesExporter(createCtgArmsTraceExporter, stability))
}
```
主要调用component.NewExporterFactory函数构造一个ExporterFactory,传参就包含了exporter的名字typeStr,这里是个常量 ctgarms,第二个参数是构造默认配置参数的函数createDefaultConfig,后面两个参数是表明了当前这个Exporter能给指标Metrices,链路追踪Trace还是给日志Log使用。
 
先看看配置参数的函数createDefaultConfig,组件的Config中凡是有默认参数的都在这里设置
```
func createDefaultConfig() config.Exporter {
return &Config{
ExporterSettings: config.NewExporterSettings(config.NewComponentID(typeStr)),
}
}
```
 
后面的参数挑其中一个component.WithMetricsExporter函数来说,createCtgArmsTraceExporter是构造用于对应pipeline的组件实例的构造函数,定义如下,第二个参数是StabilityLevel 稳定性等级,即stable,bate,alpha等等,是一个枚举。
```
func createCtgArmsTraceExporter(_ context.Context, params component.ExporterCreateSettings, baseCfg config.Exporter) (component.TracesExporter, error) {
 
logger := params.Logger
exporterCfg := baseCfg.(*Config)
 
sender := NewCtgArmsSender()
sender.logger = logger
sender.config = exporterCfg
logger.Info(fmt.Sprintf("arms config appid:%s,url:%s,license:%s", exporterCfg.AppId, exporterCfg.ArmsGateway, exporterCfg.License))
err := sender.initSdk()
if err != nil {
return nil, err
}
 
return exporterhelper.NewTracesExporter(context.TODO(),
params,
baseCfg,
sender.pushTraceData)
 
}
```
可有从构造函数的入参后两个入参获取从配置文件中读回来的配置内容,以及日志数据对象,同时还在这里把SDK进行初始化,最终通过一个exporterhelper.NewTracesExporter函数构造出TraceExporter实例。最后一个参数则是执行Export操作时的关键逻辑。在后续要为exporter加入发送队列,重试,超时等机制,可以直接在exporterhelper.NewTracesExporter中补充,如下面代码所示。当然相关配置,默认配置同样也要在Config结构以及createDefaultConfig()函数中补充,可以进入其他组建的源码中获取参考。
```
return exporterhelper.NewMetricsExporter(
context.TODO(),
params,
baseCfg,
sender.pushMetricsData,
exporterhelper.WithRetry(exporterCfg.RetrySettings),
exporterhelper.WithQueue(exporterCfg.QueueSettings))
```
 
NewCtgArmsSender的定义在ctgarms-exporter.go中
```
type ctgArmsSender struct {
logger            *zap.Logger
config            *Config
client            *sdk.DataProxyClient
metricsMarshal    pmetric.Marshaler
traceMarshal      ptrace.Marshaler
}
 
 
func (c *ctgArmsSender) pushTraceData(
_ context.Context,
td ptrace.Traces,
) error {
var err error
datas, err := c.traceMarshal.MarshalTraces(td)
if err != nil {
return err
}
err = c.client.TransData(tacesKey, &datas)
return err
}
```
pushTraceData就是把要导出的数据ptrace.Traces调用一下SDK进行发送,由于SDK发送是数据是[]byte,因此这里需要用c.traceMarshal把ptrace.Traces转成二进制数组的形式,ptrace.Traces就是opentelelmetry标准的数据格式,刚好接收方也是opentelemetry,这里不需要作任何格式转换,而上传又通过gRPC,因此这个c.traceMarshal实际是一个实现了ptrace.Marshaler接口的ProtoMarshaler实例,其构造如下所示
```
traceMarshal:      ptrace.NewProtoMarshaler()
```
实现ptrace.Marshaler的对象也有不少,可查看其他组件的代码中寻找。
 
编码完毕后,回去builder-config.yaml中,在exporters中添加上刚刚编写的exporter,如平时添加本地依赖包一样,ocb构建时才会把扩展的exporter包含在这个collector中
```
 
exporters:
  - gomod:
      "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/jaegerexporter
      v0.61.0"
  - import: go.opentelemetry.io/collector/exporter/loggingexporter
    gomod: go.opentelemetry.io/collector v0.61.0
  - import: go.opentelemetry.io/collector/cmd/builder/ctgarmsexporter
    gomod: go.opentelemetry.io/collector/cmd/builder v0.0.0
  - import: go.opentelemetry.io/collector/exporter/otlpexporter
    gomod: go.opentelemetry.io/collector v0.61.0
```
 
再执行ocb,就能看到components.go的exporter factory有包含这个自定义exporter进去
```
factories.Exporters, err = component.MakeExporterFactoryMap(
jaegerexporter.NewFactory(),
loggingexporter.NewFactory(),
ctgarmsexporter.NewFactory(),
otlpexporter.NewFactory(),
)
```
构造出来的collector就包含了这个扩展的exporter,可以在配置中添加使用
```
exporters:
  logging:
    logLevel: debug
  ctgarms:
    arms_gateway: 127.0.0.1:8089
    app_id: sjaooawjo
    license: sdfahfelsjfo
service:
  pipelines:
    traces:
          receivers: [opencensus]
          processors: []
          exporters: [ctgarms]
```
>>ps:receiver,process部分配置已省略
 
 
### 扩展的坑与排查
 
上述的扩展主要是参照了阿里的logexporter,在运行过程中遇到了一些报错,按照报错的提示以及参照了其他expoter的代码,逐步往exporter中添砖加瓦,下面就罗列了几个遇到过的问题及处理措施
 
#### 问题1,引入retry
 
问题:因SDK更新token后,导致部分上报的协程未同步更新,上报数据会报token_not_found,此外exporterhelper会提示启用retry
解决:参照了otlpexporter的源码,在配置结构Config中增加
```
exporterhelper.RetrySettings   `mapstructure:"retry_on_failure"`
```
同时factory.go的createDefaultConfig()函数中增加它的默认参数设置,
```
RetrySettings:    exporterhelper.NewDefaultRetrySettings(),
```
exporterhelper构造MetricsExporter时也加上retry的option
```
return exporterhelper.NewMetricsExporter(
context.TODO(),
params,
baseCfg,
sender.pushMetricsData,
exporterhelper.WithRetry(exporterCfg.RetrySettings))
```
这样一次上报失败之后它不会马上报错而丢失数据,会经过重试后才尝试丢失数据
 
当加了上述配置后,可以在配置文件ctgarms的exporter配置处增加retry_on_failure,配置重试相关的属性。不配置按照默认的也可以。
 
#### 问题2,引入send_queue
 
问题:同样由于token_not_found问题,上报数据经过多次重试后,仍会被drop掉,exporterhelper会提示启用send_queue
解决:同样参照otlpexporter的源码,在配置结构Config中增加
```
exporterhelper.QueueSettings   `mapstructure:"sending_queue"`
```
由于加了这个属性,Congfig必须实现Validate()方法,验证queueSettings的配置
```
func (cfg *Config) Validate() error {
if err := cfg.QueueSettings.Validate(); err != nil {
return fmt.Errorf("queue settings has invalid configuration: %w", err)
}
 
return nil
}
```
factory.go的createDefaultConfig()函数中增加它的默认参数设置,
```
QueueSettings:    exporterhelper.NewDefaultQueueSettings(),
```
exporterhelper构造MetricsExporter时也加上send_queue的option
```
return exporterhelper.NewMetricsExporter(
context.TODO(),
params,
baseCfg,
sender.pushMetricsData,
exporterhelper.WithRetry(exporterCfg.RetrySettings),
exporterhelper.WithQueue(exporterCfg.QueueSettings),
```
 
#### 问题3,上报数据过大,导致网关测报错
 
问题:由于gRPC传输数据默认最大数据量为4M,而cadvisor采的数据量大概6M左右,报错内容是grpc: receiverd message after decompression larger than max (xxxxx vs. 4194304)
解决:对于这个问题没有额外在exporter中修改,因为cadvisor的指标本身数量不多,只是转换成[]byte后数据量巨大,长远来说应该由SDK提供数据分片发送机制,目前临时方案是使用batch processor
```
batch:
  send_batch_size: 10
  send_batch_max_size: 30
```
经查看代码后发现,这size不是上传时的字节数,而是对应数据类别的数据条数,比如指标是指标个数,日志是日志条数。使用processor之后,未报过相似错误
 
 
#### 问题4,报send_queue已经满了,建议调大send_queue的size
 
问题:当SDK报auto reconnect后,没发现上报成功的日志,也没发现发送失败的日志,过一段时间后大量在报send_queue已经满了,建议调大send_queue的size
解决:目测是在调用SDK发送数据时阻塞了,导致后续的数据积压在send_queue,不能单纯调整send_queue的size,尝试在exporter中加入超时,这样发送阻塞的数据要么retry,最后还是不行则drop掉,不应该一直积在send_queue影响后续所有数据。
同样参照otlpexporter的源码,在配置结构Config中增加
```
exporterhelper.TimeoutSettings `mapstructure:",squash"` 
```
同时factory.go的createDefaultConfig()函数中增加它的默认参数设置,
```
TimeoutSettings:  exporterhelper.NewDefaultTimeoutSettings(),
```
exporterhelper构造MetricsExporter时也加上timeout的option
```
return exporterhelper.NewMetricsExporter(
context.TODO(),
params,
baseCfg,
sender.pushMetricsData,
exporterhelper.WithRetry(exporterCfg.RetrySettings),
exporterhelper.WithQueue(exporterCfg.QueueSettings),
exporterhelper.WithTimeout(exporterCfg.TimeoutSettings))
```
因后面也同步更新了SDK,添加了这个效果无法验证。后续连跑了几天也没发现这个因发送阻塞而导致塞满send_queue的问题
 
#### 问题小结
虽然上述问题基本上是由于SDK自身问题或不完善导致,但配合SDK修改而调整exporter过程中,exporter自身也在不断完善,不断增强了自身的健壮性。
 
## 参考链接
- blog.csdn.net/DaoCloud_daoke/article/details/122682509   云原生观测性 - OpenTelemetry(前面还可以,后边从代码开始就不好看)
- zhuanlan.zhihu.com/p/361652744  OpenTelemetry 简析(阿里的文章)
- cloud.tencent.com/developer/article/1791748   在Kubernetes上部署OpenTelemetry收集器(实践案例来源)
- cnblogs.com/charlieroro/p/13883602.html   Collector的配置和使用(主要看如何配)
- opentelemetry.io/docs/collector/custom-collector/  Building a custom collector
- opentelemetry.io/docs/collector/trace-receiver/   Building a Trace Receiver

 

0条评论
作者已关闭评论
h****n
2文章数
0粉丝数
h****n
2 文章 | 0 粉丝
h****n
2文章数
0粉丝数
h****n
2 文章 | 0 粉丝
原创

opentelemetry简介及二次开发实操

2024-04-29 02:33:12
122
0
## 简介
OpenTelemetry 是 CNCF 的一个可观测性项目,旨在提供可观测性领域的标准化方案,解决观测数据的数据模型、采集、处理、导出等的标准化问题。所观测的数据类型目前就包含trace、metrics、logs
 
![null](otel工作范围.png)
如上图所示,涵盖了opentelemetry要做的几件事情,统一格式的数据模型(包含监控、日志和链路),标准的采集器,标准的处理器,标准的导出器。
 
 
 
## 怎么用
 
opentelemetry-collector是一个实现了上面四种标准化问题的可执行文件。里面包含了官方提供的采集器,处理器和导出器。各个厂商也对其进行扩展,扩展出适应自身产品的采集器与导出器。
 
![null](otel-整体架构.png)
 
它运行起来可以是按上面的模式。他既可以作为Agent运行在应用的Host上,统一收集应用的数据上报到服务端;也可以作为Server在服务端接收数据,然后持久化。
 
由于在OpenTelemetry中数据模型是标准化的,而且实现这它的厂商也比较多,因此它也可以作为一种适配器而存在,来对两个不同厂商的数据进行转换。如以skywalking格式接收链路数据,再以实现opentracing标准的Jaeger输出数据。
 
下面则通过两个具体例子介绍collector如何配置使用
 
### 监控
 
监控业界最常用的开源组件则是prometheus。在某些监控方案中,会通过prometheus做指标采集,然后持久化到clickhouse中。下面例子则以prometheus作为receiver,logging作为exporter,当采集指标成功则会在日志输出。
 
配置如下
 
```
#config.yaml
 
    receivers:
      prometheus:
        config:
          scrape_configs:
          - job_name: kubernetes-nodes-cadvisor
            honor_timestamps: true
            scrape_interval: 15s
            scrape_timeout: 10s
            metrics_path: /metrics
            scheme: https
            authorization:
              type: Bearer
              credentials_file: /var/run/secrets/kubernetes.io/serviceaccount/token
            tls_config:
              ca_file: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
              insecure_skip_verify: true
            follow_redirects: true
            relabel_configs:
            - separator: ;
              regex: __meta_kubernetes_node_label_(.+)
              replacement: $$1
              action: labelmap
            - separator: ;
              regex: (.*)
              target_label: __address__
              replacement: kubernetes.default.svc:443
              action: replace
            - source_labels: [__meta_kubernetes_node_name]
              separator: ;
              regex: (.+)
              target_label: __metrics_path__
              replacement: /api/v1/nodes/$$1/proxy/metrics/cadvisor
              action: replace
            kubernetes_sd_configs:
            - role: node
              follow_redirects: true
    processors:
    exporters:
      logging:
    service:
      pipelines:
        metrics:
          receivers: [prometheus]
          processors: []
          exporters: [logging]
```
一般collector的配置都会包含receivers,receivers,exporters和service几个模块。
 
前三个就是上面提到的接收器,处理器和导出器。当前collecotr要用到都在对应位置添加上其配置,不同组件的配置和行为都不一样。像prometheus的只需要填上其采集配置则可,他就会跟prometheus一样向各个应用中刮取指标。像其他的receiver可能是暴露一个端口监听,接收其他应用请求上报过来的数据。
>>ps: 在porometheus原本的一些美元符号`$`,放到collector处会有转义,因此要保证其按照`$`原样传递给receiver,要将`$`写成`$$`。如上面的`$1`写成`$$1`
 
serivce下面包含了pipelines,pipelines下就是包含了三种数据(监控日志追踪)所使用组件的组合,如上面例子则是监控数据,则用metrics,它的receiver是prometheus,无需使用process,exporter是logging。
 
最终通过命令运行
```
./otelcol  --config=config.yaml
```
或者通过容器方式运行。
 
>>ps: 以容器方式运行的时候,注意pod的sa所具备的权限要与prometheus一致,否则会有权限问题
 
 
运行后能看到采集指标成功的日志
```
2022-10-23T17:53:46.501Z INFO loggingexporter/logging_exporter.go:361 MetricsExporter {"#metrics": 104}
2022-10-23T17:54:46.501Z INFO loggingexporter/logging_exporter.go:361 MetricsExporter {"#metrics": 104}
2022-10-23T17:55:46.501Z INFO loggingexporter/logging_exporter.go:361 MetricsExporter {"#metrics": 104}
```
 
### 链路追踪
 
链路追踪的例子则是从istio中收集链路数据,通过otlp上传到上游的collector,同时同样以日志输出。
 
顺带提一下应用相关的,istio的应用是经典demo  bookinfo,由网格暴露出来的链路数据则是opencensus协议的,网格中的应用在发生互调时需要透传X-B3的头部和request-id,网格的配置中需要加入下面的内容
```
  meshConfig:
    defaultConfig:
      tracing:
        openCensusAgent:
          address: {collector’s svc name or ip}:55678
          context:
          - W3C_TRACE_CONTEXT
          - B3
        sampling: 100
    enableTracing: true
```
加完配置后记得需要把相关网格应用重启下,配置才会生效
 
 
collector的配置如下
```
    receivers:
      opencensus:
    processors:
    exporters:
      logging:
      otlp:
        endpoint: "172.16.142.33:9817"
        insecure: true  ## 18版本这样设置可以
        tls:
          insecure: true   ##在61版本只能这样设置
    service:
      pipelines:
        traces:
          receivers: [opencensus]
          processors: []
          exporters: [logging,otlp]
```
receivers,processors,exporters和service这几个组件跟监控的类似,必不可少。由于是链路追踪,service的pipline就是traces,同理如果遇到日志场景,则是logs了。traces中与监控的类似,把用到的组件集合都填写上去。
 
运行起来后,这个opencensus的receiver有别与prometheus。大多数receiver与opencensus类似,是被动接收;而prometheus是主动去拉。程序运行起来会看见opencensus receiver运行起来的日志,但是并不会显示它监听的端口,上面的55678是默认的
```
2022-10-26T00:33:45.406Z info pipelines/pipelines.go:102 Receiver is starting... {"kind": "receiver", "name": "opencensus", "pipeline": "traces"}
2022-10-26T00:33:45.407Z info pipelines/pipelines.go:106 Receiver started. {"kind": "receiver", "name": "opencensus", "pipeline": "traces"}
```
>>如需修改端口,在opencensus中加入grpc的配置,在endpoint字段通过{ip}:{port}格式指定
 
当请求bookinfo的productpage后,会看到logging exporter输出的日志
```
2021-01-22T13:26:49.320Z INFO loggingexporter/logging_exporter.go:313 TracesExporter {"#spans": 2}
2021-01-22T13:26:49.430Z INFO loggingexporter/logging_exporter.go:313 TracesExporter {"#spans": 1}
2021-01-22T13:26:50.422Z INFO loggingexporter/logging_exporter.go:313 TracesExporter {"#spans": 4}
2021-01-22T13:26:50.571Z INFO loggingexporter/logging_exporter.go:313 TracesExporter {"#spans": 1}
```
 
## 扩展
 
目前用collector收集数据后上报至上游collector,而上游的collector前面有增加了一个网关,用作鉴权之用,上报数据需要带上license。因此现有的exporter已经不够用了,需要自己扩展,不过网关方面有提供SDK,上报数据仅需要给网关的地址,license,调用SDK则可。
 
### 环境准备
 
按照官网的说法,扩展任意一个组件,其实都要完整构建一次collector。collector是用一个ocb的脚手架来生成代码以及构件可执行文件
 
到github中下载合适自己操作系统和CPU架构的版本`github.com/open-telemetry/opentelemetry-collector/releases`
 
下载完毕将其重命名并赋予可执行权限
```
mv ocb_0.53.0_darwin_amd64 ocb
chmod 777 ocb
```
 
创建一个 builder manifest文件,这个文件就描述了当前这个自定义的collector名称,版本,描述,输出路径,以及包含哪些receiver,processor,exporter组件,样例如下
```
builder-config.yaml
dist:
  name: otelcol-dev  ##collector的名称
  description: "Basic OTel Collector distribution for Developers" ##描述信息
  output_path: ./otelcol-dev/src/go.opentelemetry.io/collector/cmd/builder ##生成的代码输出路径
  otelcol_version: "0.61.0"  ##collector是基于主干的哪个版本
  go: "/usr/local/go/bin/go"
 
## 下面这些就是当前collector包含的组件,其实是通过gomod把依赖添加进来
 
exporters:
  - gomod:
      "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/jaegerexporter
      v0.61.0"
  - import: go.opentelemetry.io/collector/exporter/loggingexporter
    gomod: go.opentelemetry.io/collector v0.61.0
  - import: go.opentelemetry.io/collector/exporter/otlpexporter
    gomod: go.opentelemetry.io/collector v0.61.0
 
receivers:
  - import: go.opentelemetry.io/collector/receiver/otlpreceiver
    gomod: go.opentelemetry.io/collector v0.61.0
  - gomod:
      "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/opencensusreceiver
      v0.61.0"
  - gomod:
      "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver
      v0.61.0"
 
processors:
  - import: go.opentelemetry.io/collector/processor/batchprocessor
    gomod: go.opentelemetry.io/collector v0.61.0
 
```
上面的列表就表明当前collector只包含这些组件,如果实际应用时要用其他的组件,运行的时候就会报错,只需要把组件的gomod添加进来则可,第三方的可以在github上opentelemetry-collector-contrib这个项目下找,里面放的都是第三方扩展出来的组件。 
 
执行下面命令会按照上面配置生成代码,下载依赖,构件二进制文件
```
./ocb --config builder-config.yaml
2022-10-26T10:56:57.951+0800 INFO internal/command.go:121 OpenTelemetry Collector Builder {"version": "0.61.0", "date": "2022-09-28T20:12:59Z"}
2022-10-26T10:56:57.954+0800 INFO internal/command.go:154 Using config file {"path": "builder-config.yaml"}
2022-10-26T10:56:58.111+0800 INFO builder/config.go:103 Using go {"go-executable": "/usr/local/go/bin/go"}
2022-10-26T10:56:58.112+0800 INFO builder/main.go:76 Sources created{"path": "./otelcol-dev/src/go.opentelemetry.io/collector/cmd/builder"}
2022-10-26T10:56:58.394+0800 INFO builder/main.go:118 Getting go modules
2022-10-26T10:56:58.492+0800 INFO builder/main.go:87 Compiling
2022-10-26T10:57:04.101+0800 INFO builder/main.go:99 Compiled {"binary": "./otelcol-dev/src/go.opentelemetry.io/collector/cmd/builder/otelcol-dev"}
```
项目包依赖方式默认是使用gomod模式,当然也可以用回go path。但是ocb执行的时候虽然下好了依赖包,还是会提示需要执行go mod vendor把依赖添加好。再次执行即可构建成功。
 
生成好的collector配上前文介绍的collector的配置即可运行
 
```
./otelcol-dev --conifg=config.yaml
```
 
进入生成好的代码,会包含下列内容
```
|--compoents.go
|--main.go
|--main_others.go
|--main_windows.go
```
这些代码都是由脚手架生成的,不要手动修改,否则再次执行ocb构件时就会被抹掉。进入compoents.go,会看到各个组件的factory,每个组件都是传入其factory,最终collector调用factory构造出组件的实例
```
func components() (component.Factories, error) {
var err error
factories := component.Factories{}
 
factories.Extensions, err = component.MakeExtensionFactoryMap(
)
if err != nil {
return component.Factories{}, err
}
 
factories.Receivers, err = component.MakeReceiverFactoryMap(
otlpreceiver.NewFactory(),
opencensusreceiver.NewFactory(),
prometheusreceiver.NewFactory(),
)
if err != nil {
return component.Factories{}, err
}
 
factories.Exporters, err = component.MakeExporterFactoryMap(
jaegerexporter.NewFactory(),
loggingexporter.NewFactory(),
otlpexporter.NewFactory(),
)
if err != nil {
return component.Factories{}, err
}
 
factories.Processors, err = component.MakeProcessorFactoryMap(
batchprocessor.NewFactory(),
)
if err != nil {
return component.Factories{}, err
}
 
return factories, nil
}
```
 
### 扩展自定义的exporter
 
在项目目录下创建一个目录 ctgarmsexporter,用来存放exporter的代码,目录中创建以下几个文件
```
ctgarmsexporter
|--cofig.go   ##包含这个组件配置的结构体
|--ctgarms-exporter.go  ##包含组件的实际逻辑
|--factory.go    ## 组件的factory,供传递给上面的介绍的components,用于构造组件
```
 
config.go的定义形如下面所示
```
type Config struct {
config.ExporterSettings `mapstructure:",squash"`
ArmsGateway             string `mapstructure:"arms_gateway"`
AppId                   string `mapstructure:"app_id"`
License                 string `mapstructure:"license"`
 
}
 
// Validate checks if the exporter configuration is valid
func (cfg *Config) Validate() error {
 
 
return nil
}
 
```
结构的第一个成员是必选,同理receiver是ReceiverSetting,Validate()方法是继承了某些结构时所需的
 
再看factory.go
```
func NewFactory() component.ExporterFactory {
return component.NewExporterFactory(
typeStr,
createDefaultConfig,
component.WithMetricsExporter(createCtgArmsMetricExporter, stability),
component.WithTracesExporter(createCtgArmsTraceExporter, stability))
}
```
主要调用component.NewExporterFactory函数构造一个ExporterFactory,传参就包含了exporter的名字typeStr,这里是个常量 ctgarms,第二个参数是构造默认配置参数的函数createDefaultConfig,后面两个参数是表明了当前这个Exporter能给指标Metrices,链路追踪Trace还是给日志Log使用。
 
先看看配置参数的函数createDefaultConfig,组件的Config中凡是有默认参数的都在这里设置
```
func createDefaultConfig() config.Exporter {
return &Config{
ExporterSettings: config.NewExporterSettings(config.NewComponentID(typeStr)),
}
}
```
 
后面的参数挑其中一个component.WithMetricsExporter函数来说,createCtgArmsTraceExporter是构造用于对应pipeline的组件实例的构造函数,定义如下,第二个参数是StabilityLevel 稳定性等级,即stable,bate,alpha等等,是一个枚举。
```
func createCtgArmsTraceExporter(_ context.Context, params component.ExporterCreateSettings, baseCfg config.Exporter) (component.TracesExporter, error) {
 
logger := params.Logger
exporterCfg := baseCfg.(*Config)
 
sender := NewCtgArmsSender()
sender.logger = logger
sender.config = exporterCfg
logger.Info(fmt.Sprintf("arms config appid:%s,url:%s,license:%s", exporterCfg.AppId, exporterCfg.ArmsGateway, exporterCfg.License))
err := sender.initSdk()
if err != nil {
return nil, err
}
 
return exporterhelper.NewTracesExporter(context.TODO(),
params,
baseCfg,
sender.pushTraceData)
 
}
```
可有从构造函数的入参后两个入参获取从配置文件中读回来的配置内容,以及日志数据对象,同时还在这里把SDK进行初始化,最终通过一个exporterhelper.NewTracesExporter函数构造出TraceExporter实例。最后一个参数则是执行Export操作时的关键逻辑。在后续要为exporter加入发送队列,重试,超时等机制,可以直接在exporterhelper.NewTracesExporter中补充,如下面代码所示。当然相关配置,默认配置同样也要在Config结构以及createDefaultConfig()函数中补充,可以进入其他组建的源码中获取参考。
```
return exporterhelper.NewMetricsExporter(
context.TODO(),
params,
baseCfg,
sender.pushMetricsData,
exporterhelper.WithRetry(exporterCfg.RetrySettings),
exporterhelper.WithQueue(exporterCfg.QueueSettings))
```
 
NewCtgArmsSender的定义在ctgarms-exporter.go中
```
type ctgArmsSender struct {
logger            *zap.Logger
config            *Config
client            *sdk.DataProxyClient
metricsMarshal    pmetric.Marshaler
traceMarshal      ptrace.Marshaler
}
 
 
func (c *ctgArmsSender) pushTraceData(
_ context.Context,
td ptrace.Traces,
) error {
var err error
datas, err := c.traceMarshal.MarshalTraces(td)
if err != nil {
return err
}
err = c.client.TransData(tacesKey, &datas)
return err
}
```
pushTraceData就是把要导出的数据ptrace.Traces调用一下SDK进行发送,由于SDK发送是数据是[]byte,因此这里需要用c.traceMarshal把ptrace.Traces转成二进制数组的形式,ptrace.Traces就是opentelelmetry标准的数据格式,刚好接收方也是opentelemetry,这里不需要作任何格式转换,而上传又通过gRPC,因此这个c.traceMarshal实际是一个实现了ptrace.Marshaler接口的ProtoMarshaler实例,其构造如下所示
```
traceMarshal:      ptrace.NewProtoMarshaler()
```
实现ptrace.Marshaler的对象也有不少,可查看其他组件的代码中寻找。
 
编码完毕后,回去builder-config.yaml中,在exporters中添加上刚刚编写的exporter,如平时添加本地依赖包一样,ocb构建时才会把扩展的exporter包含在这个collector中
```
 
exporters:
  - gomod:
      "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/jaegerexporter
      v0.61.0"
  - import: go.opentelemetry.io/collector/exporter/loggingexporter
    gomod: go.opentelemetry.io/collector v0.61.0
  - import: go.opentelemetry.io/collector/cmd/builder/ctgarmsexporter
    gomod: go.opentelemetry.io/collector/cmd/builder v0.0.0
  - import: go.opentelemetry.io/collector/exporter/otlpexporter
    gomod: go.opentelemetry.io/collector v0.61.0
```
 
再执行ocb,就能看到components.go的exporter factory有包含这个自定义exporter进去
```
factories.Exporters, err = component.MakeExporterFactoryMap(
jaegerexporter.NewFactory(),
loggingexporter.NewFactory(),
ctgarmsexporter.NewFactory(),
otlpexporter.NewFactory(),
)
```
构造出来的collector就包含了这个扩展的exporter,可以在配置中添加使用
```
exporters:
  logging:
    logLevel: debug
  ctgarms:
    arms_gateway: 127.0.0.1:8089
    app_id: sjaooawjo
    license: sdfahfelsjfo
service:
  pipelines:
    traces:
          receivers: [opencensus]
          processors: []
          exporters: [ctgarms]
```
>>ps:receiver,process部分配置已省略
 
 
### 扩展的坑与排查
 
上述的扩展主要是参照了阿里的logexporter,在运行过程中遇到了一些报错,按照报错的提示以及参照了其他expoter的代码,逐步往exporter中添砖加瓦,下面就罗列了几个遇到过的问题及处理措施
 
#### 问题1,引入retry
 
问题:因SDK更新token后,导致部分上报的协程未同步更新,上报数据会报token_not_found,此外exporterhelper会提示启用retry
解决:参照了otlpexporter的源码,在配置结构Config中增加
```
exporterhelper.RetrySettings   `mapstructure:"retry_on_failure"`
```
同时factory.go的createDefaultConfig()函数中增加它的默认参数设置,
```
RetrySettings:    exporterhelper.NewDefaultRetrySettings(),
```
exporterhelper构造MetricsExporter时也加上retry的option
```
return exporterhelper.NewMetricsExporter(
context.TODO(),
params,
baseCfg,
sender.pushMetricsData,
exporterhelper.WithRetry(exporterCfg.RetrySettings))
```
这样一次上报失败之后它不会马上报错而丢失数据,会经过重试后才尝试丢失数据
 
当加了上述配置后,可以在配置文件ctgarms的exporter配置处增加retry_on_failure,配置重试相关的属性。不配置按照默认的也可以。
 
#### 问题2,引入send_queue
 
问题:同样由于token_not_found问题,上报数据经过多次重试后,仍会被drop掉,exporterhelper会提示启用send_queue
解决:同样参照otlpexporter的源码,在配置结构Config中增加
```
exporterhelper.QueueSettings   `mapstructure:"sending_queue"`
```
由于加了这个属性,Congfig必须实现Validate()方法,验证queueSettings的配置
```
func (cfg *Config) Validate() error {
if err := cfg.QueueSettings.Validate(); err != nil {
return fmt.Errorf("queue settings has invalid configuration: %w", err)
}
 
return nil
}
```
factory.go的createDefaultConfig()函数中增加它的默认参数设置,
```
QueueSettings:    exporterhelper.NewDefaultQueueSettings(),
```
exporterhelper构造MetricsExporter时也加上send_queue的option
```
return exporterhelper.NewMetricsExporter(
context.TODO(),
params,
baseCfg,
sender.pushMetricsData,
exporterhelper.WithRetry(exporterCfg.RetrySettings),
exporterhelper.WithQueue(exporterCfg.QueueSettings),
```
 
#### 问题3,上报数据过大,导致网关测报错
 
问题:由于gRPC传输数据默认最大数据量为4M,而cadvisor采的数据量大概6M左右,报错内容是grpc: receiverd message after decompression larger than max (xxxxx vs. 4194304)
解决:对于这个问题没有额外在exporter中修改,因为cadvisor的指标本身数量不多,只是转换成[]byte后数据量巨大,长远来说应该由SDK提供数据分片发送机制,目前临时方案是使用batch processor
```
batch:
  send_batch_size: 10
  send_batch_max_size: 30
```
经查看代码后发现,这size不是上传时的字节数,而是对应数据类别的数据条数,比如指标是指标个数,日志是日志条数。使用processor之后,未报过相似错误
 
 
#### 问题4,报send_queue已经满了,建议调大send_queue的size
 
问题:当SDK报auto reconnect后,没发现上报成功的日志,也没发现发送失败的日志,过一段时间后大量在报send_queue已经满了,建议调大send_queue的size
解决:目测是在调用SDK发送数据时阻塞了,导致后续的数据积压在send_queue,不能单纯调整send_queue的size,尝试在exporter中加入超时,这样发送阻塞的数据要么retry,最后还是不行则drop掉,不应该一直积在send_queue影响后续所有数据。
同样参照otlpexporter的源码,在配置结构Config中增加
```
exporterhelper.TimeoutSettings `mapstructure:",squash"` 
```
同时factory.go的createDefaultConfig()函数中增加它的默认参数设置,
```
TimeoutSettings:  exporterhelper.NewDefaultTimeoutSettings(),
```
exporterhelper构造MetricsExporter时也加上timeout的option
```
return exporterhelper.NewMetricsExporter(
context.TODO(),
params,
baseCfg,
sender.pushMetricsData,
exporterhelper.WithRetry(exporterCfg.RetrySettings),
exporterhelper.WithQueue(exporterCfg.QueueSettings),
exporterhelper.WithTimeout(exporterCfg.TimeoutSettings))
```
因后面也同步更新了SDK,添加了这个效果无法验证。后续连跑了几天也没发现这个因发送阻塞而导致塞满send_queue的问题
 
#### 问题小结
虽然上述问题基本上是由于SDK自身问题或不完善导致,但配合SDK修改而调整exporter过程中,exporter自身也在不断完善,不断增强了自身的健壮性。
 
## 参考链接
- blog.csdn.net/DaoCloud_daoke/article/details/122682509   云原生观测性 - OpenTelemetry(前面还可以,后边从代码开始就不好看)
- zhuanlan.zhihu.com/p/361652744  OpenTelemetry 简析(阿里的文章)
- cloud.tencent.com/developer/article/1791748   在Kubernetes上部署OpenTelemetry收集器(实践案例来源)
- cnblogs.com/charlieroro/p/13883602.html   Collector的配置和使用(主要看如何配)
- opentelemetry.io/docs/collector/custom-collector/  Building a custom collector
- opentelemetry.io/docs/collector/trace-receiver/   Building a Trace Receiver

 

文章来自个人专栏
文章 | 订阅
0条评论
作者已关闭评论
作者已关闭评论
0
0