Understanding the role
EnqueueExtensions is an optional interface. If a plugin object implements it, it helps the scheduler move unschedulable Pods through the internal scheduling queues more efficiently.
In other words, a plugin can register the events it cares about on certain resources; when one of those events occurs, a Pod that was previously rejected by the plugin may become schedulable again.
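// vendor/k8s.io/kubernetes/pkg/scheduler/framework/interface.go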
// EnqueueExtensions is an optional interface that plugins can implement to efficiently
// move unschedulable Pods in internal scheduling queues. Plugins
// that fail pod scheduling (e.g., Filter plugins) are expected to implement this interface.
type EnqueueExtensions interface {
// EventsToRegister returns a series of possible events that may cause a Pod
// failed by this plugin schedulable.
// The events will be registered when instantiating the internal scheduling queue,
// and leveraged to build event handlers dynamically.
// Note: the returned list needs to be static (not depend on configuration parameters);
// otherwise it would lead to undefined behavior.
EventsToRegister() []ClusterEvent
}
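For reference, the ClusterEvent type returned by EventsToRegister is defined in the framework package:
// vendor/k8s.io/kubernetes/pkg/scheduler/framework/types.go
type ClusterEvent struct {
	Resource   GVK        // which API object kind the event is about, e.g. Pod, Node, or a custom GVK string
	ActionType ActionType // bitmask of Add/Delete/Update* actions
	Label      string     // human-readable label used in metrics and log messages
}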
// pkg/noderesourcetopology/plugin.go
func (tm *TopologyMatch) EventsToRegister() []framework.ClusterEvent { // if in-place Pod updates were implemented, Pod Update events would also need to be watched
// To register a custom event, follow the naming convention at:
// https://git.k8s.io/kubernetes/pkg/scheduler/eventhandlers.go#L403-L410
nrtGVK := fmt.Sprintf("noderesourcetopologies.v1alpha2.%v", topologyapi.GroupName)
return []framework.ClusterEvent{ // this plugin cares about certain events on three kinds of objects
{Resource: framework.Pod, ActionType: framework.Delete},
{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeAllocatable},
{Resource: framework.GVK(nrtGVK), ActionType: framework.Add | framework.Update},
}
}
Source code analysis
The analysis below is based on Kubernetes v1.25.
The only place EventsToRegister is called is:
// vendor/k8s.io/kubernetes/pkg/scheduler/framework/runtime/framework.go
var allClusterEvents = []framework.ClusterEvent{
{Resource: framework.Pod, ActionType: framework.All},
{Resource: framework.Node, ActionType: framework.All},
{Resource: framework.CSINode, ActionType: framework.All},
{Resource: framework.PersistentVolume, ActionType: framework.All},
{Resource: framework.PersistentVolumeClaim, ActionType: framework.All},
{Resource: framework.StorageClass, ActionType: framework.All},
}
func fillEventToPluginMap(p framework.Plugin, eventToPlugins map[framework.ClusterEvent]sets.String) {
ext, ok := p.(framework.EnqueueExtensions)
if !ok {
// If interface EnqueueExtensions is not implemented, register the default events
// to the plugin. This is to ensure backward compatibility.
registerClusterEvents(p.Name(), eventToPlugins, allClusterEvents) // if the plugin does not implement the interface, register the full default event list for it
return
}
events := ext.EventsToRegister()
// It's rare that a plugin implements EnqueueExtensions but returns nil.
// We treat it as: the plugin is not interested in any event, and hence pod failed by that plugin
// cannot be moved by any regular cluster event.
if len(events) == 0 {
klog.InfoS("Plugin's EventsToRegister() returned nil", "plugin", p.Name())
return
}
// The most common case: a plugin implements EnqueueExtensions and returns non-nil result.
registerClusterEvents(p.Name(), eventToPlugins, events)
// for every plugin that implements EnqueueExtensions, take the []ClusterEvent it registered and record it in the eventToPlugins map
}
func registerClusterEvents(name string, eventToPlugins map[framework.ClusterEvent]sets.String, evts []framework.ClusterEvent) {
for _, evt := range evts {
if eventToPlugins[evt] == nil {
eventToPlugins[evt] = sets.NewString(name)
} else {
eventToPlugins[evt].Insert(name)
}
}
}
// eventToPlugins is a map keyed by framework.ClusterEvent values; each value is a set of plugin names, i.e. all plugins interested in that event
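A minimal, hypothetical sketch (not scheduler code) of what eventToPlugins ends up containing after the TopologyMatch plugin above has been registered; the plugin name "NodeResourceTopologyMatch" is illustrative:
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

func main() {
	eventToPlugins := map[framework.ClusterEvent]sets.String{}

	// What registerClusterEvents effectively does for the three events
	// returned by TopologyMatch.EventsToRegister.
	pluginName := "NodeResourceTopologyMatch"
	for _, evt := range []framework.ClusterEvent{
		{Resource: framework.Pod, ActionType: framework.Delete},
		{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeAllocatable},
		{Resource: framework.GVK("noderesourcetopologies.v1alpha2.topology.node.k8s.io"), ActionType: framework.Add | framework.Update},
	} {
		if eventToPlugins[evt] == nil {
			eventToPlugins[evt] = sets.NewString(pluginName)
		} else {
			eventToPlugins[evt].Insert(pluginName)
		}
	}

	// Each ClusterEvent maps to the set of plugins that registered it.
	for evt, plugins := range eventToPlugins {
		fmt.Printf("{%v, %v} -> %v\n", evt.Resource, evt.ActionType, plugins.List())
	}
}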
fillEventToPluginMap itself is called while the Framework object is being constructed:
// vendor/k8s.io/kubernetes/pkg/scheduler/framework/runtime/framework.go
func NewFramework(r Registry, profile *config.KubeSchedulerProfile, stopCh <-chan struct{}, opts ...Option) (framework.Framework, error) {
options := defaultFrameworkOptions(stopCh)
for _, opt := range opts {
opt(&options) // via the options pattern, an externally created map is injected into options.clusterEventMap here
}
...
pluginsMap := make(map[string]framework.Plugin)
for name, factory := range r {
// initialize only needed plugins.
if !pg.Has(name) {
continue
}
args := pluginConfig[name]
if args != nil {
outputProfile.PluginConfig = append(outputProfile.PluginConfig, config.PluginConfig{
Name: name,
Args: args,
})
}
p, err := factory(args, f)
if err != nil {
return nil, fmt.Errorf("initializing plugin %q: %w", name, err)
}
pluginsMap[name] = p
// Update ClusterEventMap in place.
fillEventToPluginMap(p, options.clusterEventMap)
}
}
// Going one level up, the entry point is vendor/k8s.io/kubernetes/pkg/scheduler/scheduler.go
// New returns a Scheduler
func New(client clientset.Interface,
informerFactory informers.SharedInformerFactory,
dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,
recorderFactory profile.RecorderFactory,
stopCh <-chan struct{},
opts ...Option) (*Scheduler, error) {
...
clusterEventMap := make(map[framework.ClusterEvent]sets.String)
profiles, err := profile.NewMap(options.profiles, registry, recorderFactory, stopCh,
frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
frameworkruntime.WithClientSet(client),
frameworkruntime.WithKubeConfig(options.kubeConfig),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithPodNominator(nominator),
frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
frameworkruntime.WithClusterEventMap(clusterEventMap), // the map is passed in here; through the call chain it ends up in NewFramework, where all registered events are written into clusterEventMap
frameworkruntime.WithParallelism(int(options.parallelism)),
frameworkruntime.WithExtenders(extenders),
)
...
// clusterEventMap is finally consumed below
addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(clusterEventMap))
}
// flatten and merge all ClusterEvents: key by GVK and OR the ActionTypes together (see the sketch after this function)
func unionedGVKs(m map[framework.ClusterEvent]sets.String) map[framework.GVK]framework.ActionType {
gvkMap := make(map[framework.GVK]framework.ActionType)
for evt := range m {
if _, ok := gvkMap[evt.Resource]; ok {
gvkMap[evt.Resource] |= evt.ActionType // bitwise-OR the ActionTypes, so the value covers every event anyone cares about for this resource
} else {
gvkMap[evt.Resource] = evt.ActionType
}
}
return gvkMap
}
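A small sketch of the bit arithmetic unionedGVKs relies on; the two registrations below are hypothetical, only the ActionType constants come from the framework package:
package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/scheduler/framework"
)

func main() {
	// Suppose one plugin registered Node/Add|UpdateNodeAllocatable and another plugin Node/Delete.
	var at framework.ActionType
	at |= framework.Add | framework.UpdateNodeAllocatable
	at |= framework.Delete

	fmt.Println(at&framework.Add != 0)    // true: an AddFunc handler will be built
	fmt.Println(at&framework.Delete != 0) // true: a DeleteFunc handler will be built
	// Update is itself a bitmask (UpdateNodeAllocatable|UpdateNodeLabel|...),
	// so having UpdateNodeAllocatable set is enough for the Update check to pass.
	fmt.Println(at&framework.Update != 0) // true
}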
// After flattening the ClusterEvents registered by the different plugins, we end up with this map: for each GVK, the ActionTypes (i.e. all the changes anyone cares about) are merged together.
// Whenever any of those events happens for that GVK, the scheduling queues have to be moved.
func addAllEventHandlers(
sched *Scheduler,
informerFactory informers.SharedInformerFactory,
dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,
gvkMap map[framework.GVK]framework.ActionType,
) {
...
// everything above is the pre-existing logic: registering informer event handlers for the default Pod and Node resources
// below, events for the remaining resources are handled uniformly: a factory function builds the corresponding cache.ResourceEventHandlerFuncs
buildEvtResHandler := func(at framework.ActionType, gvk framework.GVK, shortGVK string) cache.ResourceEventHandlerFuncs {
funcs := cache.ResourceEventHandlerFuncs{}
if at&framework.Add != 0 { // check whether the Add bit is set
evt := framework.ClusterEvent{Resource: gvk, ActionType: framework.Add, Label: fmt.Sprintf("%vAdd", shortGVK)}
funcs.AddFunc = func(_ interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(evt, nil)
}
}
if at&framework.Update != 0 {
evt := framework.ClusterEvent{Resource: gvk, ActionType: framework.Update, Label: fmt.Sprintf("%vUpdate", shortGVK)}
funcs.UpdateFunc = func(_, _ interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(evt, nil)
}
}
if at&framework.Delete != 0 {
evt := framework.ClusterEvent{Resource: gvk, ActionType: framework.Delete, Label: fmt.Sprintf("%vDelete", shortGVK)}
funcs.DeleteFunc = func(_ interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(evt, nil)
}
}
return funcs
}
for gvk, at := range gvkMap {
switch gvk {
case framework.Node, framework.Pod: // Pod and Node were already handled at the beginning of the function
// Do nothing.
case framework.CSINode:
informerFactory.Storage().V1().CSINodes().Informer().AddEventHandler(
buildEvtResHandler(at, framework.CSINode, "CSINode"),
)
case framework.CSIDriver:
informerFactory.Storage().V1().CSIDrivers().Informer().AddEventHandler(
buildEvtResHandler(at, framework.CSIDriver, "CSIDriver"),
)
case framework.CSIStorageCapacity:
informerFactory.Storage().V1().CSIStorageCapacities().Informer().AddEventHandler(
buildEvtResHandler(at, framework.CSIStorageCapacity, "CSIStorageCapacity"),
)
case framework.PersistentVolume:
// MaxPDVolumeCountPredicate: since it relies on the counts of PV.
//
// PvAdd: Pods created when there are no PVs available will be stuck in
// unschedulable queue. But unbound PVs created for static provisioning and
// delay binding storage class are skipped in PV controller dynamic
// provisioning and binding process, will not trigger events to schedule pod
// again. So we need to move pods to active queue on PV add for this
// scenario.
//
// PvUpdate: Scheduler.bindVolumesWorker may fail to update assumed pod volume
// bindings due to conflicts if PVs are updated by PV controller or other
// parties, then scheduler will add pod back to unschedulable queue. We
// need to move pods to active queue on PV update for this scenario.
informerFactory.Core().V1().PersistentVolumes().Informer().AddEventHandler(
buildEvtResHandler(at, framework.PersistentVolume, "Pv"),
)
case framework.PersistentVolumeClaim:
// MaxPDVolumeCountPredicate: add/update PVC will affect counts of PV when it is bound.
informerFactory.Core().V1().PersistentVolumeClaims().Informer().AddEventHandler(
buildEvtResHandler(at, framework.PersistentVolumeClaim, "Pvc"),
)
case framework.StorageClass:
if at&framework.Add != 0 {
informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: sched.onStorageClassAdd,
},
)
}
if at&framework.Update != 0 {
informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
UpdateFunc: func(_, _ interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.StorageClassUpdate, nil)
},
},
)
}
default:
// Tests may not instantiate dynInformerFactory.
if dynInformerFactory == nil {
continue
}
// GVK is expected to be at least 3-folded, separated by dots.
// <kind in plural>.<version>.<group>
// Valid examples:
// - foos.v1.example.com
// - bars.v1beta1.a.b.c
// Invalid examples:
// - foos.v1 (2 sections)
// - foo.v1.example.com (the first section should be plural)
if strings.Count(string(gvk), ".") < 2 {
klog.ErrorS(nil, "incorrect event registration", "gvk", gvk)
continue
}
// Fall back to try dynamic informers.
gvr, _ := schema.ParseResourceArg(string(gvk)) // parse a GVR out of the GVK string
dynInformer := dynInformerFactory.ForResource(*gvr).Informer() // get an informer for the GVR from the dynamic client
dynInformer.AddEventHandler(
buildEvtResHandler(at, gvk, strings.Title(gvr.Resource)),
) // taking the NUMA topology plugin as an example, Add and Update events for noderesourcetopologies.v1alpha2.topology.node.k8s.io are picked up here and trigger queue movement (see the sketch after this function)
}
}
}
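A minimal sketch of that last step for custom resources: schema.ParseResourceArg turns the registered GVK string into the GroupVersionResource handed to the dynamic informer factory, using the NodeResourceTopology string from the plugin above:
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/runtime/schema"
)

func main() {
	gvk := "noderesourcetopologies.v1alpha2.topology.node.k8s.io"
	// ParseResourceArg splits on the first two dots: <resource>.<version>.<group>.
	gvr, _ := schema.ParseResourceArg(gvk)
	fmt.Println(gvr.Resource) // noderesourcetopologies
	fmt.Println(gvr.Version)  // v1alpha2
	fmt.Println(gvr.Group)    // topology.node.k8s.io
}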
MoveAllToActiveOrBackoffQueue uses the ClusterEvent to decide whether all unschedulable Pods should be moved to the activeQ or the backoffQ.
// MoveAllToActiveOrBackoffQueue moves all pods from unschedulablePods to activeQ or backoffQ.
// This function adds all pods and then signals the condition variable to ensure that
// if Pop() is waiting for an item, it receives the signal after all the pods are in the
// queue and the head is the highest priority pod. (Move Pods to the activeQ or backoffQ and signal the condition variable so that Pop() stops blocking.)
func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(event framework.ClusterEvent, preCheck PreEnqueueCheck) {
p.lock.Lock()
defer p.lock.Unlock()
unschedulablePods := make([]*framework.QueuedPodInfo, 0, len(p.unschedulablePods.podInfoMap))
for _, pInfo := range p.unschedulablePods.podInfoMap { // collect the unschedulable Pods
if preCheck == nil || preCheck(pInfo.Pod) {
unschedulablePods = append(unschedulablePods, pInfo)
}
}
p.movePodsToActiveOrBackoffQueue(unschedulablePods, event)
}
// NOTE: this function assumes lock has been acquired in caller (the caller must hold the PriorityQueue lock)
func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.QueuedPodInfo, event framework.ClusterEvent) {
activated := false
for _, pInfo := range podInfoList {
// If the event doesn't help making the Pod schedulable, continue. This is the check that decides whether the event can help the Pod get rescheduled.
// Note: we don't run the check if pInfo.UnschedulablePlugins is nil, which denotes
// either there is some abnormal error, or scheduling the pod failed by plugins other than PreFilter, Filter and Permit.
// In that case, it's desired to move it anyways.
if len(pInfo.UnschedulablePlugins) != 0 && !p.podMatchesEvent(pInfo, event) {
continue
}
pod := pInfo.Pod
if p.isPodBackingoff(pInfo) {
if err := p.podBackoffQ.Add(pInfo); err != nil {
klog.ErrorS(err, "Error adding pod to the backoff queue", "pod", klog.KObj(pod))
} else {
metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event.Label).Inc()
p.unschedulablePods.delete(pod)
}
} else {
if err := p.activeQ.Add(pInfo); err != nil {
klog.ErrorS(err, "Error adding pod to the scheduling queue", "pod", klog.KObj(pod))
} else {
activated = true
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event.Label).Inc()
p.unschedulablePods.delete(pod)
}
}
}
p.moveRequestCycle = p.schedulingCycle
if activated {
p.cond.Broadcast()
}
}
// Checks if the Pod may become schedulable upon the event.
// This is achieved by looking up the global clusterEventMap registry.
func (p *PriorityQueue) podMatchesEvent(podInfo *framework.QueuedPodInfo, clusterEvent framework.ClusterEvent) bool {
if clusterEvent.IsWildCard() {
return true
}
for evt, nameSet := range p.clusterEventMap { // the PriorityQueue also holds the global clusterEventMap, populated when the scheduler was constructed
// Firstly verify if the two ClusterEvents match:
// - either the registered event from plugin side is a WildCardEvent,
// - or the two events have identical Resource fields and *compatible* ActionType.
// Note the ActionTypes don't need to be *identical*. We check if the ANDed value
// is zero or not. In this way, it's easy to tell Update&Delete is not compatible,
// but Update&All is.
evtMatch := evt.IsWildCard() || // the events match if the registered resource is the wildcard *, or the resources are identical and the ActionTypes overlap (see the sketch after this function)
// in other words, the incoming event is covered by (is part of) the registered event evt
(evt.Resource == clusterEvent.Resource && evt.ActionType&clusterEvent.ActionType != 0)
// Secondly verify the plugin name matches.
// Note that if it doesn't match, we shouldn't continue to search.
if evtMatch && intersect(nameSet, podInfo.UnschedulablePlugins) { // evtMatch only says some plugin cares about the incoming event; the Pod may have failed on an unrelated plugin that never registered this event, in which case moving the queue would not help
// so if the two sets intersect, at least one plugin that failed the Pod is in nameSet, i.e. it registered this evt, and the move should be triggered
return true
}
}
return false
}
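A short sketch of the ActionType compatibility rule referenced in the comments above: two events match when the AND of their ActionTypes is non-zero.
package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/scheduler/framework"
)

func main() {
	fmt.Println(framework.Update&framework.Delete != 0) // false: Update and Delete are not compatible
	fmt.Println(framework.Update&framework.All != 0)    // true:  Update and All are compatible
	// A plugin that registered Node/UpdateNodeAllocatable also matches a generic Node/Update event,
	// because UpdateNodeAllocatable is one bit of the composite Update mask.
	fmt.Println(framework.UpdateNodeAllocatable&framework.Update != 0) // true
}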