本文将对agent的源码进行解析,深入理解agent的功能实现,agent如何与其他falcon组件进行交互,agent的plugin机制的实现。 agent用于采集机器负载监控指标,比如cpu.idle、load.1min、disk.io.util等等,每隔60秒push给Transfer。agent与Transfer建立了长连接,数据发送速度比较快,agent提供了一个http接口/v1/push用于接收用户手工push的一些数据,然后通过长连接迅速转发给Transfer。
默认的监控项
funcs.BuildMappers() // 初始化全局Mappers,保存了一系列的metric的采集方法
Mappers保存了默认的监控性,可以看到就是一个列表的列表的列表,比如,[cpu相关][load][1min load]
type FuncsAndInterval struct {
Fs []func() []*model.MetricValue
Interval int
}
var Mappers []FuncsAndInterval
监控项的数据结构
type MetricValue struct {
Endpoint string `json:"endpoint"`
Metric string `json:"metric"`
Value interface{} `json:"value"`
Step int64 `json:"step"`
Type string `json:"counterType"`
Tags string `json:"tags"`
Timestamp int64 `json:"timestamp"`
}
agent自带dashboard
访问1988端口即可访问agent自带的dashboard,可以看到本机的系统状态。
go cron.InitDataHistory() // 每秒更新cpu和disk的状态信息
自发现,向hbs上报本机信息
cron.ReportAgentStatus() // 汇报host,ip等信息给hbs
req := model.AgentReportRequest{
Hostname: hostname,
IP: g.IP(),
AgentVersion: g.VersionMsg(),
PluginVersion: g.GetCurrPluginVersion(),
}
向hbs获取本机应该运行哪些插件
plugin是和hostGroup绑定而非和host,但最终都会有个host和plugin的对应关系,这个对对应关系保存在portal库中,hbs是和portal库联系最紧密的uajina,所以通过向hbs请求获取本机应该运行哪些插件。
req := model.AgentHeartbeatRequest{
Hostname: hostname,
}
var resp model.AgentPluginsResponse
err = g.HbsClient.Call("Agent.MinePlugins", req, &resp)
if err != nil {
log.Println("call Agent.MinePlugin fail:", err)
continue
}
type AgentPluginsResponse struct {
Plugins []string
Timestamp int64
}
向hbs请求到自己应该运行哪些插件之后就开始定时运行这些插件,将这些插件采集的数据作为监控数据上报。
func AddNewPlugins(newPlugins map[string]*Plugin) {
for fpath, newPlugin := range newPlugins {
if _, ok := Plugins[fpath]; ok && newPlugin.MTime == Plugins[fpath].MTime {
continue
}
Plugins[fpath] = newPlugin
sch := NewPluginScheduler(newPlugin)
PluginsWithScheduler[fpath] = sch
sch.Schedule()
}
}
自定义监控,向hbs获取非默认监控
agent安装之后就会有很多默认的监控项,主要是和系统相关的,比如内存,磁盘,cpu,网卡等,除此之外还需要监控一些自定义的配置,比如进程端口等等,这些自定义的监控项配置在监控模板上,模板和hostgroup绑定,最终也会有一个host和自定监控项的对应关系,这一部分的监控项也保存在portal库中,需要项hbs请求获取。
cron.SyncBuiltinMetrics() // 获取本机应该运行哪些自定义的监控项
向transfer上报监控数据
cron.Collect()
通过遍历全局变量Mappers,每个采集项目会启动一个goroutine
func Collect() {
if !g.Config().Transfer.Enabled {
return
}
if len(g.Config().Transfer.Addrs) == 0 {
return
}
for _, v := range funcs.Mappers {
go collect(int64(v.Interval), v.Fs)
}
}
每个goroutine都会调用SendToTransfer向transfer发送数据
g.SendToTransfer(mvs)
transfer的地址有多个,agent是如何负载均衡的
tranfer的链接地址保存在一个map中,key就是agent中配置的tranfer组件的ip地址,agent并不会在启动的时候就初始化这个map,不会去连接tranfer的地址,而是在发送监控的数据的时候去map中取conn,如果可以取到说明该ip的已经初始化过可以直接使用,如果取不到说明该ip还未建立过连接就是初始化连接并保存在map中。
TransferClients map[string]*SingleConnRpcClient = map[string]*SingleConnRpcClient{}
agent使用随机索引的方式进行负载均衡
func SendMetrics(metrics []*model.MetricValue, resp *model.TransferResponse) {
rand.Seed(time.Now().UnixNano())
for _, i := range rand.Perm(len(Config().Transfer.Addrs)) {
addr := Config().Transfer.Addrs[i]
c := getTransferClient(addr)
if c == nil {
c = initTransferClient(addr)
}
if updateMetrics(c, metrics, resp) {
break
}
}
}
为什么agent连接hbs的地址只有一个?
open-falcon可以支持大规模集群的原因就是每个组件都可以横向扩容,agent可以连接transfer的地址可以有多个,agent采用负载均衡的方法像transfer上报数据,但是连接hbs的地址却只有一个。
"heartbeat": {
"enabled": true,
"addr": "0.0.0.0:6030",
"interval": 60,
"timeout": 1000
},
这是因为hbs这个组件的功能就决定它不会有太大的性能压力,按照官方的说法单台hbs可以支撑4000个agent。
- agent提供tpl,hostGroup相关的http api,而监控系统是面向管理员的注定访问量不大,所以http server这部分的压力不大
- hbs通过rpc调用向agent下发配置,提供的接口有:插件,自定义监控,trustIp,agent的同步间隔一般是60s,接口比较少,请求频率不高
hbs可以采用lvs四层代理的方式实现横向扩展
agent连接transfer时采用的负载均衡方法就是通过随机数的方法产生一个随机索引从ip列表中获取一个连接地址。
for _, i := range rand.Perm(len(Config().Transfer.Addrs)) {
addr := Config().Transfer.Addrs[i]
c := getTransferClient(addr)
if c == nil {
c = initTransferClient(addr)
}
if updateMetrics(c, metrics, resp) {
break
}
}