searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

open-falcon源码解析-agent篇

2023-10-07 07:19:08
6
0

本文将对agent的源码进行解析,深入理解agent的功能实现,agent如何与其他falcon组件进行交互,agent的plugin机制的实现。 agent用于采集机器负载监控指标,比如cpu.idle、load.1min、disk.io.util等等,每隔60秒push给Transfer。agent与Transfer建立了长连接,数据发送速度比较快,agent提供了一个http接口/v1/push用于接收用户手工push的一些数据,然后通过长连接迅速转发给Transfer。

默认的监控项

funcs.BuildMappers() // 初始化全局Mappers,保存了一系列的metric的采集方法

Mappers保存了默认的监控性,可以看到就是一个列表的列表的列表,比如,[cpu相关][load][1min load]

type FuncsAndInterval struct {
	Fs       []func() []*model.MetricValue
	Interval int
}

var Mappers []FuncsAndInterval

监控项的数据结构

type MetricValue struct {
	Endpoint  string      `json:"endpoint"`
	Metric    string      `json:"metric"`
	Value     interface{} `json:"value"`
	Step      int64       `json:"step"`
	Type      string      `json:"counterType"`
	Tags      string      `json:"tags"`
	Timestamp int64       `json:"timestamp"`
}

agent自带dashboard

访问1988端口即可访问agent自带的dashboard,可以看到本机的系统状态。

go cron.InitDataHistory() // 每秒更新cpu和disk的状态信息

自发现,向hbs上报本机信息

cron.ReportAgentStatus() // 汇报host,ip等信息给hbs
req := model.AgentReportRequest{
    Hostname:      hostname,
    IP:            g.IP(),
    AgentVersion:  g.VersionMsg(),
    PluginVersion: g.GetCurrPluginVersion(),
}

向hbs获取本机应该运行哪些插件

plugin是和hostGroup绑定而非和host,但最终都会有个host和plugin的对应关系,这个对对应关系保存在portal库中,hbs是和portal库联系最紧密的uajina,所以通过向hbs请求获取本机应该运行哪些插件。

req := model.AgentHeartbeatRequest{
    Hostname: hostname,
}

var resp model.AgentPluginsResponse
err = g.HbsClient.Call("Agent.MinePlugins", req, &resp)
if err != nil {
    log.Println("call Agent.MinePlugin fail:", err)
    continue
}

type AgentPluginsResponse struct {
	Plugins   []string
	Timestamp int64
}

向hbs请求到自己应该运行哪些插件之后就开始定时运行这些插件,将这些插件采集的数据作为监控数据上报。

func AddNewPlugins(newPlugins map[string]*Plugin) {
	for fpath, newPlugin := range newPlugins {
		if _, ok := Plugins[fpath]; ok && newPlugin.MTime == Plugins[fpath].MTime {
			continue
		}

		Plugins[fpath] = newPlugin
		sch := NewPluginScheduler(newPlugin)
		PluginsWithScheduler[fpath] = sch
		sch.Schedule()
	}
}

自定义监控,向hbs获取非默认监控

agent安装之后就会有很多默认的监控项,主要是和系统相关的,比如内存,磁盘,cpu,网卡等,除此之外还需要监控一些自定义的配置,比如进程端口等等,这些自定义的监控项配置在监控模板上,模板和hostgroup绑定,最终也会有一个host和自定监控项的对应关系,这一部分的监控项也保存在portal库中,需要项hbs请求获取。

cron.SyncBuiltinMetrics() // 获取本机应该运行哪些自定义的监控项

向transfer上报监控数据

cron.Collect()

通过遍历全局变量Mappers,每个采集项目会启动一个goroutine

func Collect() {

	if !g.Config().Transfer.Enabled {
		return
	}

	if len(g.Config().Transfer.Addrs) == 0 {
		return
	}

	for _, v := range funcs.Mappers {
		go collect(int64(v.Interval), v.Fs)
	}
}

每个goroutine都会调用SendToTransfer向transfer发送数据

g.SendToTransfer(mvs)

transfer的地址有多个,agent是如何负载均衡的

tranfer的链接地址保存在一个map中,key就是agent中配置的tranfer组件的ip地址,agent并不会在启动的时候就初始化这个map,不会去连接tranfer的地址,而是在发送监控的数据的时候去map中取conn,如果可以取到说明该ip的已经初始化过可以直接使用,如果取不到说明该ip还未建立过连接就是初始化连接并保存在map中。

TransferClients     map[string]*SingleConnRpcClient = map[string]*SingleConnRpcClient{}

agent使用随机索引的方式进行负载均衡

func SendMetrics(metrics []*model.MetricValue, resp *model.TransferResponse) {
	rand.Seed(time.Now().UnixNano())
	for _, i := range rand.Perm(len(Config().Transfer.Addrs)) {
		addr := Config().Transfer.Addrs[i]

		c := getTransferClient(addr)
		if c == nil {
			c = initTransferClient(addr)
		}

		if updateMetrics(c, metrics, resp) {
			break
		}
	}
}

为什么agent连接hbs的地址只有一个?

open-falcon可以支持大规模集群的原因就是每个组件都可以横向扩容,agent可以连接transfer的地址可以有多个,agent采用负载均衡的方法像transfer上报数据,但是连接hbs的地址却只有一个。

"heartbeat": {
    "enabled": true,
    "addr": "0.0.0.0:6030",
    "interval": 60,
    "timeout": 1000
},

这是因为hbs这个组件的功能就决定它不会有太大的性能压力,按照官方的说法单台hbs可以支撑4000个agent。

  • agent提供tpl,hostGroup相关的http api,而监控系统是面向管理员的注定访问量不大,所以http server这部分的压力不大
  • hbs通过rpc调用向agent下发配置,提供的接口有:插件,自定义监控,trustIp,agent的同步间隔一般是60s,接口比较少,请求频率不高

hbs可以采用lvs四层代理的方式实现横向扩展

agent连接transfer时采用的负载均衡方法就是通过随机数的方法产生一个随机索引从ip列表中获取一个连接地址。

for _, i := range rand.Perm(len(Config().Transfer.Addrs)) {
    addr := Config().Transfer.Addrs[i]

    c := getTransferClient(addr)
    if c == nil {
        c = initTransferClient(addr)
    }

    if updateMetrics(c, metrics, resp) {
        break
    }
}
0条评论
作者已关闭评论
Mr. 油
92文章数
0粉丝数
Mr. 油
92 文章 | 0 粉丝
原创

open-falcon源码解析-agent篇

2023-10-07 07:19:08
6
0

本文将对agent的源码进行解析,深入理解agent的功能实现,agent如何与其他falcon组件进行交互,agent的plugin机制的实现。 agent用于采集机器负载监控指标,比如cpu.idle、load.1min、disk.io.util等等,每隔60秒push给Transfer。agent与Transfer建立了长连接,数据发送速度比较快,agent提供了一个http接口/v1/push用于接收用户手工push的一些数据,然后通过长连接迅速转发给Transfer。

默认的监控项

funcs.BuildMappers() // 初始化全局Mappers,保存了一系列的metric的采集方法

Mappers保存了默认的监控性,可以看到就是一个列表的列表的列表,比如,[cpu相关][load][1min load]

type FuncsAndInterval struct {
	Fs       []func() []*model.MetricValue
	Interval int
}

var Mappers []FuncsAndInterval

监控项的数据结构

type MetricValue struct {
	Endpoint  string      `json:"endpoint"`
	Metric    string      `json:"metric"`
	Value     interface{} `json:"value"`
	Step      int64       `json:"step"`
	Type      string      `json:"counterType"`
	Tags      string      `json:"tags"`
	Timestamp int64       `json:"timestamp"`
}

agent自带dashboard

访问1988端口即可访问agent自带的dashboard,可以看到本机的系统状态。

go cron.InitDataHistory() // 每秒更新cpu和disk的状态信息

自发现,向hbs上报本机信息

cron.ReportAgentStatus() // 汇报host,ip等信息给hbs
req := model.AgentReportRequest{
    Hostname:      hostname,
    IP:            g.IP(),
    AgentVersion:  g.VersionMsg(),
    PluginVersion: g.GetCurrPluginVersion(),
}

向hbs获取本机应该运行哪些插件

plugin是和hostGroup绑定而非和host,但最终都会有个host和plugin的对应关系,这个对对应关系保存在portal库中,hbs是和portal库联系最紧密的uajina,所以通过向hbs请求获取本机应该运行哪些插件。

req := model.AgentHeartbeatRequest{
    Hostname: hostname,
}

var resp model.AgentPluginsResponse
err = g.HbsClient.Call("Agent.MinePlugins", req, &resp)
if err != nil {
    log.Println("call Agent.MinePlugin fail:", err)
    continue
}

type AgentPluginsResponse struct {
	Plugins   []string
	Timestamp int64
}

向hbs请求到自己应该运行哪些插件之后就开始定时运行这些插件,将这些插件采集的数据作为监控数据上报。

func AddNewPlugins(newPlugins map[string]*Plugin) {
	for fpath, newPlugin := range newPlugins {
		if _, ok := Plugins[fpath]; ok && newPlugin.MTime == Plugins[fpath].MTime {
			continue
		}

		Plugins[fpath] = newPlugin
		sch := NewPluginScheduler(newPlugin)
		PluginsWithScheduler[fpath] = sch
		sch.Schedule()
	}
}

自定义监控,向hbs获取非默认监控

agent安装之后就会有很多默认的监控项,主要是和系统相关的,比如内存,磁盘,cpu,网卡等,除此之外还需要监控一些自定义的配置,比如进程端口等等,这些自定义的监控项配置在监控模板上,模板和hostgroup绑定,最终也会有一个host和自定监控项的对应关系,这一部分的监控项也保存在portal库中,需要项hbs请求获取。

cron.SyncBuiltinMetrics() // 获取本机应该运行哪些自定义的监控项

向transfer上报监控数据

cron.Collect()

通过遍历全局变量Mappers,每个采集项目会启动一个goroutine

func Collect() {

	if !g.Config().Transfer.Enabled {
		return
	}

	if len(g.Config().Transfer.Addrs) == 0 {
		return
	}

	for _, v := range funcs.Mappers {
		go collect(int64(v.Interval), v.Fs)
	}
}

每个goroutine都会调用SendToTransfer向transfer发送数据

g.SendToTransfer(mvs)

transfer的地址有多个,agent是如何负载均衡的

tranfer的链接地址保存在一个map中,key就是agent中配置的tranfer组件的ip地址,agent并不会在启动的时候就初始化这个map,不会去连接tranfer的地址,而是在发送监控的数据的时候去map中取conn,如果可以取到说明该ip的已经初始化过可以直接使用,如果取不到说明该ip还未建立过连接就是初始化连接并保存在map中。

TransferClients     map[string]*SingleConnRpcClient = map[string]*SingleConnRpcClient{}

agent使用随机索引的方式进行负载均衡

func SendMetrics(metrics []*model.MetricValue, resp *model.TransferResponse) {
	rand.Seed(time.Now().UnixNano())
	for _, i := range rand.Perm(len(Config().Transfer.Addrs)) {
		addr := Config().Transfer.Addrs[i]

		c := getTransferClient(addr)
		if c == nil {
			c = initTransferClient(addr)
		}

		if updateMetrics(c, metrics, resp) {
			break
		}
	}
}

为什么agent连接hbs的地址只有一个?

open-falcon可以支持大规模集群的原因就是每个组件都可以横向扩容,agent可以连接transfer的地址可以有多个,agent采用负载均衡的方法像transfer上报数据,但是连接hbs的地址却只有一个。

"heartbeat": {
    "enabled": true,
    "addr": "0.0.0.0:6030",
    "interval": 60,
    "timeout": 1000
},

这是因为hbs这个组件的功能就决定它不会有太大的性能压力,按照官方的说法单台hbs可以支撑4000个agent。

  • agent提供tpl,hostGroup相关的http api,而监控系统是面向管理员的注定访问量不大,所以http server这部分的压力不大
  • hbs通过rpc调用向agent下发配置,提供的接口有:插件,自定义监控,trustIp,agent的同步间隔一般是60s,接口比较少,请求频率不高

hbs可以采用lvs四层代理的方式实现横向扩展

agent连接transfer时采用的负载均衡方法就是通过随机数的方法产生一个随机索引从ip列表中获取一个连接地址。

for _, i := range rand.Perm(len(Config().Transfer.Addrs)) {
    addr := Config().Transfer.Addrs[i]

    c := getTransferClient(addr)
    if c == nil {
        c = initTransferClient(addr)
    }

    if updateMetrics(c, metrics, resp) {
        break
    }
}
文章来自个人专栏
文章 | 订阅
0条评论
作者已关闭评论
作者已关闭评论
0
0