Judge用于告警判断,agent将数据push给Transfer,Transfer不但会转发给Graph组件来绘图,还会转发给Judge用于判断是否触发告警。 顾名思义judge的作用就是判断监控数据是否出发了报警,所以judge需要接收transfer推送的数据,缓存报警策略(Strategies),将触发报警的事件保存在redis。
接收transfer推送的数据
judge会将transfer推送的数据缓存起来
store.InitHistoryBigMap()
缓存metric数据
metric数据缓存在一个嵌套的map中, 每一个metric会生成一个md5值,这个md5值有endpoint,metric,tag产生,最外层的map的key可以理解为一个索引,由md5值的前两个字母组成,里面map的key是这个md5值,里面map的value是一个链表
func InitHistoryBigMap() {
arr := []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"}
for i := 0; i < 16; i++ {
for j := 0; j < 16; j++ {
HistoryBigMap[arr[i]+arr[j]] = NewJudgeItemMap()
}
}
}
var HistoryBigMap = make(map[string]*JudgeItemMap)
type JudgeItemMap struct {
sync.RWMutex
M map[string]*SafeLinkedList
}
type SafeLinkedList struct {
sync.RWMutex
L *list.List
}
打印这个缓存map的内容如下
开始打印...
map[52][52f617420b34c7b2a50bc3f22e10b6c7] length 1
<Endpoint:ubuntu, Metric:df.bytes.free.percent, Value:95.031724, Timestamp:1654153380, JudgeType:GAUGE Tags:map[fstype:vfat mount:/boot/efi os:ubuntu vm:yes]>
map[cc][ccbd15cee1de6d042e38d2a9e82922ac] length 1
<Endpoint:ubuntu, Metric:df.bytes.free.percent, Value:82.249070, Timestamp:1654153380, JudgeType:GAUGE Tags:map[fstype:ext4 mount:/ os:ubuntu vm:yes]>
过滤metric数据
缓存的过程中会先过滤一遍,transfer会将所有的metric推送过来,但是有些metric并没有配置告警策略,需要将这部分过滤掉。falcon的告警策略有两个地方可以配置,一个是tpl,一个是expression,judge维护了一全局FilterMap,这个FilterMap就是tpl和expression两个地方告警策略的一个合集。
func (this *Judge) Send(items []*model.JudgeItem, resp *model.SimpleRpcResponse) error {
remain := g.Config().Remain
// 把当前时间的计算放在最外层,是为了减少获取时间时的系统调用开销
now := time.Now().Unix()
for _, item := range items {
exists := g.FilterMap.Exists(item.Metric)
if !exists {
continue
}
pk := item.PrimaryKey()
store.HistoryBigMap[pk[0:2]].PushFrontAndMaintain(pk, item, remain, now)
}
return nil
}
judge的remain参数
上文说到judge在缓存数据时候会根据endpoint,metric,tag生成一个md5值,以这个md5值为primary key来缓存数据,value是个链表。也就是说同一台机器的同一个mettric会一直保存在同一个链表上,那么这个链表的长度应该保存多长呢?总不能无限增长吧。 judge配置文件中的remain参数就规定了这个链表的最大长度。往链表上添加数据的时候如果已经超过最大长度就会删除旧数据。
缓存告警策略
falcon的告警策略配置在两个地方:一个是从主机维度的tpl,另一个是tag维度的expression,juge组通过rpc调用向hbs同步这两种告警策略分别缓存起来。 syncFilter是一个map,保存了所有报警策略的metric,当transfer上报metric的时候对于没有配置报警策略的metric直接跳过,起到过滤的作用。
func SyncStrategies() {
duration := time.Duration(g.Config().Hbs.Interval) * time.Second
for {
syncStrategies()
syncExpression()
syncFilter()
time.Sleep(duration)
}
}
用来保存两种告警策略的数据结构
type SafeStrategyMap struct {
sync.RWMutex
// endpoint:metric => [strategy1, strategy2 ...]
M map[string][]model.Strategy
}
type SafeExpressionMap struct {
sync.RWMutex
// metric:tag1 => [exp1, exp2 ...]
// metric:tag2 => [exp1, exp2 ...]
M map[string][]*model.Expression
}
打印这两个map, 可以看到这两个map保存的时候都是以meric的name为key的
开始打印tpl告警策略...
ubuntu/mem.memused.percent
{Id:1 Metric:mem.memused.percent Tags:map[] Func:all(#3) Operator:>= RightValue:80 MaxStep:3 Priority:0 Note: Tpl:<Id:1, Name:falcon.tpl, ParentId:0, ActionId:1, Creator:root>}
开始打印expression告警策略...
df.bytes.free.percent/os=ubuntu
<Id:1, Metric:df.bytes.free.percent, Tags:map[os:ubuntu], all(#3)>=70 MaxStep:3, P0 ActionId:2>
判断是否触发告警
当judge接收到transfer发送的数据之后就会立即进行判断,而不是把数据缓存起来之后启动一个goroutine来定时判定缓存的数据。 judge在将mtric数据往链表中添加的时候就会进行报警判定。
func (this *JudgeItemMap) PushFrontAndMaintain(key string, val *model.JudgeItem, maxCount int, now int64) {
if linkedList, exists := this.Get(key); exists {
needJudge := linkedList.PushFrontAndMaintain(val, maxCount)
if needJudge {
Judge(linkedList, val, now)
}
} else {
NL := list.New()
NL.PushFront(val)
safeList := &SafeLinkedList{L: NL}
this.Set(key, safeList)
Judge(safeList, val, now)
}
}
从strategy和expression两种策略进行判定
func Judge(L *SafeLinkedList, firstItem *model.JudgeItem, now int64) {
CheckStrategy(L, firstItem, now)
CheckExpression(L, firstItem, now)
}
两种报警判定的方式类似:
- judge有已经把报警策略缓存起来的全局变量
- 根据metric的name从全局变量中取出报警策略
- 因为有些监控需要连续几次超过阈值才会触发一次报警,所以会把保存该监控数据的链表也传递给判定方法,这也是为什么要用链表保存历史监控数据的原因
- 根据报警策略中的all(#3)>=80解析成一个判断func对监控数据进行判断
- 如果触发报警就将一个报警事件保存在redis中
将告警事件保存到redis中
judge是在rpc接收到监控数据的时候就会立马judge,如果判断出监控数据处理了告警也会立马生成一个告警事件,并将该告警事件保存到redis中。
event := &model.Event{
Id: fmt.Sprintf("s_%d_%s", strategy.Id, firstItem.PrimaryKey()),
Strategy: &strategy,
Endpoint: firstItem.Endpoint,
LeftValue: leftValue,
EventTime: firstItem.Timestamp,
PushedTags: firstItem.Tags,
}
在设置报警策略的时候可以设置报警的等级(p0,p1,p2...),judge组件的配置文件配置了redis队列名字, 所以不同的报警等级可以放到不同的redis队列中。
"queuePattern": "event:p%v",
最后捋一捋
judge的作用就是拿着监控数据根据报警规则进行比对来判断是否触发报警,因为有些报警规则需要连续多次满足条件才会触发一下报警,所以需要对历史报警数据进行保存。
- 历史监控数据 map[索引][pk]链表,针对endpoint和metric的Name生成pk,保证同一台主机的监控数据落在同一个链表上
- 告警策略 map[pk][]judgeItem, map的key类似于ubuntu/mem.memused.percent,包含了endpoint和metric和上面的历史监控数据类似
综上,judge接收到一个监控数据就可以拿到某个endpoint和metric对应的历史数据和告警策略,然后就可以进行judge了