The Job controller is one of Volcano's most important controllers.
type jobcontroller struct {
kubeClient kubernetes.Interface
vcClient vcclientset.Interface
jobInformer batchinformer.JobInformer
podInformer coreinformers.PodInformer
pvcInformer coreinformers.PersistentVolumeClaimInformer
pgInformer schedulinginformers.PodGroupInformer
svcInformer coreinformers.ServiceInformer
cmdInformer businformer.CommandInformer
pcInformer kubeschedulinginformers.PriorityClassInformer
queueInformer schedulinginformers.QueueInformer
// A store of jobs
jobLister batchlister.JobLister
jobSynced func() bool
// A store of pods
podLister corelisters.PodLister
podSynced func() bool
pvcLister corelisters.PersistentVolumeClaimLister
pvcSynced func() bool
// A store of podgroups
pgLister schedulinglisters.PodGroupLister
pgSynced func() bool
// A store of service
svcLister corelisters.ServiceLister
svcSynced func() bool
cmdLister buslister.CommandLister
cmdSynced func() bool
pcLister kubeschedulinglisters.PriorityClassLister
pcSynced func() bool
queueLister schedulinglisters.QueueLister
queueSynced func() bool
// queue that need to sync up
queueList []workqueue.RateLimitingInterface // queueList holds the watched objects; it is a slice so that multiple workers can cooperate, one queue per worker
// The controller hashes the job's namespace-name and assigns each job to one of the queues.
commandQueue workqueue.RateLimitingInterface
cache jobcache.Cache
// Job Event recorder
recorder record.EventRecorder
errTasks workqueue.RateLimitingInterface
workers uint32
maxRequeueNum int
}
// The controller embeds informers and listers for several kinds of objects.
Each element of queueList is essentially a work queue whose items are custom Request objects. A Request mainly carries the key information of a Job, which matches the usual queue model: the queue stores keys while the cache stores the actual data:
// pkg/controllers/apis/request.go
// Request struct.
type Request struct {
Namespace string // namespace of the job
JobName string // name of the job
TaskName string // name of the task
QueueName string // name of the Queue the job is assigned to
Event v1alpha1.Event
ExitCode int32
Action v1alpha1.Action
JobVersion int32
}
cache is essentially a map of Job resources keyed by namespace/name:
Why keep a separate cache when the informer already has an Indexer? Because the cache aggregates each Job together with the Pods it owns, so fetching a Job and all of its Pods from the cache is more convenient.
type jobCache struct {
sync.Mutex
jobs map[string]*apis.JobInfo
deletedJobs workqueue.RateLimitingInterface
}
// JobInfo struct. Each value holds both the Job itself and the Pods belonging to that Job.
type JobInfo struct {
Namespace string
Name string
Job *batch.Job
Pods map[string]map[string]*v1.Pod
}
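To make the nested map concrete: enumerating the pods of a single task is just a two-level lookup (a minimal usage sketch; jobInfo and taskName are assumed to already be in scope, and klog is the logger used throughout the controller):
// Pods is keyed first by task name, then by pod name,
// so the pods of one task are reachable with a single map index.
for podName, pod := range jobInfo.Pods[taskName] {
	klog.Infof("task %s owns pod %s in phase %s", taskName, podName, pod.Status.Phase)
}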
The initialization function of jobcontroller:
func (cc *jobcontroller) Initialize(opt *framework.ControllerOption) error {
cc.kubeClient = opt.KubeClient
cc.vcClient = opt.VolcanoClient
sharedInformers := opt.SharedInformerFactory
workers := opt.WorkerNum
// Initialize event client
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: cc.kubeClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(vcscheme.Scheme, v1.EventSource{Component: "vc-controller-manager"})
cc.queueList = make([]workqueue.RateLimitingInterface, workers)
cc.commandQueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
cc.cache = jobcache.New()
cc.errTasks = newRateLimitingQueue()
cc.recorder = recorder
cc.workers = workers
cc.maxRequeueNum = opt.MaxRequeueNum
if cc.maxRequeueNum < 0 {
cc.maxRequeueNum = -1
}
var i uint32
for i = 0; i < workers; i++ {
cc.queueList[i] = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
} // create one rate-limited queue per worker to hold job requests
cc.jobInformer = informerfactory.NewSharedInformerFactory(cc.vcClient, 0).Batch().V1alpha1().Jobs()
cc.jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: cc.addJob,
UpdateFunc: cc.updateJob,
DeleteFunc: cc.deleteJob,
})
cc.jobLister = cc.jobInformer.Lister()
cc.jobSynced = cc.jobInformer.Informer().HasSynced
cc.cmdInformer = informerfactory.NewSharedInformerFactory(cc.vcClient, 0).Bus().V1alpha1().Commands()
cc.cmdInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch v := obj.(type) {
case *busv1alpha1.Command:
if v.TargetObject != nil &&
v.TargetObject.APIVersion == batchv1alpha1.SchemeGroupVersion.String() &&
v.TargetObject.Kind == "Job" {
return true
}
return false
default:
return false
}
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: cc.addCommand,
},
},
)
cc.cmdLister = cc.cmdInformer.Lister()
cc.cmdSynced = cc.cmdInformer.Informer().HasSynced
cc.podInformer = sharedInformers.Core().V1().Pods()
cc.podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: cc.addPod,
UpdateFunc: cc.updatePod,
DeleteFunc: cc.deletePod,
})
cc.podLister = cc.podInformer.Lister()
cc.podSynced = cc.podInformer.Informer().HasSynced
cc.pvcInformer = sharedInformers.Core().V1().PersistentVolumeClaims()
cc.pvcLister = cc.pvcInformer.Lister()
cc.pvcSynced = cc.pvcInformer.Informer().HasSynced
cc.svcInformer = sharedInformers.Core().V1().Services()
cc.svcLister = cc.svcInformer.Lister()
cc.svcSynced = cc.svcInformer.Informer().HasSynced
cc.pgInformer = informerfactory.NewSharedInformerFactory(cc.vcClient, 0).Scheduling().V1beta1().PodGroups()
cc.pgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: cc.updatePodGroup,
})
cc.pgLister = cc.pgInformer.Lister()
cc.pgSynced = cc.pgInformer.Informer().HasSynced
cc.pcInformer = sharedInformers.Scheduling().V1().PriorityClasses()
cc.pcLister = cc.pcInformer.Lister()
cc.pcSynced = cc.pcInformer.Informer().HasSynced
cc.queueInformer = informerfactory.NewSharedInformerFactory(cc.vcClient, 0).Scheduling().V1beta1().Queues()
cc.queueLister = cc.queueInformer.Lister()
cc.queueSynced = cc.queueInformer.Informer().HasSynced
// Register actions
state.SyncJob = cc.syncJob
state.KillJob = cc.killJob
return nil
}
// pkg/controllers/job/state/factory.go
var (
// SyncJob will create or delete Pods according to Job's spec.
SyncJob ActionFn
// KillJob kill all Pods of Job with phase not in podRetainPhase.
KillJob KillActionFn
)
//State interface.
type State interface {
// Execute executes the actions based on current state.
Execute(act v1alpha1.Action) error
}
// NewState gets the state from the volcano job Phase.
func NewState(jobInfo *apis.JobInfo) State {
job := jobInfo.Job
switch job.Status.State.Phase {
case vcbatch.Pending:
return &pendingState{job: jobInfo}
case vcbatch.Running:
return &runningState{job: jobInfo}
case vcbatch.Restarting:
return &restartingState{job: jobInfo}
case vcbatch.Terminated, vcbatch.Completed, vcbatch.Failed:
return &finishedState{job: jobInfo}
case vcbatch.Terminating:
return &terminatingState{job: jobInfo}
case vcbatch.Aborting:
return &abortingState{job: jobInfo}
case vcbatch.Aborted:
return &abortedState{job: jobInfo}
case vcbatch.Completing:
return &completingState{job: jobInfo}
}
// It's pending by default.
return &pendingState{job: jobInfo}
}
syncJob and killJob are the two actions. Together with the states they define what a Job should do when a given action is triggered in a given state. A state is simply job.status.state, with phases such as Pending, Aborting, Running, Completing, and so on. An action corresponds to a policy defined in the Job spec, i.e. what to do when certain events occur. So different states react to different actions with different behavior, but that behavior ultimately boils down to just two functions: syncJob and killJob.
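As a concrete illustration of such a policy (a hedged sketch; the field names follow Volcano's batch/v1alpha1 LifecyclePolicy and the bus/v1alpha1 event/action constants as I recall them), a policy that restarts the job whenever one of its pods is evicted would look roughly like this when written out in Go:
// When a PodEvicted event is observed for this job/task, applyPolicies
// resolves the action to RestartJob instead of the default sync.
policy := batchv1alpha1.LifecyclePolicy{
	Event:  busv1alpha1.PodEvictedEvent,
	Action: busv1alpha1.RestartJobAction,
}
_ = policy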
// Event handlers registered on the job informer
func (cc *jobcontroller) addJob(obj interface{}) {
job, ok := obj.(*batch.Job)
if !ok {
klog.Errorf("obj is not Job")
return
}
req := apis.Request{
Namespace: job.Namespace,
JobName: job.Name,
Event: bus.OutOfSyncEvent, // add/update/delete of a job all emit this event, meaning the job is out of sync
}
// TODO(k82cn): if failed to add job, the cache should be refresh
if err := cc.cache.Add(job); err != nil { // put the job into the cache; if that fails, just log the error
klog.Errorf("Failed to add job <%s/%s>: %v in cache",
job.Namespace, job.Name, err)
}
key := jobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key) // figure out which worker queue this job should go to
queue.Add(req)
}
func (cc *jobcontroller) updateJob(oldObj, newObj interface{}) {
newJob, ok := newObj.(*batch.Job)
if !ok {
klog.Errorf("newObj is not Job")
return
}
oldJob, ok := oldObj.(*batch.Job)
if !ok {
klog.Errorf("oldJob is not Job")
return
}
// No need to update if ResourceVersion is not changed
if newJob.ResourceVersion == oldJob.ResourceVersion {
klog.V(6).Infof("No need to update because job is not modified.")
return
}
if err := cc.cache.Update(newJob); err != nil {
klog.Errorf("UpdateJob - Failed to update job <%s/%s>: %v in cache",
newJob.Namespace, newJob.Name, err)
}
// NOTE: Since we only reconcile job based on Spec, we will ignore other attributes
// For Job status, it's used internally and always been updated via our controller.
// Reconcile only on Spec changes; Status is only ever updated by this controller, so any Status change must have originated here.
if reflect.DeepEqual(newJob.Spec, oldJob.Spec) && newJob.Status.State.Phase == oldJob.Status.State.Phase {
klog.V(6).Infof("Job update event is ignored since no update in 'Spec'.")
return
}
req := apis.Request{
Namespace: newJob.Namespace,
JobName: newJob.Name,
Event: bus.OutOfSyncEvent,
}
key := jobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)
}
func (cc *jobcontroller) deleteJob(obj interface{}) {
job, ok := obj.(*batch.Job)
if !ok {
// If we reached here it means the Job was deleted but its final state is unrecorded.
tombstone, ok := obj.(cache.DeletedFinalStateUnknown) // deletion observed via a relist; the final state is unknown
if !ok {
klog.Errorf("Couldn't get object from tombstone %#v", obj)
return
}
job, ok = tombstone.Obj.(*batch.Job)
if !ok {
klog.Errorf("Tombstone contained object that is not a volcano Job: %#v", obj)
return
}
}
if err := cc.cache.Delete(job); err != nil {
klog.Errorf("Failed to delete job <%s/%s>: %v in cache",
job.Namespace, job.Name, err)
}
}
// Pick the worker queue for a job by hashing its key
func (cc *jobcontroller) getWorkerQueue(key string) workqueue.RateLimitingInterface {
var hashVal hash.Hash32
var val uint32
hashVal = fnv.New32()
hashVal.Write([]byte(key))
val = hashVal.Sum32()
queue := cc.queueList[val%cc.workers]
return queue
}
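The counterpart check used later in processNextReq, belongsToThisRoutine, hashes the key the same way and compares the bucket with the worker index; roughly (a sketch of the logic, not a verbatim copy):
func (cc *jobcontroller) belongsToThisRoutine(key string, count uint32) bool {
	hashVal := fnv.New32()
	hashVal.Write([]byte(key))
	// the request belongs to this worker iff its hash bucket equals the worker index
	return hashVal.Sum32()%cc.workers == count
}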
Some of the later logic can look confusing at first. The underlying point is that the Action and Event carried in a req may be inconsistent with the Job state in the cache: the cache is up to date, while the req may be stale. For example, a stale req may ask for a sync while the Job has already been deleted, in which case the sync should simply be abandoned.
// jobCache operations for jobs. Pods are not touched here: a pod only enters the cache after it has actually been created, so the pod map starts out empty.
func (jc *jobCache) Add(job *v1alpha1.Job) error {
jc.Lock()
defer jc.Unlock()
key := JobKey(job) // the key is namespace/name
if jobInfo, found := jc.jobs[key]; found {
if jobInfo.Job == nil {
jobInfo.SetJob(job)
return nil
}
return fmt.Errorf("duplicated jobInfo <%v>", key)
}
jc.jobs[key] = &apis.JobInfo{
Name: job.Name,
Namespace: job.Namespace,
Job: job,
Pods: make(map[string]map[string]*v1.Pod), // first key is the task name, second key is the pod name
}
return nil
}
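JobKey (and JobKeyByReq, used when dequeuing) simply joins namespace and name, which is why the same key addresses both the worker queues and the cache. Conceptually (a sketch, assuming the usual namespace/name convention):
func JobKey(job *v1alpha1.Job) string {
	// namespace/name uniquely identifies a job in the queues and in the cache
	return fmt.Sprintf("%s/%s", job.Namespace, job.Name)
}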
func (jc *jobCache) Update(obj *v1alpha1.Job) error {
jc.Lock()
defer jc.Unlock()
key := JobKey(obj)
job, found := jc.jobs[key]
if !found {
return fmt.Errorf("failed to find job <%v>", key)
}
var oldResourceversion, newResourceversion uint64
var err error
if oldResourceversion, err = strconv.ParseUint(job.Job.ResourceVersion, 10, 64); err != nil {
return fmt.Errorf("failed to parase job <%v> resource version <%s>", key, job.Job.ResourceVersion)
}
if newResourceversion, err = strconv.ParseUint(obj.ResourceVersion, 10, 64); err != nil {
return fmt.Errorf("failed to parase job <%v> resource version <%s>", key, obj.ResourceVersion)
}
if newResourceversion < oldResourceversion {
return fmt.Errorf("job <%v> has too old resource version: %d (%d)", key, newResourceversion, oldResourceversion)
}
job.Job = obj
return nil
}
func (jc *jobCache) Delete(obj *v1alpha1.Job) error {
jc.Lock()
defer jc.Unlock()
key := JobKey(obj)
jobInfo, found := jc.jobs[key]
if !found {
return fmt.Errorf("failed to find job <%v>", key)
}
jobInfo.Job = nil
jc.deleteJob(jobInfo)
return nil
}
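Delete does not remove the map entry right away: it clears jobInfo.Job and pushes the entry onto the deletedJobs rate-limited queue. The goroutine started by cc.cache.Run drains that queue and only drops an entry once the job's pods are gone, roughly as follows (a simplified sketch; jobTerminated and keyFn are assumed helpers that check for remaining pods and build the namespace/name key):
func (jc *jobCache) processCleanupJob() bool {
	obj, shutdown := jc.deletedJobs.Get()
	if shutdown {
		return false
	}
	defer jc.deletedJobs.Done(obj)

	job, ok := obj.(*apis.JobInfo)
	if !ok {
		return true
	}

	jc.Lock()
	defer jc.Unlock()
	if jobTerminated(job) {
		// all pods of the job are gone, drop the cache entry for good
		jc.deletedJobs.Forget(obj)
		delete(jc.jobs, keyFn(job.Namespace, job.Name))
	} else {
		// pods still exist, requeue and check again later
		jc.deleteJob(job)
	}
	return true
}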
Knowing the controller's key data structures, we can already guess the reconcile logic: the producers (list-watch event handlers) put a Job's key information into queueList and save the Job object itself into the jobCache; the consumers take items off queueList and process them. The main code lives in the processNextReq function:
// Run start JobController.
func (cc *jobcontroller) Run(stopCh <-chan struct{}) {
go cc.jobInformer.Informer().Run(stopCh)
go cc.podInformer.Informer().Run(stopCh)
go cc.pvcInformer.Informer().Run(stopCh)
go cc.pgInformer.Informer().Run(stopCh)
go cc.svcInformer.Informer().Run(stopCh)
go cc.cmdInformer.Informer().Run(stopCh)
go cc.pcInformer.Informer().Run(stopCh)
go cc.queueInformer.Informer().Run(stopCh)
cache.WaitForCacheSync(stopCh, cc.jobSynced, cc.podSynced, cc.pgSynced,
cc.svcSynced, cc.cmdSynced, cc.pvcSynced, cc.pcSynced, cc.queueSynced)
go wait.Until(cc.handleCommands, 0, stopCh)
var i uint32
for i = 0; i < cc.workers; i++ {
go func(num uint32) {
wait.Until(
func() {
cc.worker(num) // each worker processes the queue matching its index
},
time.Second,
stopCh)
}(i)
}
go cc.cache.Run(stopCh)
// Re-sync error tasks.
go wait.Until(cc.processResyncTask, 0, stopCh) // periodically resync errored tasks
klog.Infof("JobController is running ...... ")
}
Four kinds of goroutines are started here:
- cc.worker(num)
- cc.cache.Run(stopCh)
- cc.processResyncTask
- cc.handleCommands (a sketch of its flow is shown right after this list)
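handleCommands does not appear elsewhere in this walkthrough, so here is a simplified sketch of its flow, pieced together from the Command handling summarized at the end of this article (treat the details as approximate): it pops a Command from commandQueue, deletes the Command object so it is executed at most once, and converts the Command's action into a Request on the matching worker queue.
func (cc *jobcontroller) processNextCommand() bool {
	obj, shutdown := cc.commandQueue.Get()
	if shutdown {
		return false
	}
	cmd := obj.(*busv1alpha1.Command)
	defer cc.commandQueue.Done(cmd)

	// delete the Command first so it is executed at most once
	if err := cc.vcClient.BusV1alpha1().Commands(cmd.Namespace).Delete(context.TODO(), cmd.Name, metav1.DeleteOptions{}); err != nil {
		if !apierrors.IsNotFound(err) {
			cc.commandQueue.AddRateLimited(cmd)
		}
		return true
	}

	// wrap the Command's action into a Request and hand it to the right worker queue
	req := apis.Request{
		Namespace: cmd.Namespace,
		JobName:   cmd.TargetObject.Name,
		Event:     busv1alpha1.CommandIssuedEvent,
		Action:    busv1alpha1.Action(cmd.Action),
	}
	queue := cc.getWorkerQueue(jobcache.JobKeyByReq(&req))
	queue.Add(req)
	return true
}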
// jobcontroller.worker(num uint32)
func (cc *jobcontroller) worker(i uint32) {
klog.Infof("worker %d start ...... ", i)
for cc.processNextReq(i) {
}
}
func (cc *jobcontroller) processNextReq(count uint32) bool {
queue := cc.queueList[count] // take the queue this worker is responsible for
obj, shutdown := queue.Get()
if shutdown {
klog.Errorf("Fail to pop item from queue")
return false
}
req := obj.(apis.Request)
defer queue.Done(req)
key := jobcache.JobKeyByReq(&req) // if this job does not belong to this worker, log an error and re-enqueue it to the correct queue
if !cc.belongsToThisRoutine(key, count) {
klog.Errorf("should not occur The job does not belongs to this routine key:%s, worker:%d...... ", key, count)
queueLocal := cc.getWorkerQueue(key)
queueLocal.Add(req)
return true
}
klog.V(3).Infof("Try to handle request <%v>", req)
jobInfo, err := cc.cache.Get(key)
if err != nil {
// TODO(k82cn): ignore not-ready error.
klog.Errorf("Failed to get job by <%v> from cache: %v", req, err)
return true
}
st := state.NewState(jobInfo) // build a state object; its concrete type is determined by job.Status.State.Phase
if st == nil {
klog.Errorf("Invalid state <%s> of Job <%v/%v>",
jobInfo.Job.Status.State, jobInfo.Job.Namespace, jobInfo.Job.Name)
return true
}
action := applyPolicies(jobInfo.Job, &req) // derive the action from the job's policies and the request
klog.V(3).Infof("Execute <%v> on Job <%s/%s> in <%s> by <%T>.",
action, req.Namespace, req.JobName, jobInfo.Job.Status.State.Phase, st)
if action != busv1alpha1.SyncJobAction {
cc.recordJobEvent(jobInfo.Job.Namespace, jobInfo.Job.Name, batchv1alpha1.ExecuteAction, fmt.Sprintf(
"Start to execute action %s ", action))
} // anything other than SyncJobAction is a "special" action, so record an event for it
if err := st.Execute(action); err != nil { // run the state's logic for this action, which ultimately calls KillJob or SyncJob
if cc.maxRequeueNum == -1 || queue.NumRequeues(req) < cc.maxRequeueNum {
klog.V(2).Infof("Failed to handle Job <%s/%s>: %v",
jobInfo.Job.Namespace, jobInfo.Job.Name, err)
// If any error, requeue it.
queue.AddRateLimited(req)
return true
}
cc.recordJobEvent(jobInfo.Job.Namespace, jobInfo.Job.Name, batchv1alpha1.ExecuteAction, fmt.Sprintf(
"Job failed on action %s for retry limit reached", action))
klog.Warningf("Terminating Job <%s/%s> and releasing resources", jobInfo.Job.Namespace, jobInfo.Job.Name)
if err = st.Execute(busv1alpha1.TerminateJobAction); err != nil {
klog.Errorf("Failed to terminate Job<%s/%s>: %v", jobInfo.Job.Namespace, jobInfo.Job.Name, err)
}
klog.Warningf("Dropping job<%s/%s> out of the queue: %v because max retries has reached", jobInfo.Job.Namespace, jobInfo.Job.Name, err)
}
// If no error, forget it.
queue.Forget(req)
return true
}
func applyPolicies(job *batch.Job, req *apis.Request) v1alpha1.Action {
if len(req.Action) != 0 {
return req.Action
}
if req.Event == v1alpha1.OutOfSyncEvent {
return v1alpha1.SyncJobAction
}
// For all the requests triggered from discarded job resources will perform sync action instead
// The cache may be newer than the req taken from the queue, so the latest job data in the cache takes precedence.
if req.JobVersion < job.Status.Version {
klog.Infof("Request %s is outdated, will perform sync instead.", req)
return v1alpha1.SyncJobAction
}
// Overwrite Job level policies: if a pod triggered a task-level event, the task's policy action is used.
if len(req.TaskName) != 0 { // a req carrying a TaskName was triggered by a pod, so task-level policies apply
// Parse task level policies
for _, task := range job.Spec.Tasks {
if task.Name == req.TaskName {
for _, policy := range task.Policies {
policyEvents := getEventlist(policy) // the events this policy reacts to
if len(policyEvents) > 0 && len(req.Event) > 0 { // if the req's event is among the events this policy handles, trigger the policy's action
if checkEventExist(policyEvents, req.Event) || checkEventExist(policyEvents, v1alpha1.AnyEvent) {
return policy.Action
}
}
// 0 is not an error code; it is rejected by the validating admission controller.
// Exit code 0 cannot be used as a policy trigger; if the exit code carried in the req matches the one the policy expects, the policy's action is triggered.
if policy.ExitCode != nil && *policy.ExitCode == req.ExitCode {
return policy.Action
}
}
break
}
}
}
// Parse Job level policies (job-level events)
for _, policy := range job.Spec.Policies {
policyEvents := getEventlist(policy)
if len(policyEvents) > 0 && len(req.Event) > 0 {
if checkEventExist(policyEvents, req.Event) || checkEventExist(policyEvents, v1alpha1.AnyEvent) {
return policy.Action
}
}
// 0 is not an error code, is prevented in validation admission controller
if policy.ExitCode != nil && *policy.ExitCode == req.ExitCode {
return policy.Action
}
}
// if nothing matched, fall back to the sync action
return v1alpha1.SyncJobAction
}
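The two small helpers used above merge the policy's single Event field with its Events list and test membership (a sketch, assuming LifecyclePolicy exposes both fields as the API does):
func getEventlist(policy batch.LifecyclePolicy) []v1alpha1.Event {
	events := policy.Events
	if len(policy.Event) > 0 {
		events = append(events, policy.Event)
	}
	return events
}

func checkEventExist(policyEvents []v1alpha1.Event, reqEvent v1alpha1.Event) bool {
	for _, e := range policyEvents {
		if e == reqEvent {
			return true
		}
	}
	return false
}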
func (ps *runningState) Execute(action v1alpha1.Action) error {
switch action {
case v1alpha1.RestartJobAction:
return KillJob(ps.job, PodRetainPhaseNone, func(status *vcbatch.JobStatus) bool {
status.State.Phase = vcbatch.Restarting
status.RetryCount++
return true
})
case v1alpha1.AbortJobAction:
return KillJob(ps.job, PodRetainPhaseSoft, func(status *vcbatch.JobStatus) bool {
status.State.Phase = vcbatch.Aborting
return true
})
case v1alpha1.TerminateJobAction:
return KillJob(ps.job, PodRetainPhaseSoft, func(status *vcbatch.JobStatus) bool {
status.State.Phase = vcbatch.Terminating
return true
})
case v1alpha1.CompleteJobAction:
return KillJob(ps.job, PodRetainPhaseSoft, func(status *vcbatch.JobStatus) bool {
status.State.Phase = vcbatch.Completing
return true
})
default:
// pass in a callback; what is actually being reconciled is the status
return SyncJob(ps.job, func(status *vcbatch.JobStatus) bool {
jobReplicas := TotalTasks(ps.job.Job) // total number of pod replicas across all tasks
if jobReplicas == 0 {
// when scale down to zero, keep the current job phase
return false
}
minSuccess := ps.job.Job.Spec.MinSuccess
// minimum success count reached
if minSuccess != nil && status.Succeeded >= *minSuccess {
status.State.Phase = vcbatch.Completed
return true
}
totalTaskMinAvailable := TotalTaskMinAvailable(ps.job.Job) // sum of every task's minAvailable
if status.Succeeded+status.Failed == jobReplicas { // the job has finished: succeeded + failed equals the total number of pods
if ps.job.Job.Spec.MinAvailable >= totalTaskMinAvailable {
for _, task := range ps.job.Job.Spec.Tasks {
if task.MinAvailable == nil {
continue
}
if taskStatus, ok := status.TaskStatusCount[task.Name]; ok { // if any task has fewer succeeded pods than its own MinAvailable, the job has failed
if taskStatus.Phase[v1.PodSucceeded] < *task.MinAvailable {
status.State.Phase = vcbatch.Failed
return true
}
}
}
}
if minSuccess != nil && status.Succeeded < *minSuccess {
status.State.Phase = vcbatch.Failed
} else if status.Succeeded >= ps.job.Job.Spec.MinAvailable { // success requires reaching both minSuccess and MinAvailable
status.State.Phase = vcbatch.Completed
} else {
status.State.Phase = vcbatch.Failed
}
return true
}
return false // returning false means the status will not be updated
})
}
}
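The aggregation helpers used in the default branch above walk the job's tasks; conceptually they look like this (a sketch, with the convention that a task without its own minAvailable counts with its full replica number):
// TotalTasks sums the replicas of every task in the job.
func TotalTasks(job *vcbatch.Job) int32 {
	var total int32
	for _, task := range job.Spec.Tasks {
		total += task.Replicas
	}
	return total
}

// TotalTaskMinAvailable sums each task's minAvailable, falling back to the
// task's replica count when minAvailable is not set.
func TotalTaskMinAvailable(job *vcbatch.Job) int32 {
	var total int32
	for _, task := range job.Spec.Tasks {
		if task.MinAvailable != nil {
			total += *task.MinAvailable
		} else {
			total += task.Replicas
		}
	}
	return total
}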
As the walkthrough above shows, a Job's reconcile involves quite a few states, so the code uses the Action and State pair to drive a state machine. In the end, though, the real work is done by just two functions, SyncJob and KillJob, so we analyze those two.
SyncJob
From the earlier state-transition table we can see that only the Pending and Running states can enter the SyncJob flow; in other words, it covers the path from creating a Job to getting it running normally, so it is no surprise that SyncJob's main purpose is to bring the Job up. The main steps are:
- For a new Job, initialize it first: create the corresponding PVCs (Volcano manages its PVCs itself; there is no Kubernetes controller doing it) and the PodGroup (one Job maps to one PodGroup). The created PodGroup is then managed by the PodGroup controller, and its name is derived from the Job (the Job name plus the Job UID in the code below).
- Compute, from the Job's tasks, the list of pods to create and the list of pods to delete.
- Create and delete the pods in those two lists in separate goroutines via kube-apiserver. Note that to keep the default Kubernetes scheduler away from these pods, the Job carries the scheduler name and it is propagated down to the pods: the default scheduler ignores pods whose schedulerName is volcano, while the Volcano scheduler only picks up such pods, so the two do not interfere. When a Job is created, the webhook sets volcano as the Job's scheduler name by default.
- Update the Job status and the jobCache.
func (cc *jobcontroller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateStatusFn) error {
job := jobInfo.Job
klog.V(3).Infof("Starting to sync up Job <%s/%s>, current version %d", job.Namespace, job.Name, job.Status.Version)
defer klog.V(3).Infof("Finished Job <%s/%s> sync up, current version %d", job.Namespace, job.Name, job.Status.Version)
if jobInfo.Job.DeletionTimestamp != nil { // the job is already being deleted, so the killJob flow should handle it
klog.Infof("Job <%s/%s> is terminating, skip management process.",
jobInfo.Job.Namespace, jobInfo.Job.Name)
return nil
}
// deep copy job to prevent mutate it
job = job.DeepCopy()
// Find queue that job belongs to, and check if the queue has forwarding metadata
queueInfo, err := cc.GetQueueInfo(job.Spec.Queue)
if err != nil {
return err
}
var jobForwarding bool
if len(queueInfo.Spec.ExtendClusters) != 0 { // related to forwarding jobs to extended clusters; the exact semantics are not examined here
jobForwarding = true
if len(job.Annotations) == 0 {
job.Annotations = make(map[string]string)
}
job.Annotations[batch.JobForwardingKey] = "true"
job, err = cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).Update(context.TODO(), job, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("failed to update job: %s/%s, error: %s", job.Namespace, job.Name, err.Error())
return err
}
}
// Skip job initiation if job is already initiated
if !isInitiated(job) { // isInitiated reports whether the job is already initialized (any non-pending job counts as initialized); if the job is still pending, run initialization: create the PodGroup, run plugin hooks, and so on
if job, err = cc.initiateJob(job); err != nil {
return err
}
} else {
// TODO: optimize this call it only when scale up/down
if err = cc.initOnJobUpdate(job); err != nil { // for a non-pending job, refresh the plugins and the PodGroup
return err
}
}
if len(queueInfo.Spec.ExtendClusters) != 0 {
jobForwarding = true
job.Annotations[batch.JobForwardingKey] = "true"
_, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).Update(context.TODO(), job, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("failed to update job: %s/%s, error: %s", job.Namespace, job.Name, err.Error())
return err
}
}
var syncTask bool
pgName := job.Name + "-" + string(job.UID)
if pg, _ := cc.pgLister.PodGroups(job.Namespace).Get(pgName); pg != nil {
if pg.Status.Phase != "" && pg.Status.Phase != scheduling.PodGroupPending {
syncTask = true // once the PodGroup has left Pending (e.g. it has been scheduled), the tasks need to be synced, i.e. pods start to be operated on
} // a PodGroup update also triggers job reconciliation, e.g. when the PodGroup has finished scheduling
for _, condition := range pg.Status.Conditions {
if condition.Type == scheduling.PodGroupUnschedulableType {
cc.recorder.Eventf(job, v1.EventTypeWarning, string(batch.PodGroupPending),
fmt.Sprintf("PodGroup %s:%s unschedule,reason: %s", job.Namespace, job.Name, condition.Message))
}
}
}
var jobCondition batch.JobCondition
if !syncTask { // no task syncing needed (the PodGroup has not been scheduled yet), so only update the job status
if updateStatus != nil {
if updateStatus(&job.Status) {
job.Status.State.LastTransitionTime = metav1.Now()
jobCondition = newCondition(job.Status.State.Phase, &job.Status.State.LastTransitionTime)
job.Status.Conditions = append(job.Status.Conditions, jobCondition)
}
}
newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(context.TODO(), job, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("Failed to update status of Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
}
if e := cc.cache.Update(newJob); e != nil {
klog.Errorf("SyncJob - Failed to update Job %v/%v in cache: %v",
newJob.Namespace, newJob.Name, e)
return e
}
return nil
}
var running, pending, terminating, succeeded, failed, unknown int32
taskStatusCount := make(map[string]batch.TaskState)
podToCreate := make(map[string][]*v1.Pod)
var podToDelete []*v1.Pod
var creationErrs []error
var deletionErrs []error
appendMutex := sync.Mutex{}
appendError := func(container *[]error, err error) {
appendMutex.Lock()
defer appendMutex.Unlock()
*container = append(*container, err)
}
waitCreationGroup := sync.WaitGroup{}
for _, ts := range job.Spec.Tasks {
ts.Template.Name = ts.Name
tc := ts.Template.DeepCopy()
name := ts.Template.Name
pods, found := jobInfo.Pods[name]
if !found {
pods = map[string]*v1.Pod{}
}
var podToCreateEachTask []*v1.Pod
for i := 0; i < int(ts.Replicas); i++ {
podName := fmt.Sprintf(jobhelpers.PodNameFmt, job.Name, name, i) // pod names are generated as jobName-taskName-index
if pod, found := pods[podName]; !found {
newPod := createJobPod(job, tc, ts.TopologyPolicy, i, jobForwarding) // build the pod manifest
if err := cc.pluginOnPodCreate(job, newPod); err != nil {
return err
}
podToCreateEachTask = append(podToCreateEachTask, newPod)
waitCreationGroup.Add(1) // pods are created later by multiple goroutines in parallel
} else {
delete(pods, podName) // for scale-down: whatever is left in pods afterwards gets deleted, so removing an entry here means this pod should be kept
if pod.DeletionTimestamp != nil { // e.g. scaling from 2 replicas to 1: pod #2 will end up in podToDelete later; if pod #1 is meanwhile being deleted, count it as terminating
klog.Infof("Pod <%s/%s> is terminating", pod.Namespace, pod.Name)
atomic.AddInt32(&terminating, 1)
continue
}
// classify existing pods by phase
classifyAndAddUpPodBaseOnPhase(pod, &pending, &running, &succeeded, &failed, &unknown)
calcPodStatus(pod, taskStatusCount) // accumulate the per-phase counts of this task's pods into the taskStatusCount map
}
}
podToCreate[ts.Name] = podToCreateEachTask
for _, pod := range pods {
podToDelete = append(podToDelete, pod) // pods left over after scale-down are to be deleted
}
}
for taskName, podToCreateEachTask := range podToCreate {
if len(podToCreateEachTask) == 0 {
continue
}
go func(taskName string, podToCreateEachTask []*v1.Pod) { // one goroutine per task
taskIndex := jobhelpers.GetTasklndexUnderJob(taskName, job)
if job.Spec.Tasks[taskIndex].DependsOn != nil { // handle tasks that depend on other tasks
cc.waitDependsOnTaskMeetCondition(taskName, taskIndex, podToCreateEachTask, job)
}
for _, pod := range podToCreateEachTask {
go func(pod *v1.Pod) {
defer waitCreationGroup.Done()
// and one goroutine per pod
newPod, err := cc.kubeClient.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
if err != nil && !apierrors.IsAlreadyExists(err) { // creation failed and the error is not "already exists"
// Failed to create Pod, waitCreationGroup a moment and then create it again
// This is to ensure all podsMap under the same Job created
// So gang-scheduling could schedule the Job successfully
klog.Errorf("Failed to create pod %s for Job %s, err %#v",
pod.Name, job.Name, err)
appendError(&creationErrs, fmt.Errorf("failed to create pod %s, err: %#v", pod.Name, err))
} else {
classifyAndAddUpPodBaseOnPhase(newPod, &pending, &running, &succeeded, &failed, &unknown)
calcPodStatus(pod, taskStatusCount)
klog.V(5).Infof("Created Task <%s> of Job <%s/%s>",
pod.Name, job.Namespace, job.Name)
}
}(pod)
}
}(taskName, podToCreateEachTask)
}
waitCreationGroup.Wait()
if len(creationErrs) != 0 { // if any pod failed to be created, abort this round of reconciliation
cc.recorder.Event(job, v1.EventTypeWarning, FailedCreatePodReason,
fmt.Sprintf("Error creating pods: %+v", creationErrs))
return fmt.Errorf("failed to create %d pods of %d", len(creationErrs), len(podToCreate))
}
// Delete pods when scale down.
waitDeletionGroup := sync.WaitGroup{}
waitDeletionGroup.Add(len(podToDelete))
for _, pod := range podToDelete {
go func(pod *v1.Pod) {
defer waitDeletionGroup.Done()
err := cc.deleteJobPod(job.Name, pod) // delete the pod via the apiserver
if err != nil {
// Failed to delete Pod, waitCreationGroup a moment and then create it again
// This is to ensure all podsMap under the same Job created
// So gang-scheduling could schedule the Job successfully
klog.Errorf("Failed to delete pod %s for Job %s, err %#v",
pod.Name, job.Name, err)
appendError(&deletionErrs, err)
cc.resyncTask(pod)
} else {
klog.V(3).Infof("Deleted Task <%s> of Job <%s/%s>",
pod.Name, job.Namespace, job.Name)
atomic.AddInt32(&terminating, 1)
}
}(pod)
}
waitDeletionGroup.Wait()
if len(deletionErrs) != 0 {
cc.recorder.Event(job, v1.EventTypeWarning, FailedDeletePodReason,
fmt.Sprintf("Error deleting pods: %+v", deletionErrs))
return fmt.Errorf("failed to delete %d pods of %d", len(deletionErrs), len(podToDelete))
}
job.Status = batch.JobStatus{
State: job.Status.State,
Pending: pending,
Running: running,
Succeeded: succeeded,
Failed: failed,
Terminating: terminating,
Unknown: unknown,
Version: job.Status.Version,
MinAvailable: job.Spec.MinAvailable,
TaskStatusCount: taskStatusCount,
ControlledResources: job.Status.ControlledResources,
Conditions: job.Status.Conditions,
RetryCount: job.Status.RetryCount,
}
if updateStatus != nil {
if updateStatus(&job.Status) {
job.Status.State.LastTransitionTime = metav1.Now()
jobCondition = newCondition(job.Status.State.Phase, &job.Status.State.LastTransitionTime)
job.Status.Conditions = append(job.Status.Conditions, jobCondition)
}
}
newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(context.TODO(), job, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("Failed to update status of Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
}
if e := cc.cache.Update(newJob); e != nil {
klog.Errorf("SyncJob - Failed to update Job %v/%v in cache: %v",
newJob.Namespace, newJob.Name, e)
return e
}
return nil
}
If syncJob returns an error, processNextReq decides whether to retry and, if necessary, re-enqueues the request.
Initializing a job normally happens right after the job is created: it is given the Pending phase and default values for some other fields, and its PodGroup and PVCs are created.
func (cc *jobcontroller) initiateJob(job *batch.Job) (*batch.Job, error) {
klog.V(3).Infof("Starting to initiate Job <%s/%s>", job.Namespace, job.Name)
jobInstance, err := cc.initJobStatus(job)
if err != nil {
cc.recorder.Event(job, v1.EventTypeWarning, string(batch.JobStatusError),
fmt.Sprintf("Failed to initialize job status, err: %v", err))
return nil, err
}
if err := cc.pluginOnJobAdd(jobInstance); err != nil {
cc.recorder.Event(job, v1.EventTypeWarning, string(batch.PluginError),
fmt.Sprintf("Execute plugin when job add failed, err: %v", err))
return nil, err
}
newJob, err := cc.createJobIOIfNotExist(jobInstance) // mainly creates PVCs; job.spec may reference an existing PVC by name, otherwise one is generated by default
if err != nil {
cc.recorder.Event(job, v1.EventTypeWarning, string(batch.PVCError),
fmt.Sprintf("Failed to create PVC, err: %v", err))
return nil, err
}
if err := cc.createOrUpdatePodGroup(newJob); err != nil {
cc.recorder.Event(job, v1.EventTypeWarning, string(batch.PodGroupError),
fmt.Sprintf("Failed to create PodGroup, err: %v", err))
return nil, err
}
return newJob, nil
}
func (cc *jobcontroller) initJobStatus(job *batch.Job) (*batch.Job, error) {
if job.Status.State.Phase != "" {
return job, nil
}
job.Status.State.Phase = batch.Pending
job.Status.State.LastTransitionTime = metav1.Now()
job.Status.MinAvailable = job.Spec.MinAvailable
jobCondition := newCondition(job.Status.State.Phase, &job.Status.State.LastTransitionTime) // record the Pending phase as a condition
job.Status.Conditions = append(job.Status.Conditions, jobCondition)
newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(context.TODO(), job, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("Failed to update status of Job %v/%v: %v",
job.Namespace, job.Name, err)
return nil, err
}
if err := cc.cache.Update(newJob); err != nil { // update the jobCache
klog.Errorf("CreateJob - Failed to update Job %v/%v in cache: %v",
newJob.Namespace, newJob.Name, err)
return nil, err
}
return newJob, nil
}
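newCondition, used here and again in syncJob, just wraps the phase and the transition time into a JobCondition (a sketch of the helper, assuming the field names of batch.JobCondition):
func newCondition(status batch.JobPhase, lastTransitionTime *metav1.Time) batch.JobCondition {
	return batch.JobCondition{
		Status:             status,
		LastTransitionTime: lastTransitionTime,
	}
}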
// For a non-pending job this refreshes it; the typical scenario is that the job is already running and someone updated the job spec externally.
func (cc *jobcontroller) initOnJobUpdate(job *batch.Job) error {
klog.V(3).Infof("Starting to initiate Job <%s/%s> on update", job.Namespace, job.Name)
if err := cc.pluginOnJobUpdate(job); err != nil {
cc.recorder.Event(job, v1.EventTypeWarning, string(batch.PluginError),
fmt.Sprintf("Execute plugin when job add failed, err: %v", err))
return err
}
if err := cc.createOrUpdatePodGroup(job); err != nil { // create or update the corresponding PodGroup
cc.recorder.Event(job, v1.EventTypeWarning, string(batch.PodGroupError),
fmt.Sprintf("Failed to create PodGroup, err: %v", err))
return err
}
return nil
}
func (cc *jobcontroller) pluginOnJobUpdate(job *batch.Job) error {
client := pluginsinterface.PluginClientset{KubeClients: cc.kubeClient}
if job.Status.ControlledResources == nil {
job.Status.ControlledResources = make(map[string]string)
}
for name, args := range job.Spec.Plugins {
pb, found := plugins.GetPluginBuilder(name)
if !found {
err := fmt.Errorf("failed to get plugin %s", name)
klog.Error(err)
return err
}
klog.Infof("Starting to execute plugin at <pluginOnJobUpdate>: %s on job: <%s/%s>", name, job.Namespace, job.Name)
if err := pb(client, args).OnJobUpdate(job); err != nil { // each plugin's job-update hook; some are no-ops, others update ConfigMaps and the like
klog.Errorf("Failed to process on job update plugin %s, err %v.", name, err)
return err
}
}
return nil
}
// This calls into the plugin builders; the five plugins currently supported are all registered in an init function.
// pkg/controllers/job/plugins/factory.go
func init() {
RegisterPluginBuilder("ssh", ssh.New)
RegisterPluginBuilder("env", env.New)
RegisterPluginBuilder("svc", svc.New)
RegisterPluginBuilder("tensorflow", tensorflow.New)
RegisterPluginBuilder("mpi", mpi.New)
}
func RegisterPluginBuilder(name string, pc PluginBuilder) {
pluginMutex.Lock()
defer pluginMutex.Unlock()
pluginBuilders[name] = pc
}
// GetPluginBuilder returns plugin builder for a given plugin name.
func GetPluginBuilder(name string) (PluginBuilder, bool) {
pluginMutex.Lock()
defer pluginMutex.Unlock()
pb, found := pluginBuilders[name]
return pb, found
}
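Each registered builder returns an object implementing the plugin interface; its hooks are what pluginOnJobAdd/pluginOnJobUpdate/pluginOnJobDelete and pluginOnPodCreate dispatch to. The interface looks roughly like this (a sketch based on pkg/controllers/job/plugins/interface; method names are from memory):
// PluginInterface is the contract every job plugin (ssh, env, svc, tensorflow, mpi) fulfils.
type PluginInterface interface {
	// Name returns the unique name of the plugin.
	Name() string
	// OnPodCreate mutates the pod before creation, e.g. injecting env vars or volumes.
	OnPodCreate(pod *v1.Pod, job *batch.Job) error
	// OnJobAdd prepares plugin-owned resources (ConfigMaps, Services, ...) when the job is initiated.
	OnJobAdd(job *batch.Job) error
	// OnJobDelete cleans up whatever the plugin created for the job.
	OnJobDelete(job *batch.Job) error
	// OnJobUpdate reconciles plugin-owned resources when the job is updated.
	OnJobUpdate(job *batch.Job) error
}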
The Command resource is a way to feed actions such as RestartJob into a job from the outside; in other words, users influence a job's behavior through Commands.
Besides Jobs, the jobController also watches PodGroup updates and enqueues the corresponding job.
It watches Command creation too: the Command is first queued on commandQueue and then turned into a request on the job queue.
It also watches Pods, but first checks whether a pod is related to a job (via annotations and the controlledBy check); if it is, a req is wrapped and enqueued to the job queue.
Pod events therefore drive job synchronization: a pod add/update/delete triggers reconciliation, and in particular a pod update checks whether the task has finished and raises different events accordingly, while a pod deletion raises PodEvictedEvent.
KillJob
From the earlier table we can see that KillJob is mainly triggered when the Job is deleted or when something abnormal happens. A Job does not support in-place upgrades, only scaling up and down, so any abnormal situation goes straight to KillJob. Its main logic is:
- Delete every pod belonging to the job, counting pods in each phase along the way
- Update the Job status
- Delete the PodGroup that belongs to the Job
Actions such as RestartJob or AbortJob can be injected via a Command. For example, runningState triggers KillJob when it sees RestartJobAction:
func (ps *runningState) Execute(action v1alpha1.Action) error {
switch action {
case v1alpha1.RestartJobAction:
return KillJob(ps.job, PodRetainPhaseNone, func(status *vcbatch.JobStatus) bool {
status.State.Phase = vcbatch.Restarting
status.RetryCount++
return true
})
// pkg/controllers/job/job_controller_actions.go
func (cc *jobcontroller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.PhaseMap, updateStatus state.UpdateStatusFn) error {
job := jobInfo.Job
if job.DeletionTimestamp != nil {
// skip a job that is already being deleted (another worker goroutine may be handling it)
return nil
}
var pending, running, terminating, succeeded, failed, unknown int32
var errs []error
var total int
// delete every pod under this job
for _, pods := range jobInfo.Pods {
for _, pod := range pods {
total++
if pod.DeletionTimestamp != nil {
// skip pods that are already being deleted
terminating++
continue
}
_, retain := podRetainPhase[pod.Status.Phase]
if !retain {
// call kube-apiserver to delete pods whose phase is not in podRetainPhase (for the soft retain set, that means pods that are not yet Succeeded or Failed)
err := cc.deleteJobPod(job.Name, pod)
if err == nil {
terminating++
continue
}
errs = append(errs, err)
cc.resyncTask(pod)
}
classifyAndAddUpPodBaseOnPhase(pod, &pending, &running, &succeeded, &failed, &unknown)
}
}
// error handling omitted
// update the Job status with the per-phase pod counts
newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(context.TODO(), job, metav1.UpdateOptions{})
// error handling omitted
// update the job cache
if e := cc.cache.Update(newJob); e != nil {
klog.Errorf("KillJob - Failed to update Job %v/%v in cache: %v",
newJob.Namespace, newJob.Name, e)
return e
}
// delete the PodGroup that belongs to this Job
if err := cc.vcClient.SchedulingV1beta1().PodGroups(job.Namespace).Delete(context.TODO(), job.Name, metav1.DeleteOptions{}); err != nil {
// error handling omitted
}
if err := cc.pluginOnJobDelete(job); err != nil {
return err
}
return nil
}
In the end the job is reconciled again anyway: updating the job status moves it into a new state, and that new state again reacts to actions, e.g. running → restart → pending → syncJob (running).
processResyncTask
Another main flow of the jobController is shown below:
// when pod deletion fails in syncJob or killJob, the pod is enqueued into errTasks
func (cc *jobcontroller) resyncTask(task *v1.Pod) {
cc.errTasks.AddRateLimited(task)
}
func (cc *jobcontroller) processResyncTask() {
obj, shutdown := cc.errTasks.Get()
if shutdown {
return
}
// one task only resync 10 times
if cc.errTasks.NumRequeues(obj) > 10 { // AddRateLimited counts one requeue each time; give up after 10 failures
cc.errTasks.Forget(obj)
return
}
defer cc.errTasks.Done(obj)
task, ok := obj.(*v1.Pod)
if !ok {
klog.Errorf("failed to convert %v to *v1.Pod", obj)
return
}
if err := cc.syncTask(task); err != nil {
klog.Errorf("Failed to sync pod <%v/%v>, retry it, err %v", task.Namespace, task.Name, err)
cc.resyncTask(task) // on error, re-enqueue the pod
}
}
func (cc *jobcontroller) syncTask(oldTask *v1.Pod) error {
newPod, err := cc.kubeClient.CoreV1().Pods(oldTask.Namespace).Get(context.TODO(), oldTask.Name, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
if err := cc.cache.DeletePod(oldTask); err != nil {
klog.Errorf("failed to delete cache pod <%v/%v>, err %v.", oldTask.Namespace, oldTask.Name, err)
return err
}
klog.V(3).Infof("Pod <%v/%v> was deleted, removed from cache.", oldTask.Namespace, oldTask.Name)
return nil
}
return fmt.Errorf("failed to get Pod <%v/%v>: err %v", oldTask.Namespace, oldTask.Name, err)
}
return cc.cache.UpdatePod(newPod)
}
// Note: although the errTasks entries come from failed pod deletions, syncTask does not retry deleting the pod via the apiserver;
// the job reconcile will retry the deletion itself, so updating the jobCache here is enough.
Summary
The jobController mainly watches four kinds of resources:
- vcJob add/update/delete: these enqueue a Request object (carrying the Event, ExitCode, Action, and so on); a worker goroutine then takes the reqs from the queue it owns and processes them
- Processing usually means syncJob or killJob, chosen from the current job state plus the action derived from the req (i.e. given the current state, what to do next)
- syncJob generally creates the PodGroup, scales pods up or down, and so on
- killJob deletes pods and then switches the job to a new phase
- Pod add/update/delete mainly updates the pod information of the corresponding job in the jobCache, and also triggers job reconciliation
- PodGroup update: once the PodGroup has been scheduled, it triggers reconciliation of the job
- Command add: a newly created Command first goes into its own queue; when it is taken off, the Command is deleted from the api-server and the action it specifies is wrapped into a req that reconciles the job
- Commands exist so that the job lifecycle can be intervened in from the outside, e.g. via the Volcano CLI
- Pods whose deletion failed are retried by a separate goroutine, but the retry only fixes up the cache; the job reconcile itself will delete the pod again