searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

Istio的防抖动处理浅析

2023-06-07 02:46:19
25
0

一、背景

在 Istio 中,Debounce(防抖动)是一种机制,防止在短时间内连续触发事件时造成过多的操作或处理开销。这种机制有助于减少系统的负载,提高性能和效率。

Debounce 机制的基本思想是在一定的时间窗口内,对连续的操作进行延迟处理,只执行最后一次触发事件的操作,忽略在该时间窗口内的其他触发事件。这样做可以确保只有在一段时间内没有更多的操作发生时,才执行相关的操作,从而减少不必要的计算、请求或处理,并降低资源消耗。

在 Istio 中,Debounce 机制主要用于以下方面:

配置更新:当对 Istio 配置进行更改时,例如更新 VirtualService、DestinationRule 等配置对象时,Debounce 机制可以防止频繁的配置变更操作。它会在一定的时间窗口内收集配置更改请求,并在窗口结束后应用最后一次更改。

服务发现:Istio 使用服务发现来管理和监控服务的信息。Debounce 机制可以防止频繁的服务注册和注销操作,将这些操作限制在一定时间窗口内的最后一次。

通过使用 Debounce 机制,Istio 可以减少不必要的配置变更和服务发现操作,提高系统的稳定性和性能。这对于大规模部署和高负载环境中的 Istio 是尤为重要。

二、原理

2.1 什么是防抖

举一个简单的例子,我们每天上下班都要坐电梯,一旦碰上早晚高峰期,如何更有效的把人送到指定楼层就非常关键,因此电梯系统跟配置推送一样都注重高效推送。拿电梯来说,最简单的设计,电梯每上来一个人,就立马送达,但是,当你第一个进电梯后,想上到7楼,按下了7楼并关闭了电梯门,这时门还没关上的时候,突然另一个同事又跑了过来,电梯门又打开,同事进来后你们一起去了7楼。这样两个人两次去7楼的事件,电梯跑了一趟就搞定了。这是很容易想到的单次处理改为批量处理。

但这样也会有个问题,例如到了下班时间,大家都想准点下班,每当你要关闭电梯门的时候,总有个同事要进来,如果电梯容量无限大的话,门永远关不上,电梯也一直不走,那好了大家今天谁也别想下班,都留下来加班吧。也是个很大的问题。那我们做一个规定,每趟电梯在7楼的时候最多等待 2分钟,到时间了不管有没有人要进来电梯都要走,这样电梯的利用率就提升了,大家也不用等太久就可以下班回家了。

如果优化的更好一点,把所有电梯分成奇偶两组,奇组只在奇数层停,偶组只在偶数层停。这样就可以最大化的提升资源利用率。但还有一种情况,如果我们进电梯后,后面没有人进电梯了,白白等待了 2 分钟电梯才走,浪费了时间,这也不行。

那我们就再给电梯系统加一个时间,让电梯在有人进电梯后等待 10 秒,如果过了 10 秒还没有下一个人进来,电梯就不等了。 如有有人进来就重新计时 10 秒钟。

从上面这个例子可以引申出几个概念,一个是最小静默时间,一个是最大延迟时间。最小静默时间就是上面的 10 秒钟,从上一个进电梯的人开始计时,10 秒内有新的人进来就接着等,否则就不等,每进一个人就重新更新这个时间。最大延迟时间就是上面电梯等待的 2 分钟,到了这个时间就算还有很多人没有进电梯,电梯也必须走。另外一个防抖中的重要概念就是分组合并,比如把都去偶数层的人统一在一趟电梯上。

2.2 如何设计一个防抖

本文以Go语言为基础,来思考如何实现一个防抖程序。

已知的组件有

  • 输入的触发请求事件,这里用PushRequest的chan表示;
  • 因为是定时任务拉取请求处理,所以会用到for go select的定时任务模式
  • 输出是将抖动处理完成的请求推送处理,因此定义一个pushFn的推送方法
  • 防抖动的核心是最小静默时间、最大延迟时间、合并处理三个概念

基础代码可以这样

func debounceBasic(ch chan *model.PushRequest, stopCh <-chan struct{},opts debounceOptions, pushFn func(req *model.PushRequest)){
	//延迟时间
	var timeChan <-chan time.Time
	//当前时间
	var lastConfigUpdateTime time.Time
	for {
		select {
		case r := <-ch: //请求事件触发
			//处理请求事件
			//合并处理
		case <-timeChan: //延迟时间到了
			// ?
		case <-stopCh: //退出
			return
		}
	}
}

一个for循环的定时结构,肯定要有stopCh的退出结构,还要有请求事件触发之后,执行处理推送事件,如果有需要还需合并请求处理,也不能无限自旋处理,占着CPU资源,因此当一个请求事件处理完了之后,等待这个过程需要加上延迟处理timeChan,延迟时间肯定跟当前时间和最小静默时间有关系。

当一个请求事件到达之后,记录下当前时间和事件次数,需要判断是否是第一次请求,如果是第一次请求,那说明上次已经推送了事件,需要重置延迟时间为最小静默时间,最后合并请求事件;

当延迟时间到了,就应该要推送事件,但是我们要判断一下当前是否在推送,如果当前没有推送事件,才触发推送;

防抖动的核心方法是什么时候决定推送,这取决于当前这个请求的延迟时间和最小静默时间以及最大延迟时间的关系,如果延迟时间大于等于最大延迟时间(电梯等了2min)或者静默时间大于等于最小静默时间(电梯在上一个人进入后等了10s还没人进来),这时就要推送,这个时候也要记录状态,重置标志位,比如可以用累积事件次数作为标志位,此时累积事件数置为0,表示已经完成一波推送,下次重新开始;如果延迟时间小于最大延迟时间而且(电梯还没等够2min)而且静默时间小于最小静默时间(电梯在上一个人进入后在10s内又有人进来),这时做什么呢,这个时候只需要更新一下延迟时间,这个时间怎么定,以电梯为例,假如最后一个人8:50进入电梯,我判断要不要关门这个时间是8:53;这个时间差是3s,而最小静默时间是10s,也就说在后面7s我可以是没事干的,那可以玩手机等待,不用向电梯外张望还有没有人,此时延迟时间就是最小静默时间-已经静默的时间;

基础代码现在变成这样:

func debounceBasic(ch chan *model.PushRequest, stopCh <-chan struct{}, opts debounceOptions, pushFn func(req *model.PushRequest)) {
	//延迟时间
	var timeChan <-chan time.Time
	// 重置推送后的开始时间,用来做延迟时间判断
	var startDebounce time.Time
	//每个事件触发的时间,用来做静默时间判断
	var lastConfigUpdateTime time.Time
	//事件累积数量
	debouncedEvents := 0
	//是否正在推送标志位
	free := true

	pushWorker := func() {
		//距离第一个事件进来过了多久
		eventDelay := time.Since(startDebounce)
		//距离最后一个事件进来过了多久
		quietTime := time.Since(lastConfigUpdateTime)
		//延迟时间大于等于最大延迟时间或者静默时间大于等于最小静默时间
		if eventDelay >= opts.debounceMax || quietTime >= opts.debounceAfter {
			//更新推送标识,当前处理中
			free = false
			//推送处理方法
			//重置事件累积数量为0,重置推送状态
			debouncedEvents = 0
		} else {
			//更新延迟时间=最小静默时间-已经静默的时间
			timeChan = time.After(opts.debounceAfter - quietTime)
		}
	}

	for {
		select {
		case r := <-ch: //请求事件触发
			//初始化当前事件的时间
			lastConfigUpdateTime = time.Now()
			//以事件累积数为标识,为0表示上一波已推送,重新开始
			if debouncedEvents == 0 {
				//更新延迟时间为最小静默时间
				timeChan = time.After(opts.debounceAfter)
			}
			//更新事件累积数
			debouncedEvents++
			//合并处理
			Merge(r)
		case <-timeChan: //延迟时间到了
			if free {
				//没有正在进行的推送,执行事件推送
				pushWorker()
			}
		case <-stopCh: //退出
			return
		}
	}

}

这个时候我们只需关注的是真正的推送处理逻辑,除了一些监控逻辑,与当前处理流程相关的是,推送完成之后,需要通知到当前方法,继续推送,更新推送标志位为非推送中;一个完整的核心代码如下,去掉了源码的一些监控指标数据和非核心流程:

func debounce(ch chan *model.PushRequest, stopCh <-chan struct{}, opts debounceOptions, pushFn func(req *model.PushRequest)) {
	var timeChan <-chan time.Time
	var startDebounce time.Time
	var lastConfigUpdateTime time.Time

	debouncedEvents := 0

	// Keeps track of the push requests. If updates are debounce they will be merged.
	var req *model.PushRequest

	free := true
	freeCh := make(chan struct{}, 1)

	push := func(req *model.PushRequest) {
		pushFn(req)
		freeCh <- struct{}{}
	}

	pushWorker := func() {
		eventDelay := time.Since(startDebounce)
		quietTime := time.Since(lastConfigUpdateTime)
		if eventDelay >= opts.debounceMax || quietTime >= opts.debounceAfter {
			if req != nil {
				free = false
				go push(req, debouncedEvents, startDebounce)
				req = nil
				debouncedEvents = 0
			}
		} else {
			timeChan = time.After(opts.debounceAfter - quietTime)
		}
	}

	for {
		select {
		case <-freeCh:
			free = true
			pushWorker()
		case r := <-ch:
			lastConfigUpdateTime = time.Now()
			if debouncedEvents == 0 {
				timeChan = time.After(opts.debounceAfter)
				startDebounce = lastConfigUpdateTime
			}
			debouncedEvents++
			req = req.Merge(r)
		case <-timeChan:
			if free {
				pushWorker()
			}
		case <-stopCh:
			return
		}
	}
}

 

 

0条评论
0 / 1000