Kubernetes Scheduler 的作用是将一个 Pod 调度到一个节点上,并将 Pod 与节点进行绑定。整个调度过程整体上可以分为调度周期和绑定周期两个大的阶段:前者用于筛选出一个合适的节点作为当前 Pod 的目标节点,后者会将当前 Pod 与选出的节点进行绑定,绑定周期和调度周期是相互独立的,并且是异步的,在代码中表现为整个绑定周期的操作都是在一个单独的 goroutine 中执行的。
kubernetes Scheduler 内部有一个调度框架,这个框架中内置了一些扩展点,如下图所示,我们可以通过在这些扩展点中插入需要执行的代码来对调度过程进行扩展。
以下介绍这些扩展点的基本作用:
queueSort
:这些插件对调度队列中的悬决的 Pod 排序。 一次只能启用一个队列排序插件。
preFilter
:这些插件用于在过滤之前预处理或检查 Pod 或集群的信息。 它们可以将 Pod 标记为不可调度。
filter
:这些插件相当于调度策略中的断言(Predicates),用于过滤不能运行 Pod 的节点。 过滤器的调用顺序是可配置的。 如果没有一个节点通过所有过滤器的筛选,Pod 将会被标记为不可调度。
postFilter
:当无法为 Pod 找到可用节点时,按照这些插件的配置顺序调用他们。 如果任何postFilter
插件将 Pod 标记为可调度,则不会调用其余插件。
preScore
:这是一个信息扩展点,可用于预打分工作。
score
:这些插件给通过筛选阶段的节点打分。调度器会选择得分最高的节点。
reserve
:这是一个信息扩展点,当资源已经预留给 Pod 时,会通知插件。 这些插件还实现了Unreserve
接口,在Reserve
期间或之后出现故障时调用。
permit
:这些插件可以阻止或延迟 Pod 绑定。
preBind
:这些插件在 Pod 绑定节点之前执行。
bind
:这个插件将 Pod 与节点绑定。bind
插件是按顺序调用的,只要有一个插件完成了绑定,其余插件都会跳过。bind
插件至少需要一个。
postBind
:这是一个信息扩展点,在 Pod 绑定了节点之后调用。
这些扩展点都是通过插件实现,意味着用户可以针对不同的扩展点编写插件,从而制定自己的调度规则。
接下来,结合代码介绍kubernetes scheduler对每个pod选择节点的过程:
调度框架运行后,会执行一个名为 scheduleOne()
的函数。
func (sched *Scheduler) scheduleOne(ctx context.Context) {
podInfo := sched.NextPod()
...
pod := podInfo.Pod
prof, err := sched.profileForPod(pod)
...
}
这个函数包含了整个调度过程,其中的每一个步骤都与上图中的各个节点严格对应。
首先,从队列中取出一个 Pod,然后根据 Pod 的 pod.Spec.SchedulerName
字段找到对应里的 Profile
对象,其中包含有当前调度器对应的调度框架对象。后续的调度过程中主要依赖这个调度框架对象对 Pod 进行调度。
klog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)
...
scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod)
然后开始调度过程,先运行 sched.Algorithm.Schedule()
,尝试为当前 Pod 找出一个合适的 Node。这个过程包含了图中的 PreFilter、Filter、PreScore、Score 和 Normalize Score 等步骤。
if err != nil {
...
if fitError, ok := err.(*core.FitError); ok {
if sched.DisablePreemption {
klog.V(3).Infof("Pod priority feature is not enabled or preemption is disabled by scheduler configuration." +
" No preemption is performed.")
} else {
preemptionStartTime := time.Now()
sched.preempt(schedulingCycleCtx, prof, state, pod, fitError)
...
}
...
} else {
klog.Errorf("error selecting node for pod: %v", err)
metrics.PodScheduleErrors.Inc()
}
sched.recordSchedulingFailure(prof, podInfo.DeepCopy(), err, v1.PodReasonUnschedulable, err.Error())
return
}
如果找不到合适的 Node,则尝试进行抢占操作。抢占操作对应于调度流程中的 PostFilter 扩展点。
...
// Run "reserve" plugins.
if sts := prof.RunReservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
sched.recordSchedulingFailure(prof, assumedPodInfo, sts.AsError(), SchedulerError, sts.Message())
metrics.PodScheduleErrors.Inc()
return
}
接下来执行 Reserve 扩展点,Reserver 扩展点的作用是做一些相关附属资源的预留,例如 PVC。
...
// Run "permit" plugins.
runPermitStatus := prof.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if runPermitStatus.Code() != framework.Wait && !runPermitStatus.IsSuccess() {
var reason string
if runPermitStatus.IsUnschedulable() {
metrics.PodScheduleFailures.Inc()
reason = v1.PodReasonUnschedulable
} else {
metrics.PodScheduleErrors.Inc()
reason = SchedulerError
}
if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
}
// One of the plugins returned status different than success or wait.
prof.RunUnreservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
sched.recordSchedulingFailure(prof, assumedPodInfo, runPermitStatus.AsError(), reason, runPermitStatus.Message())
return
}
然后执行 Permit 扩展点。Permit 扩展点允许在执行真正的调度操作之前对 Pod 绑定操作进行最后的批准、拒绝或者执行延时调度。
以上部分对应于上面的调度周期,接下来会在一个单独的 goroutine 中执行绑定周期。
// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
go func() {
bindingCycleCtx, cancel := context.WithCancel(ctx)
defer cancel()
metrics.SchedulerGoroutines.WithLabelValues("binding").Inc()
defer metrics.SchedulerGoroutines.WithLabelValues("binding").Dec()
waitOnPermitStatus := prof.WaitOnPermit(bindingCycleCtx, assumedPod)
if !waitOnPermitStatus.IsSuccess() {
var reason string
if waitOnPermitStatus.IsUnschedulable() {
metrics.PodScheduleFailures.Inc()
reason = v1.PodReasonUnschedulable
} else {
metrics.PodScheduleErrors.Inc()
reason = SchedulerError
}
if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
}
// trigger un-reserve plugins to clean up state associated with the reserved Pod
prof.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
sched.recordSchedulingFailure(prof, assumedPodInfo, waitOnPermitStatus.AsError(), reason, waitOnPermitStatus.Message())
return
}
绑定操作中的第一步是 WaitOnPermit 扩展点,它主要结合 Permit 扩展点一起完成延时调度的功能。
// Run "prebind" plugins.
preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if !preBindStatus.IsSuccess() {
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
}
sched.recordSchedulingFailure(fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, "")
return
}
接着执行 PreBind 扩展点,这个扩展点的功能主要是做一些 Pod 与节点绑定之前的准备工作,或者让其它相关资源的绑定,例如 PVC 和 PV。
//Run
"prebind"plugins. preBindStatus := prof.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) if
!preBindStatus.IsSuccess(){ var reason string metrics.PodScheduleErrors.Inc() reason = SchedulerError if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil { klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr) } // trigger un-reserve plugins to clean up state associated with the reserved Pod prof.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) sched.recordSchedulingFailure(prof, assumedPodInfo, preBindStatus.AsError(), reason, preBindStatus.Message()) return }
``` goerr :=
sched.bind(bindingCycleCtx,fwk, assumedPod, scheduleResult.SuggestedHost, state) if err != nil
{ metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))// trigger un-reserve
pluginsto clean up state associated with the reserved Pod fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) if err := sched.SchedulerCache.ForgetPod(assumedPod); err != nil { klog.Errorf("scheduler cache ForgetPod failed: %v", err) }
sched.recordSchedulingFailure(fwk,assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, "") } else { // Calculating
nodeResourceStringcan be heavy. Avoid it if klog verbosity is below 2. if klog.V(2).Enabled() {
klog.InfoS("Successfullybound pod to node", "pod", klog.KObj(pod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes) }
metrics.PodScheduled(fwk.ProfileName(),metrics.SinceInSeconds(start))
metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts)) metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(podInfo)).Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp)) // Run "postbind" plugins.fwk.RunPostBindPlugins(
bindingCycleCtx, state,
assumedPod,scheduleResult.SuggestedHost)
}}() }
然后执行 Bind 扩展点,Pod 会在这个扩展点与节点之间真正产生绑定关系。
如果 Bind 扩展点执行成功,则接着会执行 PostBind 扩展点;如果执行失败,则说明绑定错误,会直接返回。
至此,整个 绑定周期
结束。Pod 与某个节点完成绑定,Pod 所使用的 PVC 也与对应的 PV 完成绑定。