Reading the Source Code of the Kubernetes Scheduler Volcano (2)

2023-08-30 01:36:08

The Job controller is one of the most important controllers in Volcano.

 

type jobcontroller struct {
	kubeClient kubernetes.Interface
	vcClient   vcclientset.Interface

	jobInformer   batchinformer.JobInformer
	podInformer   coreinformers.PodInformer
	pvcInformer   coreinformers.PersistentVolumeClaimInformer
	pgInformer    schedulinginformers.PodGroupInformer
	svcInformer   coreinformers.ServiceInformer
	cmdInformer   businformer.CommandInformer
	pcInformer    kubeschedulinginformers.PriorityClassInformer
	queueInformer schedulinginformers.QueueInformer

	// A store of jobs
	jobLister batchlister.JobLister
	jobSynced func() bool

	// A store of pods
	podLister corelisters.PodLister
	podSynced func() bool

	pvcLister corelisters.PersistentVolumeClaimLister
	pvcSynced func() bool

	// A store of podgroups
	pgLister schedulinglisters.PodGroupLister
	pgSynced func() bool

	// A store of service
	svcLister corelisters.ServiceLister
	svcSynced func() bool

	cmdLister buslister.CommandLister
	cmdSynced func() bool

	pcLister kubeschedulinglisters.PriorityClassLister
	pcSynced func() bool

	queueLister schedulinglisters.QueueLister
	queueSynced func() bool

	// queue that need to sync up
	queueList    []workqueue.RateLimitingInterface // queueList holds the requests produced by watch events; it is a slice so multiple workers can cooperate, one queue per worker.
	// The controller hashes the job's namespace/name and assigns each job to one of these queues.
	commandQueue workqueue.RateLimitingInterface
	cache        jobcache.Cache
	// Job Event recorder
	recorder record.EventRecorder

	errTasks      workqueue.RateLimitingInterface
	workers       uint32
	maxRequeueNum int
}

// The controller embeds informers and listers for several kinds of objects.

queueList is essentially a set of work queues whose elements are a custom Request object. As the definition shows, a Request mainly carries the key information of a Job, which matches the usual queue model: keys go into the queue, while the actual data lives in the cache:

 

// pkg/controllers/apis/request.go
// Request struct.
type Request struct {
	Namespace string    // namespace of the job
	JobName   string    // name of the job
	TaskName  string    // name of the task
	QueueName string    // name of the Queue the job was assigned to

	Event      v1alpha1.Event
	ExitCode   int32
	Action     v1alpha1.Action
	JobVersion int32
}

The cache is essentially a map of Job resources keyed by namespace/name.

Why keep a dedicated cache when the informer already has an Indexer? Presumably because the cache aggregates each Job together with the Pods it owns, so fetching a Job and its Pods from the cache is more convenient.

type jobCache struct {
	sync.Mutex

	jobs        map[string]*apis.JobInfo
	deletedJobs workqueue.RateLimitingInterface
}

// JobInfo struct. Each value contains both the Job itself and the Pods that belong to that job.
type JobInfo struct {
	Namespace string
	Name      string

	Job  *batch.Job
	Pods map[string]map[string]*v1.Pod
}
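As a rough sketch of what that aggregation buys (illustrative only, reusing the structs above rather than the actual cache implementation): a single lookup by namespace/name returns the Job together with all of its Pods grouped by task, instead of a lister Get plus a label-selected pod List.

// jobKeyOf mirrors the namespace/name key format used by the job cache (illustrative helper).
func jobKeyOf(job *batch.Job) string {
	return fmt.Sprintf("%s/%s", job.Namespace, job.Name)
}

// lookup is an illustrative accessor: one lock-protected map read yields the Job
// and every Pod it owns, grouped by task name.
func (jc *jobCache) lookup(key string) (*apis.JobInfo, bool) {
	jc.Lock()
	defer jc.Unlock()
	info, found := jc.jobs[key]
	return info, found
}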

The Initialize function of jobcontroller:

func (cc *jobcontroller) Initialize(opt *framework.ControllerOption) error {
	cc.kubeClient = opt.KubeClient
	cc.vcClient = opt.VolcanoClient

	sharedInformers := opt.SharedInformerFactory
	workers := opt.WorkerNum
	// Initialize event client
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(klog.Infof)
	eventBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: cc.kubeClient.CoreV1().Events("")})
	recorder := eventBroadcaster.NewRecorder(vcscheme.Scheme, v1.EventSource{Component: "vc-controller-manager"})

	cc.queueList = make([]workqueue.RateLimitingInterface, workers)
	cc.commandQueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	cc.cache = jobcache.New()
	cc.errTasks = newRateLimitingQueue()
	cc.recorder = recorder
	cc.workers = workers
	cc.maxRequeueNum = opt.MaxRequeueNum
	if cc.maxRequeueNum < 0 {
		cc.maxRequeueNum = -1
	}

	var i uint32
	for i = 0; i < workers; i++ {
		cc.queueList[i] = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	} // create one queue per worker to hold job requests

	cc.jobInformer = informerfactory.NewSharedInformerFactory(cc.vcClient, 0).Batch().V1alpha1().Jobs()
	cc.jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    cc.addJob,
		UpdateFunc: cc.updateJob,
		DeleteFunc: cc.deleteJob,
	})
	cc.jobLister = cc.jobInformer.Lister()
	cc.jobSynced = cc.jobInformer.Informer().HasSynced

	cc.cmdInformer = informerfactory.NewSharedInformerFactory(cc.vcClient, 0).Bus().V1alpha1().Commands()
	cc.cmdInformer.Informer().AddEventHandler(
		cache.FilteringResourceEventHandler{
			FilterFunc: func(obj interface{}) bool {
				switch v := obj.(type) {
				case *busv1alpha1.Command:
					if v.TargetObject != nil &&
						v.TargetObject.APIVersion == batchv1alpha1.SchemeGroupVersion.String() &&
						v.TargetObject.Kind == "Job" {
						return true
					}

					return false
				default:
					return false
				}
			},
			Handler: cache.ResourceEventHandlerFuncs{
				AddFunc: cc.addCommand,
			},
		},
	)
	cc.cmdLister = cc.cmdInformer.Lister()
	cc.cmdSynced = cc.cmdInformer.Informer().HasSynced

	cc.podInformer = sharedInformers.Core().V1().Pods()
	cc.podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    cc.addPod,
		UpdateFunc: cc.updatePod,
		DeleteFunc: cc.deletePod,
	})

	cc.podLister = cc.podInformer.Lister()
	cc.podSynced = cc.podInformer.Informer().HasSynced

	cc.pvcInformer = sharedInformers.Core().V1().PersistentVolumeClaims()
	cc.pvcLister = cc.pvcInformer.Lister()
	cc.pvcSynced = cc.pvcInformer.Informer().HasSynced

	cc.svcInformer = sharedInformers.Core().V1().Services()
	cc.svcLister = cc.svcInformer.Lister()
	cc.svcSynced = cc.svcInformer.Informer().HasSynced

	cc.pgInformer = informerfactory.NewSharedInformerFactory(cc.vcClient, 0).Scheduling().V1beta1().PodGroups()
	cc.pgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		UpdateFunc: cc.updatePodGroup,
	})
	cc.pgLister = cc.pgInformer.Lister()
	cc.pgSynced = cc.pgInformer.Informer().HasSynced

	cc.pcInformer = sharedInformers.Scheduling().V1().PriorityClasses()
	cc.pcLister = cc.pcInformer.Lister()
	cc.pcSynced = cc.pcInformer.Informer().HasSynced

	cc.queueInformer = informerfactory.NewSharedInformerFactory(cc.vcClient, 0).Scheduling().V1beta1().Queues()
	cc.queueLister = cc.queueInformer.Lister()
	cc.queueSynced = cc.queueInformer.Informer().HasSynced

	// Register actions 
	state.SyncJob = cc.syncJob
	state.KillJob = cc.killJob

	return nil
}

// pkg/controllers/job/state/factory.go
var (
	// SyncJob will create or delete Pods according to Job's spec.
	SyncJob ActionFn
	// KillJob kill all Pods of Job with phase not in podRetainPhase.
	KillJob KillActionFn
)

//State interface.
type State interface {
	// Execute executes the actions based on current state.
	Execute(act v1alpha1.Action) error
}

// NewState gets the state from the volcano job Phase.
func NewState(jobInfo *apis.JobInfo) State {
	job := jobInfo.Job
	switch job.Status.State.Phase {
	case vcbatch.Pending:
		return &pendingState{job: jobInfo}
	case vcbatch.Running:
		return &runningState{job: jobInfo}
	case vcbatch.Restarting:
		return &restartingState{job: jobInfo}
	case vcbatch.Terminated, vcbatch.Completed, vcbatch.Failed:
		return &finishedState{job: jobInfo}
	case vcbatch.Terminating:
		return &terminatingState{job: jobInfo}
	case vcbatch.Aborting:
		return &abortingState{job: jobInfo}
	case vcbatch.Aborted:
		return &abortedState{job: jobInfo}
	case vcbatch.Completing:
		return &completingState{job: jobInfo}
	}

	// It's pending by default.
	return &pendingState{job: jobInfo}
}

syncJob and killJob are the two action implementations. Together with the states they define how a Job in a given state should react when a certain action is triggered. A state is simply job.status.state (Pending, Aborting, Running, Completing, and so on), while an action comes from the policies in the job spec, i.e. what to do when certain events occur. The same action can therefore mean different things in different states, but in the end there are only two operations: syncJob and killJob. For example, a Running job that receives an event mapped to RestartJobAction executes killJob and moves to Restarting.

// Event handlers triggered by the job informer
func (cc *jobcontroller) addJob(obj interface{}) {
	job, ok := obj.(*batch.Job)
	if !ok {
		klog.Errorf("obj is not Job")
		return
	}

	req := apis.Request{
		Namespace: job.Namespace,
		JobName:   job.Name,

		Event: bus.OutOfSyncEvent,  // add/update/delete of a job all raise this event, meaning the job is out of sync
	}

	// TODO(k82cn): if failed to add job, the cache should be refresh
	if err := cc.cache.Add(job); err != nil {   // put the job into the cache; only log if that fails
		klog.Errorf("Failed to add job <%s/%s>: %v in cache",
			job.Namespace, job.Name, err)
	}
	key := jobhelpers.GetJobKeyByReq(&req)
	queue := cc.getWorkerQueue(key)  // pick the worker queue this job should go to
	queue.Add(req)
}

func (cc *jobcontroller) updateJob(oldObj, newObj interface{}) {
	newJob, ok := newObj.(*batch.Job)
	if !ok {
		klog.Errorf("newObj is not Job")
		return
	}

	oldJob, ok := oldObj.(*batch.Job)
	if !ok {
		klog.Errorf("oldJob is not Job")
		return
	}

	// No need to update if ResourceVersion is not changed
	if newJob.ResourceVersion == oldJob.ResourceVersion {
		klog.V(6).Infof("No need to update because job is not modified.")
		return
	}

	if err := cc.cache.Update(newJob); err != nil {
		klog.Errorf("UpdateJob - Failed to update job <%s/%s>: %v in cache",
			newJob.Namespace, newJob.Name, err)
	}

	// NOTE: Since we only reconcile job based on Spec, we will ignore other attributes
	// For Job status, it's used internally and always been updated via our controller.
	// Reconciliation is driven by the spec only; the status is updated exclusively by this controller, so a status-only change must have come from the controller itself.
	if reflect.DeepEqual(newJob.Spec, oldJob.Spec) && newJob.Status.State.Phase == oldJob.Status.State.Phase {
		klog.V(6).Infof("Job update event is ignored since no update in 'Spec'.")
		return
	}

	req := apis.Request{
		Namespace: newJob.Namespace,
		JobName:   newJob.Name,
		Event:     bus.OutOfSyncEvent,
	}
	key := jobhelpers.GetJobKeyByReq(&req)
	queue := cc.getWorkerQueue(key)
	queue.Add(req)
}

func (cc *jobcontroller) deleteJob(obj interface{}) {
	job, ok := obj.(*batch.Job)
	if !ok {
		// If we reached here it means the Job was deleted but its final state is unrecorded.
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)   // deletion observed via a relist (tombstone)
		if !ok {
			klog.Errorf("Couldn't get object from tombstone %#v", obj)
			return
		}
		job, ok = tombstone.Obj.(*batch.Job)
		if !ok {
			klog.Errorf("Tombstone contained object that is not a volcano Job: %#v", obj)
			return
		}
	}

	if err := cc.cache.Delete(job); err != nil {
		klog.Errorf("Failed to delete job <%s/%s>: %v in cache",
			job.Namespace, job.Name, err)
	}
}


// Decide which worker queue a job belongs to based on its key
func (cc *jobcontroller) getWorkerQueue(key string) workqueue.RateLimitingInterface {
	var hashVal hash.Hash32
	var val uint32

	hashVal = fnv.New32()
	hashVal.Write([]byte(key))

	val = hashVal.Sum32()

	queue := cc.queueList[val%cc.workers]

	return queue
}
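To see the sharding concretely, here is a small self-contained snippet (standard library only, not Volcano code) that reproduces the mapping from a job key to a worker index:

package main

import (
	"fmt"
	"hash/fnv"
)

// workerIndex mirrors getWorkerQueue: FNV-32 hash of the key, modulo the worker count.
// The same key always maps to the same worker, so one job is never processed by two
// workers at the same time.
func workerIndex(key string, workers uint32) uint32 {
	h := fnv.New32()
	h.Write([]byte(key))
	return h.Sum32() % workers
}

func main() {
	fmt.Println(workerIndex("default/my-job", 5)) // always the same index for this key
}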

 

Some of the later checks can be hard to follow. The underlying point is that the Action and Event carried by a req may not match the Job currently in the cache: the cache holds the latest data, while the req may lag behind. For example, an old event may ask for a sync while the job has already been deleted, in which case the sync should stop.

// The job cache. Pods are not touched here: a pod enters the cache only after it has been created, so the pod map is initialized empty.
func (jc *jobCache) Add(job *v1alpha1.Job) error {
	jc.Lock()
	defer jc.Unlock()
	key := JobKey(job)   // key is simply namespace/name
	if jobInfo, found := jc.jobs[key]; found {
		if jobInfo.Job == nil {
			jobInfo.SetJob(job)

			return nil
		}
		return fmt.Errorf("duplicated jobInfo <%v>", key)
	}

	jc.jobs[key] = &apis.JobInfo{
		Name:      job.Name,
		Namespace: job.Namespace,

		Job:  job,
		Pods: make(map[string]map[string]*v1.Pod),  // outer key is the task name, inner key identifies the pod
	}

	return nil
}

func (jc *jobCache) Update(obj *v1alpha1.Job) error {
	jc.Lock()
	defer jc.Unlock()

	key := JobKey(obj)
	job, found := jc.jobs[key]
	if !found {
		return fmt.Errorf("failed to find job <%v>", key)
	}

	var oldResourceversion, newResourceversion uint64
	var err error
	if oldResourceversion, err = strconv.ParseUint(job.Job.ResourceVersion, 10, 64); err != nil {
		return fmt.Errorf("failed to parase job <%v> resource version <%s>", key, job.Job.ResourceVersion)
	}

	if newResourceversion, err = strconv.ParseUint(obj.ResourceVersion, 10, 64); err != nil {
		return fmt.Errorf("failed to parase job <%v> resource version <%s>", key, obj.ResourceVersion)
	}
	if newResourceversion < oldResourceversion {
		return fmt.Errorf("job <%v> has too old resource version: %d (%d)", key, newResourceversion, oldResourceversion)
	}
	job.Job = obj
	return nil
}

func (jc *jobCache) Delete(obj *v1alpha1.Job) error {
	jc.Lock()
	defer jc.Unlock()

	key := JobKey(obj)
	jobInfo, found := jc.jobs[key]
	if !found {
		return fmt.Errorf("failed to find job <%v>", key)
	}
	jobInfo.Job = nil
	jc.deleteJob(jobInfo)

	return nil
}

Knowing the controller's key data structures, we can already guess its reconcile logic: the producer side uses list-watch to push Job keys into queueList and stores the Job objects themselves in jobCache, while the consumers pop items from queueList and process them. The core of that is the processNextReq function:

// Run start JobController.
func (cc *jobcontroller) Run(stopCh <-chan struct{}) {
	go cc.jobInformer.Informer().Run(stopCh)
	go cc.podInformer.Informer().Run(stopCh)
	go cc.pvcInformer.Informer().Run(stopCh)
	go cc.pgInformer.Informer().Run(stopCh)
	go cc.svcInformer.Informer().Run(stopCh)
	go cc.cmdInformer.Informer().Run(stopCh)
	go cc.pcInformer.Informer().Run(stopCh)
	go cc.queueInformer.Informer().Run(stopCh)

	cache.WaitForCacheSync(stopCh, cc.jobSynced, cc.podSynced, cc.pgSynced,
		cc.svcSynced, cc.cmdSynced, cc.pvcSynced, cc.pcSynced, cc.queueSynced)

	go wait.Until(cc.handleCommands, 0, stopCh)
	var i uint32
	for i = 0; i < cc.workers; i++ {
		go func(num uint32) {
			wait.Until(
				func() {
					cc.worker(num)  // each worker processes the queue matching its index
				},
				time.Second,
				stopCh)
		}(i)
	}

	go cc.cache.Run(stopCh)

	// Re-sync error tasks.
	go wait.Until(cc.processResyncTask, 0, stopCh)   // periodically resync failed tasks

	klog.Infof("JobController is running ...... ")
}

We can see four kinds of goroutines being started here:

  1. cc.worker(num)
  2. cc.cache.Run(stopCh)
  3. cc.processResyncTask
  4. cc.handleCommands
// jobController.worker(int num)
func (cc *jobcontroller) worker(i uint32) {
	klog.Infof("worker %d start ...... ", i)

	for cc.processNextReq(i) {
	}
}

func (cc *jobcontroller) processNextReq(count uint32) bool {
	queue := cc.queueList[count]  // the queue this worker is responsible for
	obj, shutdown := queue.Get()
	if shutdown {
		klog.Errorf("Fail to pop item from queue")
		return false
	}

	req := obj.(apis.Request)
	defer queue.Done(req)

	key := jobcache.JobKeyByReq(&req)  // if this job does not belong to this worker, log an error and requeue it to the right queue
	if !cc.belongsToThisRoutine(key, count) {
		klog.Errorf("should not occur The job does not belongs to this routine key:%s, worker:%d...... ", key, count)
		queueLocal := cc.getWorkerQueue(key)
		queueLocal.Add(req)
		return true
	}

	klog.V(3).Infof("Try to handle request <%v>", req)

	jobInfo, err := cc.cache.Get(key)
	if err != nil {
		// TODO(k82cn): ignore not-ready error.
		klog.Errorf("Failed to get job by <%v> from cache: %v", req, err)
		return true
	}

	st := state.NewState(jobInfo)  // build a state object; its concrete type is chosen by job.Status.State.Phase
	if st == nil {
		klog.Errorf("Invalid state <%s> of Job <%v/%v>",
			jobInfo.Job.Status.State, jobInfo.Job.Namespace, jobInfo.Job.Name)
		return true
	}

	action := applyPolicies(jobInfo.Job, &req)  // derive the action from the job's policies and the request
	klog.V(3).Infof("Execute <%v> on Job <%s/%s> in <%s> by <%T>.",
		action, req.Namespace, req.JobName, jobInfo.Job.Status.State.Phase, st)

	if action != busv1alpha1.SyncJobAction {
		cc.recordJobEvent(jobInfo.Job.Namespace, jobInfo.Job.Name, batchv1alpha1.ExecuteAction, fmt.Sprintf(
			"Start to execute action %s ", action))
	}  // anything other than SyncJobAction is a special action, so record an event for it

	if err := st.Execute(action); err != nil {  // run the state's logic, which boils down to KillJob or SyncJob
		if cc.maxRequeueNum == -1 || queue.NumRequeues(req) < cc.maxRequeueNum {
			klog.V(2).Infof("Failed to handle Job <%s/%s>: %v",
				jobInfo.Job.Namespace, jobInfo.Job.Name, err)
			// If any error, requeue it.
			queue.AddRateLimited(req)
			return true
		}

		cc.recordJobEvent(jobInfo.Job.Namespace, jobInfo.Job.Name, batchv1alpha1.ExecuteAction, fmt.Sprintf(
			"Job failed on action %s for retry limit reached", action))
		klog.Warningf("Terminating Job <%s/%s> and releasing resources", jobInfo.Job.Namespace, jobInfo.Job.Name)
		if err = st.Execute(busv1alpha1.TerminateJobAction); err != nil {
			klog.Errorf("Failed to terminate Job<%s/%s>: %v", jobInfo.Job.Namespace, jobInfo.Job.Name, err)
		}
		klog.Warningf("Dropping job<%s/%s> out of the queue: %v because max retries has reached", jobInfo.Job.Namespace, jobInfo.Job.Name, err)
	}

	// If no error, forget it.
	queue.Forget(req)

	return true
}




func applyPolicies(job *batch.Job, req *apis.Request) v1alpha1.Action {
	if len(req.Action) != 0 {
		return req.Action
	}

	if req.Event == v1alpha1.OutOfSyncEvent {
		return v1alpha1.SyncJobAction
	}

	// For all the requests triggered from discarded job resources will perform sync action instead
	// The req comes from the queue and may be stale, while the cached job is up to date, so the cached job's version wins.
	if req.JobVersion < job.Status.Version {
		klog.Infof("Request %s is outdated, will perform sync instead.", req)
		return v1alpha1.SyncJobAction
	}

	// Overwrite Job level policies: if a pod triggered a task-level event, the task's policy action takes precedence.
	if len(req.TaskName) != 0 {   // the req was triggered by a task: when a pod raises an event, the req carries the TaskName, so task-level policies apply
		// Parse task level policies
		for _, task := range job.Spec.Tasks {
			if task.Name == req.TaskName {
				for _, policy := range task.Policies {
					policyEvents := getEventlist(policy)  // the list of events this policy reacts to

					if len(policyEvents) > 0 && len(req.Event) > 0 {  // if the event in the req is one the policy handles, return that policy's action
						if checkEventExist(policyEvents, req.Event) || checkEventExist(policyEvents, v1alpha1.AnyEvent) {
							return policy.Action
						}
					}

					// 0 is not an error code; it is rejected by the validating admission webhook, so exit code 0
					// cannot be used as a policy trigger. If the exit code carried by the req matches the exit
					// code this policy expects, the policy's action is triggered as well.
					if policy.ExitCode != nil && *policy.ExitCode == req.ExitCode {
						return policy.Action
					}
				}
				break
			}
		}
	}

	// Parse Job level policies (job-level events)
	for _, policy := range job.Spec.Policies {
		policyEvents := getEventlist(policy)

		if len(policyEvents) > 0 && len(req.Event) > 0 {
			if checkEventExist(policyEvents, req.Event) || checkEventExist(policyEvents, v1alpha1.AnyEvent) {
				return policy.Action
			}
		}

		// 0 is not an error code, is prevented in validation admission controller
		if policy.ExitCode != nil && *policy.ExitCode == req.ExitCode {
			return policy.Action
		}
	}
	// If nothing matched, fall back to a plain sync.

	return v1alpha1.SyncJobAction
}
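To make the matching above concrete, here is a hedged sketch of what job-level and task-level policies could look like in Go. The Action and ExitCode field names come straight from the applyPolicies code above; the Event field name and the LifecyclePolicy type name are assumptions about the batch/v1alpha1 API, so treat this as an illustration rather than a verbatim copy:

// Illustrative only: a job-level policy that restarts the job when a pod is evicted,
// and a task-level policy that aborts the job when a container exits with code 3
// (exit code 0 is rejected by the admission webhook, as noted above).
func examplePolicies() (jobPolicies, taskPolicies []batchv1alpha1.LifecyclePolicy) {
	exitCode := int32(3)
	jobPolicies = []batchv1alpha1.LifecyclePolicy{
		{Event: busv1alpha1.PodEvictedEvent, Action: busv1alpha1.RestartJobAction},
	}
	taskPolicies = []batchv1alpha1.LifecyclePolicy{
		{ExitCode: &exitCode, Action: busv1alpha1.AbortJobAction},
	}
	return jobPolicies, taskPolicies
}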

 

func (ps *runningState) Execute(action v1alpha1.Action) error {
	switch action {
	case v1alpha1.RestartJobAction:
		return KillJob(ps.job, PodRetainPhaseNone, func(status *vcbatch.JobStatus) bool {
			status.State.Phase = vcbatch.Restarting
			status.RetryCount++
			return true
		})
	case v1alpha1.AbortJobAction:
		return KillJob(ps.job, PodRetainPhaseSoft, func(status *vcbatch.JobStatus) bool {
			status.State.Phase = vcbatch.Aborting
			return true
		})
	case v1alpha1.TerminateJobAction:
		return KillJob(ps.job, PodRetainPhaseSoft, func(status *vcbatch.JobStatus) bool {
			status.State.Phase = vcbatch.Terminating
			return true
		})
	case v1alpha1.CompleteJobAction:
		return KillJob(ps.job, PodRetainPhaseSoft, func(status *vcbatch.JobStatus) bool {
			status.State.Phase = vcbatch.Completing
			return true
		})
	default:
		// A callback is passed in; what actually gets reconciled is the status.
		return SyncJob(ps.job, func(status *vcbatch.JobStatus) bool {
			jobReplicas := TotalTasks(ps.job.Job)  // total number of pod replicas across all tasks
			if jobReplicas == 0 {
				// when scale down to zero, keep the current job phase
				return false
			}

			minSuccess := ps.job.Job.Spec.MinSuccess
			// the minimum success count has been reached
			if minSuccess != nil && status.Succeeded >= *minSuccess {
				status.State.Phase = vcbatch.Completed
				return true
			}

			totalTaskMinAvailable := TotalTaskMinAvailable(ps.job.Job)  // sum of every task's MinAvailable
			if status.Succeeded+status.Failed == jobReplicas {   // the job has finished: succeeded plus failed equals the total number of pods
				if ps.job.Job.Spec.MinAvailable >= totalTaskMinAvailable {
					for _, task := range ps.job.Job.Spec.Tasks {
						if task.MinAvailable == nil {
							continue
						}

						if taskStatus, ok := status.TaskStatusCount[task.Name]; ok {  // if any task has fewer succeeded pods than its own MinAvailable, the job is failed
							if taskStatus.Phase[v1.PodSucceeded] < *task.MinAvailable {
								status.State.Phase = vcbatch.Failed
								return true
							}
						}
					}
				}

				if minSuccess != nil && status.Succeeded < *minSuccess {
					status.State.Phase = vcbatch.Failed
				} else if status.Succeeded >= ps.job.Job.Spec.MinAvailable { // the job is Completed only if the success count reaches both minSuccess and MinAvailable
					status.State.Phase = vcbatch.Completed
				} else {
					status.State.Phase = vcbatch.Failed
				}
				return true
			}
			return false  // returning false means the status is not updated
		})
	}
}

From the walkthrough above we can see that a Job's reconcile involves quite a few states, so the code drives a state machine along the two dimensions Action and State. The final handling, however, boils down to SyncJob and KillJob, so we focus on those two parts.

SyncJob

From the earlier state-transition table we can see that only the Pending and Running states can enter the SyncJob flow, which is exactly the path from Job creation to a normally running Job, so we can expect SyncJob to be mostly about getting the Job running. The main steps are:

  • For a new Job, initialize it first: create its PVCs (Volcano manages these PVCs itself; there is no Kubernetes controller for them) and its PodGroup (one Job maps to one PodGroup). The created PodGroup is then managed by the PodGroup controller, and its name is derived from the Job (job name plus UID in the code below).
  • Compute, from the Tasks in the Job, the list of pods to create and the list of pods to delete.
  • Start goroutines that call kube-apiserver to create and delete the pods on those two lists. Note that, to keep the default Kubernetes scheduler away from these pods, the Job carries the scheduler name and passes it down to the pods: the Kubernetes scheduler skips pods whose schedulerName is volcano, while the Volcano scheduler only picks up such pods, so the two do not interfere. The webhook sets the volcano scheduler name on the Job by default when the Job is created.
  • Update the Job status and the jobCache.
func (cc *jobcontroller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateStatusFn) error {
	job := jobInfo.Job
	klog.V(3).Infof("Starting to sync up Job <%s/%s>, current version %d", job.Namespace, job.Name, job.Status.Version)
	defer klog.V(3).Infof("Finished Job <%s/%s> sync up, current version %d", job.Namespace, job.Name, job.Status.Version)

	if jobInfo.Job.DeletionTimestamp != nil {   // the job is being deleted, so the killJob flow should handle it instead
		klog.Infof("Job <%s/%s> is terminating, skip management process.",
			jobInfo.Job.Namespace, jobInfo.Job.Name)
		return nil
	}

	// deep copy job to prevent mutate it
	job = job.DeepCopy()

	// Find queue that job belongs to, and check if the queue has forwarding metadata
	queueInfo, err := cc.GetQueueInfo(job.Spec.Queue)
	if err != nil {
		return err
	}

	var jobForwarding bool
	if len(queueInfo.Spec.ExtendClusters) != 0 {  // semantics of ExtendClusters still to be confirmed (forwarding jobs to extended clusters)
		jobForwarding = true
		if len(job.Annotations) == 0 {
			job.Annotations = make(map[string]string)
		}
		job.Annotations[batch.JobForwardingKey] = "true"
		job, err = cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).Update(context.TODO(), job, metav1.UpdateOptions{})
		if err != nil {
			klog.Errorf("failed to update job: %s/%s, error: %s", job.Namespace, job.Name, err.Error())
			return err
		}
	}

	// Skip job initiation if job is already initiated
	if !isInitiated(job) {  // isInitiated reports whether the job is already initialized (any non-Pending job counts as initialized); a Pending job triggers initialization here: PodGroup creation, plugin handling, and so on
		if job, err = cc.initiateJob(job); err != nil {
			return err
		}
	} else {
		// TODO: optimize this call it only when scale up/down
		if err = cc.initOnJobUpdate(job); err != nil {   // for an already-initialized (non-Pending) job, run the on-update initialization
			return err
		}
	}

	if len(queueInfo.Spec.ExtendClusters) != 0 {
		jobForwarding = true
		job.Annotations[batch.JobForwardingKey] = "true"
		_, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).Update(context.TODO(), job, metav1.UpdateOptions{})
		if err != nil {
			klog.Errorf("failed to update job: %s/%s, error: %s", job.Namespace, job.Name, err.Error())
			return err
		}
	}

	var syncTask bool  
	pgName := job.Name + "-" + string(job.UID)
	if pg, _ := cc.pgLister.PodGroups(job.Namespace).Get(pgName); pg != nil {
		if pg.Status.Phase != "" && pg.Status.Phase != scheduling.PodGroupPending {
			syncTask = true  // the PodGroup has moved past Pending (e.g. it has been scheduled), so the tasks/pods need to be synced
		} // a PodGroup update (e.g. scheduling finished) also triggers job reconciliation

		for _, condition := range pg.Status.Conditions {
			if condition.Type == scheduling.PodGroupUnschedulableType {
				cc.recorder.Eventf(job, v1.EventTypeWarning, string(batch.PodGroupPending),
					fmt.Sprintf("PodGroup %s:%s unschedule,reason: %s", job.Namespace, job.Name, condition.Message))
			}
		}
	}

	var jobCondition batch.JobCondition
	if !syncTask {  // no task sync needed (the PodGroup has not been scheduled yet), so only update the job status
		if updateStatus != nil {
			if updateStatus(&job.Status) {
				job.Status.State.LastTransitionTime = metav1.Now()
				jobCondition = newCondition(job.Status.State.Phase, &job.Status.State.LastTransitionTime)
				job.Status.Conditions = append(job.Status.Conditions, jobCondition)
			}
		}
		newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(context.TODO(), job, metav1.UpdateOptions{})
		if err != nil {
			klog.Errorf("Failed to update status of Job %v/%v: %v",
				job.Namespace, job.Name, err)
			return err
		}
		if e := cc.cache.Update(newJob); e != nil {
			klog.Errorf("SyncJob - Failed to update Job %v/%v in cache:  %v",
				newJob.Namespace, newJob.Name, e)
			return e
		}
		return nil
	}

	var running, pending, terminating, succeeded, failed, unknown int32
	taskStatusCount := make(map[string]batch.TaskState)

	podToCreate := make(map[string][]*v1.Pod)
	var podToDelete []*v1.Pod
	var creationErrs []error
	var deletionErrs []error
	appendMutex := sync.Mutex{}

	appendError := func(container *[]error, err error) {
		appendMutex.Lock()
		defer appendMutex.Unlock()
		*container = append(*container, err)
	}

	waitCreationGroup := sync.WaitGroup{}

	for _, ts := range job.Spec.Tasks {
		ts.Template.Name = ts.Name
		tc := ts.Template.DeepCopy()
		name := ts.Template.Name

		pods, found := jobInfo.Pods[name]
		if !found {
			pods = map[string]*v1.Pod{}
		}

		var podToCreateEachTask []*v1.Pod
		for i := 0; i < int(ts.Replicas); i++ {
			podName := fmt.Sprintf(jobhelpers.PodNameFmt, job.Name, name, i)  // pod names follow the jobName-taskName-index pattern
			if pod, found := pods[podName]; !found {
				newPod := createJobPod(job, tc, ts.TopologyPolicy, i, jobForwarding)  // build the pod manifest
				if err := cc.pluginOnPodCreate(job, newPod); err != nil {
					return err
				}
				podToCreateEachTask = append(podToCreateEachTask, newPod)
				waitCreationGroup.Add(1) // pods are created later by multiple goroutines in parallel
			} else {
				delete(pods, podName)  // for scale-down, whatever is left in pods afterwards gets deleted; removing an entry here means that pod should be kept
				if pod.DeletionTimestamp != nil {  // e.g. scaling from 2 replicas to 1: pod #2 goes into podToDelete later; if pod #1 is already being deleted here, count it as terminating
					klog.Infof("Pod <%s/%s> is terminating", pod.Namespace, pod.Name)
					atomic.AddInt32(&terminating, 1)
					continue
				}
				// classify the existing pod by its phase
				classifyAndAddUpPodBaseOnPhase(pod, &pending, &running, &succeeded, &failed, &unknown)
				calcPodStatus(pod, taskStatusCount)  // accumulate the per-phase counts into the taskStatusCount map
			}
		}
		podToCreate[ts.Name] = podToCreateEachTask
		for _, pod := range pods {
			podToDelete = append(podToDelete, pod)   // pods left over after scale-down are to be deleted
		}
	}

	for taskName, podToCreateEachTask := range podToCreate {
		if len(podToCreateEachTask) == 0 {
			continue
		}
		go func(taskName string, podToCreateEachTask []*v1.Pod) {  // one goroutine per task
			taskIndex := jobhelpers.GetTasklndexUnderJob(taskName, job)
			if job.Spec.Tasks[taskIndex].DependsOn != nil { // handle task dependencies (DependsOn)
				cc.waitDependsOnTaskMeetCondition(taskName, taskIndex, podToCreateEachTask, job)
			}

			for _, pod := range podToCreateEachTask {
				go func(pod *v1.Pod) {
					defer waitCreationGroup.Done()
					// and one goroutine per pod
					newPod, err := cc.kubeClient.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
					if err != nil && !apierrors.IsAlreadyExists(err) {  // creation failed and the error is not "already exists"
						// Failed to create Pod, waitCreationGroup a moment and then create it again
						// This is to ensure all podsMap under the same Job created
						// So gang-scheduling could schedule the Job successfully
						klog.Errorf("Failed to create pod %s for Job %s, err %#v",
							pod.Name, job.Name, err)
						appendError(&creationErrs, fmt.Errorf("failed to create pod %s, err: %#v", pod.Name, err))
					} else {
						classifyAndAddUpPodBaseOnPhase(newPod, &pending, &running, &succeeded, &failed, &unknown)
						calcPodStatus(pod, taskStatusCount)
						klog.V(5).Infof("Created Task <%s> of Job <%s/%s>",
							pod.Name, job.Namespace, job.Name)
					}
				}(pod)
			}
		}(taskName, podToCreateEachTask)
	}

	waitCreationGroup.Wait()

	if len(creationErrs) != 0 {  // if any pod failed to create, abort this reconcile round
		cc.recorder.Event(job, v1.EventTypeWarning, FailedCreatePodReason,
			fmt.Sprintf("Error creating pods: %+v", creationErrs))
		return fmt.Errorf("failed to create %d pods of %d", len(creationErrs), len(podToCreate))
	}

	// Delete pods when scale down.
	waitDeletionGroup := sync.WaitGroup{}
	waitDeletionGroup.Add(len(podToDelete))
	for _, pod := range podToDelete {
		go func(pod *v1.Pod) {
			defer waitDeletionGroup.Done()
			err := cc.deleteJobPod(job.Name, pod)  // delete the pod via the apiserver
			if err != nil {
				// Failed to delete Pod, waitCreationGroup a moment and then create it again
				// This is to ensure all podsMap under the same Job created
				// So gang-scheduling could schedule the Job successfully
				klog.Errorf("Failed to delete pod %s for Job %s, err %#v",
					pod.Name, job.Name, err)
				appendError(&deletionErrs, err)
				cc.resyncTask(pod)
			} else {
				klog.V(3).Infof("Deleted Task <%s> of Job <%s/%s>",
					pod.Name, job.Namespace, job.Name)
				atomic.AddInt32(&terminating, 1)
			}
		}(pod)
	}
	waitDeletionGroup.Wait()

	if len(deletionErrs) != 0 {
		cc.recorder.Event(job, v1.EventTypeWarning, FailedDeletePodReason,
			fmt.Sprintf("Error deleting pods: %+v", deletionErrs))
		return fmt.Errorf("failed to delete %d pods of %d", len(deletionErrs), len(podToDelete))
	}
	job.Status = batch.JobStatus{
		State: job.Status.State,

		Pending:             pending,
		Running:             running,
		Succeeded:           succeeded,
		Failed:              failed,
		Terminating:         terminating,
		Unknown:             unknown,
		Version:             job.Status.Version,
		MinAvailable:        job.Spec.MinAvailable,
		TaskStatusCount:     taskStatusCount,
		ControlledResources: job.Status.ControlledResources,
		Conditions:          job.Status.Conditions,
		RetryCount:          job.Status.RetryCount,
	}

	if updateStatus != nil {
		if updateStatus(&job.Status) {
			job.Status.State.LastTransitionTime = metav1.Now()
			jobCondition = newCondition(job.Status.State.Phase, &job.Status.State.LastTransitionTime)
			job.Status.Conditions = append(job.Status.Conditions, jobCondition)
		}
	}
	newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(context.TODO(), job, metav1.UpdateOptions{})
	if err != nil {
		klog.Errorf("Failed to update status of Job %v/%v: %v",
			job.Namespace, job.Name, err)
		return err
	}
	if e := cc.cache.Update(newJob); e != nil {
		klog.Errorf("SyncJob - Failed to update Job %v/%v in cache:  %v",
			newJob.Namespace, newJob.Name, e)
		return e
	}

	return nil
}

If syncJob returns an error, processNextReq decides whether to retry; if so, the request is requeued.

Initializing a job normally happens right after creation: the job is given the Pending phase and defaults for some other fields, and its PodGroup and PVCs are created.

func (cc *jobcontroller) initiateJob(job *batch.Job) (*batch.Job, error) {
	klog.V(3).Infof("Starting to initiate Job <%s/%s>", job.Namespace, job.Name)
	jobInstance, err := cc.initJobStatus(job)
	if err != nil {
		cc.recorder.Event(job, v1.EventTypeWarning, string(batch.JobStatusError),
			fmt.Sprintf("Failed to initialize job status, err: %v", err))
		return nil, err
	}

	if err := cc.pluginOnJobAdd(jobInstance); err != nil {
		cc.recorder.Event(job, v1.EventTypeWarning, string(batch.PluginError),
			fmt.Sprintf("Execute plugin when job add failed, err: %v", err))
		return nil, err
	}

	newJob, err := cc.createJobIOIfNotExist(jobInstance)  // mainly creates PVCs; job.spec may reference a PVC by name, otherwise one is generated by default
	if err != nil {
		cc.recorder.Event(job, v1.EventTypeWarning, string(batch.PVCError),
			fmt.Sprintf("Failed to create PVC, err: %v", err))
		return nil, err
	}

	if err := cc.createOrUpdatePodGroup(newJob); err != nil {
		cc.recorder.Event(job, v1.EventTypeWarning, string(batch.PodGroupError),
			fmt.Sprintf("Failed to create PodGroup, err: %v", err))
		return nil, err
	}

	return newJob, nil
}

func (cc *jobcontroller) initJobStatus(job *batch.Job) (*batch.Job, error) {
	if job.Status.State.Phase != "" {
		return job, nil
	}

	job.Status.State.Phase = batch.Pending
	job.Status.State.LastTransitionTime = metav1.Now()
	job.Status.MinAvailable = job.Spec.MinAvailable
	jobCondition := newCondition(job.Status.State.Phase, &job.Status.State.LastTransitionTime) // record the Pending phase as a condition
	job.Status.Conditions = append(job.Status.Conditions, jobCondition)
	newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(context.TODO(), job, metav1.UpdateOptions{})
	if err != nil {
		klog.Errorf("Failed to update status of Job %v/%v: %v",
			job.Namespace, job.Name, err)
		return nil, err
	}
	if err := cc.cache.Update(newJob); err != nil {  // refresh the jobCache
		klog.Errorf("CreateJob - Failed to update Job %v/%v in cache:  %v",
			newJob.Namespace, newJob.Name, err)
		return nil, err
	}

	return newJob, nil
}


// For a non-Pending job, run the update path; the typical scenario is that a running job's spec was modified externally.
func (cc *jobcontroller) initOnJobUpdate(job *batch.Job) error {
	klog.V(3).Infof("Starting to initiate Job <%s/%s> on update", job.Namespace, job.Name)

	if err := cc.pluginOnJobUpdate(job); err != nil {
		cc.recorder.Event(job, v1.EventTypeWarning, string(batch.PluginError),
			fmt.Sprintf("Execute plugin when job add failed, err: %v", err))
		return err
	}

	if err := cc.createOrUpdatePodGroup(job); err != nil {  // create or update the corresponding PodGroup
		cc.recorder.Event(job, v1.EventTypeWarning, string(batch.PodGroupError),
			fmt.Sprintf("Failed to create PodGroup, err: %v", err))
		return err
	}

	return nil
}


func (cc *jobcontroller) pluginOnJobUpdate(job *batch.Job) error {
	client := pluginsinterface.PluginClientset{KubeClients: cc.kubeClient}
	if job.Status.ControlledResources == nil {
		job.Status.ControlledResources = make(map[string]string)
	}
	for name, args := range job.Spec.Plugins {
		pb, found := plugins.GetPluginBuilder(name)
		if !found {
			err := fmt.Errorf("failed to get plugin %s", name)
			klog.Error(err)
			return err
		}
		klog.Infof("Starting to execute plugin at <pluginOnJobUpdate>: %s on job: <%s/%s>", name, job.Namespace, job.Name)
		if err := pb(client, args).OnJobUpdate(job); err != nil {  // per-plugin handling on job update; some plugins do nothing, others update a ConfigMap
			klog.Errorf("Failed to process on job update plugin %s, err %v.", name, err)
			return err
		}
	}

	return nil
}

// This dispatches to the registered plugin builders; the five supported plugins are all registered in init().
// pkg/controllers/job/plugins/factory.go
func init() {
	RegisterPluginBuilder("ssh", ssh.New)
	RegisterPluginBuilder("env", env.New)
	RegisterPluginBuilder("svc", svc.New)
	RegisterPluginBuilder("tensorflow", tensorflow.New)
	RegisterPluginBuilder("mpi", mpi.New)
}

func RegisterPluginBuilder(name string, pc PluginBuilder) {
	pluginMutex.Lock()
	defer pluginMutex.Unlock()

	pluginBuilders[name] = pc
}

// GetPluginBuilder returns plugin builder for a given plugin name.
func GetPluginBuilder(name string) (PluginBuilder, bool) {
	pluginMutex.Lock()
	defer pluginMutex.Unlock()

	pb, found := pluginBuilders[name]
	return pb, found
}

A Command is a way to feed actions in from the outside, such as RestartJob, to influence a job; in other words, users can steer a job's behavior through Command objects.

Besides Jobs, the jobController also watches PodGroup updates and enqueues the corresponding job.

It watches Command creation as well: the command is first put onto the commandQueue and is then turned into a request on the job queue.

It also watches Pods, but first checks whether a pod actually belongs to a volcano Job, using annotations and the controlledBy owner check; if so it wraps a request and enqueues it onto the job queue.

Pod events therefore drive job synchronization: pod add/update/delete, and especially updates, lead to checks on whether a task has finished and raise different events; a pod deletion raises PodEvictedEvent.
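The pod event handlers themselves are not shown in this article. As a rough, hypothetical sketch of the ownership check just described (the annotation keys and the function name are assumptions for illustration, not the exact Volcano code):

// Hypothetical sketch: only pods that carry volcano job annotations are turned
// into requests for the job queue; everything else is ignored.
func (cc *jobcontroller) addPodSketch(obj interface{}) {
	pod, ok := obj.(*v1.Pod)
	if !ok {
		return
	}
	jobName, found := pod.Annotations["volcano.sh/job-name"] // assumed annotation key
	if !found {
		return // not created by a volcano Job
	}
	req := apis.Request{
		Namespace: pod.Namespace,
		JobName:   jobName,
		TaskName:  pod.Annotations["volcano.sh/task-spec"], // assumed annotation key
		Event:     bus.OutOfSyncEvent,
	}
	// (the real handler also records the pod in the jobCache before enqueueing)
	cc.getWorkerQueue(jobhelpers.GetJobKeyByReq(&req)).Add(req)
}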

 

KillJob

From the earlier table we can see that KillJob is triggered mainly when a Job is deleted or when something abnormal happens. A Job does not support in-place upgrades, only scaling, so any abnormal situation goes straight to KillJob. Its main logic is:

  • Delete all pods belonging to the job, counting the pods in each phase along the way
  • Update the Job status
  • Delete the PodGroup that belongs to the Job

Actions such as RestartJob or AbortJob can be passed in through a Command.
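For illustration, a Command targeting a Job might look roughly like the following; the exact field set of busv1alpha1.Command is assumed here, with only TargetObject (matching the filter in Initialize above) and an action shown:

// Assumed illustration (field names hedged): a Command asking the controller to restart a Job.
func exampleRestartCommand() *busv1alpha1.Command {
	return &busv1alpha1.Command{
		ObjectMeta: metav1.ObjectMeta{Name: "restart-my-job", Namespace: "default"},
		Action:     string(busv1alpha1.RestartJobAction), // Action field name assumed
		TargetObject: &metav1.OwnerReference{ // matches the filter in Initialize above
			APIVersion: batchv1alpha1.SchemeGroupVersion.String(),
			Kind:       "Job",
			Name:       "my-job",
		},
	}
}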

For example, runningState triggers KillJob when it receives a RestartJobAction:

func (ps *runningState) Execute(action v1alpha1.Action) error {
	switch action {
	case v1alpha1.RestartJobAction:
		return KillJob(ps.job, PodRetainPhaseNone, func(status *vcbatch.JobStatus) bool {
			status.State.Phase = vcbatch.Restarting
			status.RetryCount++
			return true
		})



// pkg/controllers/job/job_controller_actions.go
func (cc *jobcontroller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.PhaseMap, updateStatus state.UpdateStatusFn) error {
	job := jobInfo.Job
	if job.DeletionTimestamp != nil {
		// skip jobs that are already being deleted (another worker may be handling them)
		return nil
	}
	var pending, running, terminating, succeeded, failed, unknown int32
	var errs []error
	var total int
	// delete all pods belonging to this job
	for _, pods := range jobInfo.Pods {
		for _, pod := range pods {
			total++
			if pod.DeletionTimestamp != nil {
				// skip pods that are already terminating
				terminating++
				continue
			}
			_, retain := podRetainPhase[pod.Status.Phase]
			if !retain {
				// call kube-apiserver to delete pods whose phase is not in the retain set
				err := cc.deleteJobPod(job.Name, pod)
				if err == nil {
					terminating++
					continue
				}
				errs = append(errs, err)
				cc.resyncTask(pod)
			}

			classifyAndAddUpPodBaseOnPhase(pod, &pending, &running, &succeeded, &failed, &unknown)
		}
	}
	// error handling omitted

	// update the Job status with the per-phase pod counts
	newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(context.TODO(), job, metav1.UpdateOptions{})
	// error handling omitted
	// update the job cache
	if e := cc.cache.Update(newJob); e != nil {
		klog.Errorf("KillJob - Failed to update Job %v/%v in cache:  %v",
			newJob.Namespace, newJob.Name, e)
		return e
	}
	// delete the PodGroup that belongs to this Job
	if err := cc.vcClient.SchedulingV1beta1().PodGroups(job.Namespace).Delete(context.TODO(), job.Name, metav1.DeleteOptions{}); err != nil {
		// error handling omitted
	}
	if err := cc.pluginOnJobDelete(job); err != nil {
		return err
	}
	return nil
}

In the end the job is reconciled again anyway, because killJob updates the job status.

Updating the status moves the job into a new phase, and the actions triggered in that new state drive the next step.

For example: Running → Restarting → Pending → syncJob (back to Running).

 

processResyncTask

The other main loop of the jobController is the following:

// When syncJob or killJob fails to delete a pod, the pod is enqueued here for a retry.
func (cc *jobcontroller) resyncTask(task *v1.Pod) {
	cc.errTasks.AddRateLimited(task)
}

func (cc *jobcontroller) processResyncTask() {
	obj, shutdown := cc.errTasks.Get()
	if shutdown {
		return
	}

	// one task only resync 10 times
	if cc.errTasks.NumRequeues(obj) > 10 {   // AddRateLimited counts a requeue; give up after 10 failures
		cc.errTasks.Forget(obj)
		return
	}

	defer cc.errTasks.Done(obj)

	task, ok := obj.(*v1.Pod)
	if !ok {
		klog.Errorf("failed to convert %v to *v1.Pod", obj)
		return
	}

	if err := cc.syncTask(task); err != nil {
		klog.Errorf("Failed to sync pod <%v/%v>, retry it, err %v", task.Namespace, task.Name, err)
		cc.resyncTask(task)  // on error, requeue again
	}
}

func (cc *jobcontroller) syncTask(oldTask *v1.Pod) error {
	newPod, err := cc.kubeClient.CoreV1().Pods(oldTask.Namespace).Get(context.TODO(), oldTask.Name, metav1.GetOptions{})
	if err != nil {
		if errors.IsNotFound(err) {
			if err := cc.cache.DeletePod(oldTask); err != nil {
				klog.Errorf("failed to delete cache pod <%v/%v>, err %v.", oldTask.Namespace, oldTask.Name, err)
				return err
			}
			klog.V(3).Infof("Pod <%v/%v> was deleted, removed from cache.", oldTask.Namespace, oldTask.Name)

			return nil
		}
		return fmt.Errorf("failed to get Pod <%v/%v>: err %v", oldTask.Namespace, oldTask.Name, err)
	}

	return cc.cache.UpdatePod(newPod)
}

// Note: even though errTasks is fed by failed pod deletions, syncTask does not retry the deletion against the apiserver;
// the job reconcile itself retries the deletion, so refreshing the jobCache here is enough.

 

Summary

The jobController watches four main kinds of resources:

  1. vcJob add/update/delete: each event enqueues a Request object (carrying the Event, exit code, Action, and so on); every worker goroutine then pops requests from the queue it owns and handles them
    1. handling usually comes down to syncJob or killJob, chosen from the current job state plus the action derived from the request (i.e. given the current state, what should happen next)
    2. syncJob mainly creates the PodGroup, scales pods up or down, and so on
    3. killJob deletes pods and then switches the job to a new phase
  2. Pod add/update/delete: these mainly update the pod information of the corresponding job in the jobCache and likewise trigger job reconciliation
  3. PodGroup update: once the PodGroup has been scheduled, the corresponding job is reconciled again
  4. Command add: a newly created Command first goes onto its own queue; when dequeued it is deleted from the apiserver and the action it specifies is wrapped into a request that reconciles the job
    1. Commands are how users intervene in a job's lifecycle from the command line (vcctl)
  5. Pods whose deletion failed are retried by a dedicated goroutine, but that retry only updates the cache; the job reconcile itself re-attempts the pod deletion
0条评论
0 / 1000
l****n
3文章数
0粉丝数
l****n
3 文章 | 0 粉丝
l****n
3文章数
0粉丝数
l****n
3 文章 | 0 粉丝
原创

k8s调度器Volcano源码解读(2)

2023-08-30 01:36:08
88
0

Job控制器是极为重要的一个控制器

 

type jobcontroller struct {
	kubeClient kubernetes.Interface
	vcClient   vcclientset.Interface

	jobInformer   batchinformer.JobInformer
	podInformer   coreinformers.PodInformer
	pvcInformer   coreinformers.PersistentVolumeClaimInformer
	pgInformer    schedulinginformers.PodGroupInformer
	svcInformer   coreinformers.ServiceInformer
	cmdInformer   businformer.CommandInformer
	pcInformer    kubeschedulinginformers.PriorityClassInformer
	queueInformer schedulinginformers.QueueInformer

	// A store of jobs
	jobLister batchlister.JobLister
	jobSynced func() bool

	// A store of pods
	podLister corelisters.PodLister
	podSynced func() bool

	pvcLister corelisters.PersistentVolumeClaimLister
	pvcSynced func() bool

	// A store of podgroups
	pgLister schedulinglisters.PodGroupLister
	pgSynced func() bool

	// A store of service
	svcLister corelisters.ServiceLister
	svcSynced func() bool

	cmdLister buslister.CommandLister
	cmdSynced func() bool

	pcLister kubeschedulinglisters.PriorityClassLister
	pcSynced func() bool

	queueLister schedulinglisters.QueueLister
	queueSynced func() bool

	// queue that need to sync up
	queueList    []workqueue.RateLimitingInterface // queueList是所有watch到的对象,注意这里之所以是slice是为了多worker协同,每个worker一个queue
    // controller根据job的namecepace-name来进行hash后随机分配到各个queue中
	commandQueue workqueue.RateLimitingInterface
	cache        jobcache.Cache
	// Job Event recorder
	recorder record.EventRecorder

	errTasks      workqueue.RateLimitingInterface
	workers       uint32
	maxRequeueNum int
}

// 内部包含了多种对象的informer和lister

queueList的本质是一个队列,队列的元素是自定义的一个Request对象,可以看到Request中主要包含的是跟Job相关的key信息,这也符合一般的队列模型,queue中存放key,cache中存放实际的数据:

 

// pkg/controllers/apis/request.go
// Request struct.
type Request struct {
	Namespace string    // job的namespace
	JobName   string    // job的name
	TaskName  string    // task的name
	QueueName string    // 分配到的Queue的name

	Event      v1alpha1.Event
	ExitCode   int32
	Action     v1alpha1.Action
	JobVersion int32
}

cache的本质是一个Job资源的map,key是namespace/name

有了Indexer为什么还用cache?因为cache聚合了job和其拥有的pod,所以用cache获取job以及所拥有的pod更方便?

type jobCache struct {
	sync.Mutex

	jobs        map[string]*apis.JobInfo
	deletedJobs workqueue.RateLimitingInterface
}

//JobInfo struct. value中既包含了Job的信息,也包含了这个job对应的Pods的信息
type JobInfo struct {
	Namespace string
	Name      string

	Job  *batch.Job
	Pods map[string]map[string]*v1.Pod
}

jobcontroller 的初始化函数

func (cc *jobcontroller) Initialize(opt *framework.ControllerOption) error {
	cc.kubeClient = opt.KubeClient
	cc.vcClient = opt.VolcanoClient

	sharedInformers := opt.SharedInformerFactory
	workers := opt.WorkerNum
	// Initialize event client
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(klog.Infof)
	eventBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: cc.kubeClient.CoreV1().Events("")})
	recorder := eventBroadcaster.NewRecorder(vcscheme.Scheme, v1.EventSource{Component: "vc-controller-manager"})

	cc.queueList = make([]workqueue.RateLimitingInterface, workers)
	cc.commandQueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	cc.cache = jobcache.New()
	cc.errTasks = newRateLimitingQueue()
	cc.recorder = recorder
	cc.workers = workers
	cc.maxRequeueNum = opt.MaxRequeueNum
	if cc.maxRequeueNum < 0 {
		cc.maxRequeueNum = -1
	}

	var i uint32
	for i = 0; i < workers; i++ {
		cc.queueList[i] = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	} //创建workers数量个queue用于存放job

	cc.jobInformer = informerfactory.NewSharedInformerFactory(cc.vcClient, 0).Batch().V1alpha1().Jobs()
	cc.jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    cc.addJob,
		UpdateFunc: cc.updateJob,
		DeleteFunc: cc.deleteJob,
	})
	cc.jobLister = cc.jobInformer.Lister()
	cc.jobSynced = cc.jobInformer.Informer().HasSynced

	cc.cmdInformer = informerfactory.NewSharedInformerFactory(cc.vcClient, 0).Bus().V1alpha1().Commands()
	cc.cmdInformer.Informer().AddEventHandler(
		cache.FilteringResourceEventHandler{
			FilterFunc: func(obj interface{}) bool {
				switch v := obj.(type) {
				case *busv1alpha1.Command:
					if v.TargetObject != nil &&
						v.TargetObject.APIVersion == batchv1alpha1.SchemeGroupVersion.String() &&
						v.TargetObject.Kind == "Job" {
						return true
					}

					return false
				default:
					return false
				}
			},
			Handler: cache.ResourceEventHandlerFuncs{
				AddFunc: cc.addCommand,
			},
		},
	)
	cc.cmdLister = cc.cmdInformer.Lister()
	cc.cmdSynced = cc.cmdInformer.Informer().HasSynced

	cc.podInformer = sharedInformers.Core().V1().Pods()
	cc.podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    cc.addPod,
		UpdateFunc: cc.updatePod,
		DeleteFunc: cc.deletePod,
	})

	cc.podLister = cc.podInformer.Lister()
	cc.podSynced = cc.podInformer.Informer().HasSynced

	cc.pvcInformer = sharedInformers.Core().V1().PersistentVolumeClaims()
	cc.pvcLister = cc.pvcInformer.Lister()
	cc.pvcSynced = cc.pvcInformer.Informer().HasSynced

	cc.svcInformer = sharedInformers.Core().V1().Services()
	cc.svcLister = cc.svcInformer.Lister()
	cc.svcSynced = cc.svcInformer.Informer().HasSynced

	cc.pgInformer = informerfactory.NewSharedInformerFactory(cc.vcClient, 0).Scheduling().V1beta1().PodGroups()
	cc.pgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		UpdateFunc: cc.updatePodGroup,
	})
	cc.pgLister = cc.pgInformer.Lister()
	cc.pgSynced = cc.pgInformer.Informer().HasSynced

	cc.pcInformer = sharedInformers.Scheduling().V1().PriorityClasses()
	cc.pcLister = cc.pcInformer.Lister()
	cc.pcSynced = cc.pcInformer.Informer().HasSynced

	cc.queueInformer = informerfactory.NewSharedInformerFactory(cc.vcClient, 0).Scheduling().V1beta1().Queues()
	cc.queueLister = cc.queueInformer.Lister()
	cc.queueSynced = cc.queueInformer.Informer().HasSynced

	// Register actions 
	state.SyncJob = cc.syncJob
	state.KillJob = cc.killJob

	return nil
}

// pkg/controllers/job/state/factory.go
var (
	// SyncJob will create or delete Pods according to Job's spec.
	SyncJob ActionFn
	// KillJob kill all Pods of Job with phase not in podRetainPhase.
	KillJob KillActionFn
)

//State interface.
type State interface {
	// Execute executes the actions based on current state.
	Execute(act v1alpha1.Action) error
}

// NewState gets the state from the volcano job Phase.
func NewState(jobInfo *apis.JobInfo) State {
	job := jobInfo.Job
	switch job.Status.State.Phase {
	case vcbatch.Pending:
		return &pendingState{job: jobInfo}
	case vcbatch.Running:
		return &runningState{job: jobInfo}
	case vcbatch.Restarting:
		return &restartingState{job: jobInfo}
	case vcbatch.Terminated, vcbatch.Completed, vcbatch.Failed:
		return &finishedState{job: jobInfo}
	case vcbatch.Terminating:
		return &terminatingState{job: jobInfo}
	case vcbatch.Aborting:
		return &abortingState{job: jobInfo}
	case vcbatch.Aborted:
		return &abortedState{job: jobInfo}
	case vcbatch.Completing:
		return &completingState{job: jobInfo}
	}

	// It's pending by default.
	return &pendingState{job: jobInfo}
}

syncJob和killJob是2个action,其实是定义了一个Job在不同的state的时候,如果触发了某个action,该如何动作 state 其实就是job.status.state, 总共有pending,aborting,running,completing等 action 其实就是job spec定义的policy,当一些事件发生后要采取的动作。 所以在不同的状态下,对应不同的action 也要采取不同的动作,而动作只有2个,就是syncJob和killJob

// job informer的触发eventhandler
func (cc *jobcontroller) addJob(obj interface{}) {
	job, ok := obj.(*batch.Job)
	if !ok {
		klog.Errorf("obj is not Job")
		return
	}

	req := apis.Request{
		Namespace: job.Namespace,
		JobName:   job.Name,

		Event: bus.OutOfSyncEvent,  //job的增删改,都触发这个事件,意思是job没有同步
	}

	// TODO(k82cn): if failed to add job, the cache should be refresh
	if err := cc.cache.Add(job); err != nil {   //job进入缓存,如果放入缓存报错?
		klog.Errorf("Failed to add job <%s/%s>: %v in cache",
			job.Namespace, job.Name, err)
	}
	key := jobhelpers.GetJobKeyByReq(&req)
	queue := cc.getWorkerQueue(key)  //计算当前job要塞入哪个queue
	queue.Add(req)
}

func (cc *jobcontroller) updateJob(oldObj, newObj interface{}) {
	newJob, ok := newObj.(*batch.Job)
	if !ok {
		klog.Errorf("newObj is not Job")
		return
	}

	oldJob, ok := oldObj.(*batch.Job)
	if !ok {
		klog.Errorf("oldJob is not Job")
		return
	}

	// No need to update if ResourceVersion is not changed
	if newJob.ResourceVersion == oldJob.ResourceVersion {
		klog.V(6).Infof("No need to update because job is not modified.")
		return
	}

	if err := cc.cache.Update(newJob); err != nil {
		klog.Errorf("UpdateJob - Failed to update job <%s/%s>: %v in cache",
			newJob.Namespace, newJob.Name, err)
	}

	// NOTE: Since we only reconcile job based on Spec, we will ignore other attributes
	// For Job status, it's used internally and always been updated via our controller.
// 只根据spec进行调谐,而status只会被当前控制器更新,所以如果status更新了一定是当前控制器搞的。
	if reflect.DeepEqual(newJob.Spec, oldJob.Spec) && newJob.Status.State.Phase == oldJob.Status.State.Phase {
		klog.V(6).Infof("Job update event is ignored since no update in 'Spec'.")
		return
	}

	req := apis.Request{
		Namespace: newJob.Namespace,
		JobName:   newJob.Name,
		Event:     bus.OutOfSyncEvent,
	}
	key := jobhelpers.GetJobKeyByReq(&req)
	queue := cc.getWorkerQueue(key)
	queue.Add(req)
}

func (cc *jobcontroller) deleteJob(obj interface{}) {
	job, ok := obj.(*batch.Job)
	if !ok {
		// If we reached here it means the Job was deleted but its final state is unrecorded.
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)   //relist导致的删除
		if !ok {
			klog.Errorf("Couldn't get object from tombstone %#v", obj)
			return
		}
		job, ok = tombstone.Obj.(*batch.Job)
		if !ok {
			klog.Errorf("Tombstone contained object that is not a volcano Job: %#v", obj)
			return
		}
	}

	if err := cc.cache.Delete(job); err != nil {
		klog.Errorf("Failed to delete job <%s/%s>: %v in cache",
			job.Namespace, job.Name, err)
	}
}


// 根据job的key来判断要进入哪个queue
func (cc *jobcontroller) getWorkerQueue(key string) workqueue.RateLimitingInterface {
	var hashVal hash.Hash32
	var val uint32

	hashVal = fnv.New32()
	hashVal.Write([]byte(key))

	val = hashVal.Sum32()

	queue := cc.queueList[val%cc.workers]

	return queue
}

 

后续的一些逻辑判断,经常会让人看不懂,一个潜在的逻辑就是,req中的Action和Event,是有可能和cache中Job的状态是不一致的,cache中是最新的,而req可能滞后 所以,例如通过req来判断要执行sync的话(旧的事件),而job可能已经被删了,所以此时应该停止sync

// cache 对于job的缓存, 这里没有操作pod, pod在被创建出来后才会进入cache,所以这里初始化的pod集合都是空map
func (jc *jobCache) Add(job *v1alpha1.Job) error {
	jc.Lock()
	defer jc.Unlock()
	key := JobKey(job)   //就是ns/name 来确定key
	if jobInfo, found := jc.jobs[key]; found {
		if jobInfo.Job == nil {
			jobInfo.SetJob(job)

			return nil
		}
		return fmt.Errorf("duplicated jobInfo <%v>", key)
	}

	jc.jobs[key] = &apis.JobInfo{
		Name:      job.Name,
		Namespace: job.Namespace,

		Job:  job,
		Pods: make(map[string]map[string]*v1.Pod),  //第一个key是task名字,第二个key是pod UID
	}

	return nil
}

func (jc *jobCache) Update(obj *v1alpha1.Job) error {
	jc.Lock()
	defer jc.Unlock()

	key := JobKey(obj)
	job, found := jc.jobs[key]
	if !found {
		return fmt.Errorf("failed to find job <%v>", key)
	}

	var oldResourceversion, newResourceversion uint64
	var err error
	if oldResourceversion, err = strconv.ParseUint(job.Job.ResourceVersion, 10, 64); err != nil {
		return fmt.Errorf("failed to parase job <%v> resource version <%s>", key, job.Job.ResourceVersion)
	}

	if newResourceversion, err = strconv.ParseUint(obj.ResourceVersion, 10, 64); err != nil {
		return fmt.Errorf("failed to parase job <%v> resource version <%s>", key, obj.ResourceVersion)
	}
	if newResourceversion < oldResourceversion {
		return fmt.Errorf("job <%v> has too old resource version: %d (%d)", key, newResourceversion, oldResourceversion)
	}
	job.Job = obj
	return nil
}

func (jc *jobCache) Delete(obj *v1alpha1.Job) error {
	jc.Lock()
	defer jc.Unlock()

	key := JobKey(obj)
	jobInfo, found := jc.jobs[key]
	if !found {
		return fmt.Errorf("failed to find job <%v>", key)
	}
	jobInfo.Job = nil
	jc.deleteJob(jobInfo)

	return nil
}

知道了controller中的关键的数据结构,我们也就能猜测controller的reconcile的逻辑了:生产者通过list-watch将Job的key信息加入到queueList中,将Job的实体信息保存到jobCache中缓存,消费者从queueList中获取数据并进行处理。其主要代码在processNextReq函数中:

// Run start JobController.
func (cc *jobcontroller) Run(stopCh <-chan struct{}) {
	go cc.jobInformer.Informer().Run(stopCh)
	go cc.podInformer.Informer().Run(stopCh)
	go cc.pvcInformer.Informer().Run(stopCh)
	go cc.pgInformer.Informer().Run(stopCh)
	go cc.svcInformer.Informer().Run(stopCh)
	go cc.cmdInformer.Informer().Run(stopCh)
	go cc.pcInformer.Informer().Run(stopCh)
	go cc.queueInformer.Informer().Run(stopCh)

	cache.WaitForCacheSync(stopCh, cc.jobSynced, cc.podSynced, cc.pgSynced,
		cc.svcSynced, cc.cmdSynced, cc.pvcSynced, cc.pcSynced, cc.queueSynced)

	go wait.Until(cc.handleCommands, 0, stopCh)
	var i uint32
	for i = 0; i < cc.workers; i++ {
		go func(num uint32) {
			wait.Until(
				func() {
					cc.worker(num)  //根据worker的编号,进行处理对应的queue
				},
				time.Second,
				stopCh)
		}(i)
	}

	go cc.cache.Run(stopCh)

	// Re-sync error tasks.
	go wait.Until(cc.processResyncTask, 0, stopCh)   //定期resync err tasks

	klog.Infof("JobController is running ...... ")
}

可以看到这里分别有4类协程

  1. cc.worker(num)
  2. cc.cache.Run(stopCh)
  3. cc.processResyncTask
  4. cc.handleCommands
// jobController.worker(int num)
func (cc *jobcontroller) worker(i uint32) {
	klog.Infof("worker %d start ...... ", i)

	for cc.processNextReq(i) {
	}
}

unc (cc *jobcontroller) processNextReq(count uint32) bool {
	queue := cc.queueList[count]  //取出当前worker 负责的queue
	obj, shutdown := queue.Get()
	if shutdown {
		klog.Errorf("Fail to pop item from queue")
		return false
	}

	req := obj.(apis.Request)
	defer queue.Done(req)

	key := jobcache.JobKeyByReq(&req)  //如果这个Job不属于当前这个 id 的worker 则报错,重新入队
	if !cc.belongsToThisRoutine(key, count) {
		klog.Errorf("should not occur The job does not belongs to this routine key:%s, worker:%d...... ", key, count)
		queueLocal := cc.getWorkerQueue(key)
		queueLocal.Add(req)
		return true
	}

	klog.V(3).Infof("Try to handle request <%v>", req)

	jobInfo, err := cc.cache.Get(key)
	if err != nil {
		// TODO(k82cn): ignore not-ready error.
		klog.Errorf("Failed to get job by <%v> from cache: %v", req, err)
		return true
	}

	st := state.NewState(jobInfo)  //构造一个state对象,根据job.Status.State.Phase来确定state的类型
	if st == nil {
		klog.Errorf("Invalid state <%s> of Job <%v/%v>",
			jobInfo.Job.Status.State, jobInfo.Job.Namespace, jobInfo.Job.Name)
		return true
	}

	action := applyPolicies(jobInfo.Job, &req)  //根据job的policy等信息获取action
	klog.V(3).Infof("Execute <%v> on Job <%s/%s> in <%s> by <%T>.",
		action, req.Namespace, req.JobName, jobInfo.Job.Status.State.Phase, st)

	if action != busv1alpha1.SyncJobAction {
		cc.recordJobEvent(jobInfo.Job.Namespace, jobInfo.Job.Name, batchv1alpha1.ExecuteAction, fmt.Sprintf(
			"Start to execute action %s ", action))
	}  //如果不是syncJobAction,则代表是一些特殊的action,则记录一下事件

	if err := st.Execute(action); err != nil {  //执行对应state的逻辑,里面主要是KillJob或者SyncJob
		if cc.maxRequeueNum == -1 || queue.NumRequeues(req) < cc.maxRequeueNum {
			klog.V(2).Infof("Failed to handle Job <%s/%s>: %v",
				jobInfo.Job.Namespace, jobInfo.Job.Name, err)
			// If any error, requeue it.
			queue.AddRateLimited(req)
			return true
		}

		cc.recordJobEvent(jobInfo.Job.Namespace, jobInfo.Job.Name, batchv1alpha1.ExecuteAction, fmt.Sprintf(
			"Job failed on action %s for retry limit reached", action))
		klog.Warningf("Terminating Job <%s/%s> and releasing resources", jobInfo.Job.Namespace, jobInfo.Job.Name)
		if err = st.Execute(busv1alpha1.TerminateJobAction); err != nil {
			klog.Errorf("Failed to terminate Job<%s/%s>: %v", jobInfo.Job.Namespace, jobInfo.Job.Name, err)
		}
		klog.Warningf("Dropping job<%s/%s> out of the queue: %v because max retries has reached", jobInfo.Job.Namespace, jobInfo.Job.Name, err)
	}

	// If no error, forget it.
	queue.Forget(req)

	return true
}




func applyPolicies(job *batch.Job, req *apis.Request) v1alpha1.Action {
	if len(req.Action) != 0 {
		return req.Action
	}

	if req.Event == v1alpha1.OutOfSyncEvent {
		return v1alpha1.SyncJobAction
	}

	// For all the requests triggered from discarded job resources will perform sync action instead
// 因为cache可能是新的,req从queue里来,可能是旧的数据,所以以最新的cache job数据为准
	if req.JobVersion < job.Status.Version {
		klog.Infof("Request %s is outdated, will perform sync instead.", req)
		return v1alpha1.SyncJobAction
	}

	// Overwrite Job level policies, 如果是pod触发了task级别的事件,则使用task policy action
	if len(req.TaskName) != 0 {   //如果是一个task 触发的req,则使用task的policy, 当pod触发事件的时候,req里面保存了TaskName,其实就代表是task级别要处理的一些action
		// Parse task level policies
		for _, task := range job.Spec.Tasks {
			if task.Name == req.TaskName {
				for _, policy := range task.Policies {
					policyEvents := getEventlist(policy)  //获取这个Policy定义的event列表

					if len(policyEvents) > 0 && len(req.Event) > 0 {  //如果当前触发的req里面的event在这个policy要处理的events中,那么代表这个req要触发对应的event的action
						if checkEventExist(policyEvents, req.Event) || checkEventExist(policyEvents, v1alpha1.AnyEvent) {
							return policy.Action
						}
					}

					// 0 is not an error code, is prevented in validation admission controller
				 // 不支持定义 退出码为0 作为作业策略的退出码依据,会被addmission控制,也就是如果req触发的退出码和当前policy希望触发的退出码相同,那也可以触发action
					if policy.ExitCode != nil && *policy.ExitCode == req.ExitCode {
						return policy.Action
					}
				}
				break
			}
		}
	}

	// Parse Job level policies,如果是job级别的事件
	for _, policy := range job.Spec.Policies {
		policyEvents := getEventlist(policy)

		if len(policyEvents) > 0 && len(req.Event) > 0 {
			if checkEventExist(policyEvents, req.Event) || checkEventExist(policyEvents, v1alpha1.AnyEvent) {
				return policy.Action
			}
		}

		// 0 is not an error code, is prevented in validation admission controller
		if policy.ExitCode != nil && *policy.ExitCode == req.ExitCode {
			return policy.Action
		}
	}
//如果啥都没有,那就是sync 动作

	return v1alpha1.SyncJobAction
}
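
getEventlist and checkEventExist are not shown above; the sketch below captures the matching they perform, using local stand-ins for the bus/v1alpha1 Event type (the real policy additionally carries an optional ExitCode, which is handled separately as seen in the code):

// Sketch of the event matching used by applyPolicies: a policy may declare a
// single Event and/or an Events list, and a request event matches if it appears
// in that list or if the policy listens on the AnyEvent wildcard.
// Event/AnyEvent here are simplified stand-ins for the real API types.
package sketch

type Event string

const AnyEvent Event = "*"

// eventList flattens a policy's Event field and Events slice,
// which is what getEventlist effectively returns.
func eventList(single Event, events []Event) []Event {
	all := append([]Event{}, events...)
	if len(single) != 0 {
		all = append(all, single)
	}
	return all
}

// eventMatches mirrors checkEventExist plus the AnyEvent check in applyPolicies.
func eventMatches(policyEvents []Event, reqEvent Event) bool {
	for _, e := range policyEvents {
		if e == reqEvent || e == AnyEvent {
			return true
		}
	}
	return false
}

runningState.Execute, one of the state implementations that applyPolicies feeds into, is shown next: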

 

func (ps *runningState) Execute(action v1alpha1.Action) error {
	switch action {
	case v1alpha1.RestartJobAction:
		return KillJob(ps.job, PodRetainPhaseNone, func(status *vcbatch.JobStatus) bool {
			status.State.Phase = vcbatch.Restarting
			status.RetryCount++
			return true
		})
	case v1alpha1.AbortJobAction:
		return KillJob(ps.job, PodRetainPhaseSoft, func(status *vcbatch.JobStatus) bool {
			status.State.Phase = vcbatch.Aborting
			return true
		})
	case v1alpha1.TerminateJobAction:
		return KillJob(ps.job, PodRetainPhaseSoft, func(status *vcbatch.JobStatus) bool {
			status.State.Phase = vcbatch.Terminating
			return true
		})
	case v1alpha1.CompleteJobAction:
		return KillJob(ps.job, PodRetainPhaseSoft, func(status *vcbatch.JobStatus) bool {
			status.State.Phase = vcbatch.Completing
			return true
		})
	default:
//传进去一个函数,其实要调谐的目标是status
		return SyncJob(ps.job, func(status *vcbatch.JobStatus) bool {
			jobReplicas := TotalTasks(ps.job.Job)  //获取pod副本总数
			if jobReplicas == 0 {
				// when scale down to zero, keep the current job phase
				return false
			}

			minSuccess := ps.job.Job.Spec.MinSuccess
//达到了最小成功数
			if minSuccess != nil && status.Succeeded >= *minSuccess {
				status.State.Phase = vcbatch.Completed
				return true
			}

			totalTaskMinAvailable := TotalTaskMinAvailable(ps.job.Job)  //获取total 每个task的最小可用数
			if status.Succeeded+status.Failed == jobReplicas {   // the job has finished running: succeeded + failed equals the total pod count
				if ps.job.Job.Spec.MinAvailable >= totalTaskMinAvailable {  
					for _, task := range ps.job.Job.Spec.Tasks {
						if task.MinAvailable == nil {
							continue
						}

						if taskStatus, ok := status.TaskStatusCount[task.Name]; ok {  // if any task has fewer succeeded pods than its own MinAvailable, the whole job is Failed
							if taskStatus.Phase[v1.PodSucceeded] < *task.MinAvailable {
								status.State.Phase = vcbatch.Failed
								return true
							}
						}
					}
				}

				if minSuccess != nil && status.Succeeded < *minSuccess {
					status.State.Phase = vcbatch.Failed
				} else if status.Succeeded >= ps.job.Job.Spec.MinAvailable { // Completed only when Succeeded reaches both minSuccess (checked above) and MinAvailable
					status.State.Phase = vcbatch.Completed
				} else {
					status.State.Phase = vcbatch.Failed
				}
				return true
			}
			return false  //返回false代表不更新status
		})
	}
}
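
state.NewState itself is not listed here; conceptually it just maps job.Status.State.Phase to a phase-specific object whose Execute method ends up in SyncJob or KillJob. A minimal, self-contained sketch of that dispatch (local stand-in types, only two phases shown; the real code covers Pending, Running, Restarting, Aborting, Terminating, Completing and more):

// Minimal sketch of the phase -> state dispatch behind state.NewState.
// Types are simplified stand-ins for the real batch/bus API types.
package sketch

type Phase string
type Action string

const (
	Pending Phase = "Pending"
	Running Phase = "Running"
)

// State is the interface all phase states (pendingState, runningState, ...) implement.
type State interface {
	Execute(action Action) error
}

type pendingState struct{ phase Phase }
type runningState struct{ phase Phase }

// In the real controller these branch into KillJob or SyncJob depending on the action.
func (s *pendingState) Execute(action Action) error { return nil }
func (s *runningState) Execute(action Action) error { return nil }

// NewState returns nil for an unknown phase; the caller then logs
// "Invalid state ..." and drops the request, exactly as seen above.
func NewState(phase Phase) State {
	switch phase {
	case Pending:
		return &pendingState{phase: phase}
	case Running:
		return &runningState{phase: phase}
	default:
		return nil
	}
}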

From the flow above we can see that the Job reconcile involves quite a few states, so the code uses Action together with State to drive the state-machine transitions. The handling ultimately boils down to two code paths, SyncJob and KillJob, so we focus on these two below.

SyncJob

From the state-transition table earlier, only jobs in the Pending or Running state can enter the SyncJob flow, which covers the path from Job creation to the Job running normally; as expected, SyncJob is mainly responsible for getting the Job up and running. The main steps are:

  • For a new Job, initialize it first: create its PVCs (Volcano manages these PVCs itself; there is no k8s controller for them) and its PodGroup (one Job maps to one PodGroup). The PodGroup created here is then managed by the PodGroup controller, and its name is derived from the Job (job name plus job UID, as seen in syncJob below)
  • Work out, from changes to the Job's tasks, the list of pods to create and the list of pods to delete
  • Create and delete the pods in those two lists via kube-apiserver, each in its own goroutine. Note that, to keep the default k8s scheduler away from these newly created pods, the Job carries the scheduler name and it is eventually propagated onto the pods: the k8s scheduler skips pods whose scheduler name is volcano, while the volcano scheduler only selects such pods, so the two do not interfere with each other. The admission webhook sets the scheduler name on the Job to volcano by default at creation time (see the sketch after this list)
  • Update the Job status and the jobCache
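
A minimal sketch of the scheduler-name propagation mentioned above follows. The helper name podForTask and its parameters are illustrative; the real logic lives in createJobPod, which also wires volumes, env variables, annotations and more:

// Sketch: the scheduler name carried by the vcjob (defaulted to "volcano" by
// the admission webhook) ends up on every pod the controller creates, so only
// the volcano scheduler picks these pods up. Illustrative, not the real createJobPod.
package sketch

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func podForTask(jobName, taskName, schedulerName string, index int, tmpl *v1.PodTemplateSpec) *v1.Pod {
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			// pod name format jobName-taskName-index, as noted in syncJob below
			Name: fmt.Sprintf("%s-%s-%d", jobName, taskName, index),
		},
		Spec: *tmpl.Spec.DeepCopy(),
	}
	// The key point: the pod inherits the job's scheduler name.
	pod.Spec.SchedulerName = schedulerName
	return pod
}

The full syncJob implementation follows: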
func (cc *jobcontroller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateStatusFn) error {
	job := jobInfo.Job
	klog.V(3).Infof("Starting to sync up Job <%s/%s>, current version %d", job.Namespace, job.Name, job.Status.Version)
	defer klog.V(3).Infof("Finished Job <%s/%s> sync up, current version %d", job.Namespace, job.Name, job.Status.Version)

	if jobInfo.Job.DeletionTimestamp != nil {   // the job is being deleted; the killJob flow will handle it, so skip syncing here
		klog.Infof("Job <%s/%s> is terminating, skip management process.",
			jobInfo.Job.Namespace, jobInfo.Job.Name)
		return nil
	}

	// deep copy job to prevent mutate it
	job = job.DeepCopy()

	// Find queue that job belongs to, and check if the queue has forwarding metadata
	queueInfo, err := cc.GetQueueInfo(job.Spec.Queue)
	if err != nil {
		return err
	}

	var jobForwarding bool
	if len(queueInfo.Spec.ExtendClusters) != 0 {  //这里待查含义
		jobForwarding = true
		if len(job.Annotations) == 0 {
			job.Annotations = make(map[string]string)
		}
		job.Annotations[batch.JobForwardingKey] = "true"
		job, err = cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).Update(context.TODO(), job, metav1.UpdateOptions{})
		if err != nil {
			klog.Errorf("failed to update job: %s/%s, error: %s", job.Namespace, job.Name, err.Error())
			return err
		}
	}

	// Skip job initiation if job is already initiated
	if !isInitiated(job) {  // isInitiated is true for any non-Pending job; a Pending job has not been initialized yet, so initialize it here (PodGroup/PVC creation, plugin hooks, etc.)
		if job, err = cc.initiateJob(job); err != nil {
			return err
		}
	} else {
		// TODO: optimize this call it only when scale up/down
		if err = cc.initOnJobUpdate(job); err != nil {   // 如果不是pending job,则更新job
			return err
		}
	}

	if len(queueInfo.Spec.ExtendClusters) != 0 {
		jobForwarding = true
		job.Annotations[batch.JobForwardingKey] = "true"
		_, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).Update(context.TODO(), job, metav1.UpdateOptions{})
		if err != nil {
			klog.Errorf("failed to update job: %s/%s, error: %s", job.Namespace, job.Name, err.Error())
			return err
		}
	}

	var syncTask bool  
	pgName := job.Name + "-" + string(job.UID)
	if pg, _ := cc.pgLister.PodGroups(job.Namespace).Get(pgName); pg != nil {
		if pg.Status.Phase != "" && pg.Status.Phase != scheduling.PodGroupPending {
			syncTask = true  //如果pg 已经过了pending状态,比如经过了调度,那么就要synctasks,也就是对pod进行操作了
		} //因为pg的更新也会触发job的调谐,比如pg调度完成了

		for _, condition := range pg.Status.Conditions {
			if condition.Type == scheduling.PodGroupUnschedulableType {
				cc.recorder.Eventf(job, v1.EventTypeWarning, string(batch.PodGroupPending),
					fmt.Sprintf("PodGroup %s:%s unschedule,reason: %s", job.Namespace, job.Name, condition.Message))
			}
		}
	}

	var jobCondition batch.JobCondition
	if !syncTask {  // no task sync needed: the PodGroup has not left Pending (not scheduled yet), so only the job status is updated
		if updateStatus != nil {
			if updateStatus(&job.Status) {
				job.Status.State.LastTransitionTime = metav1.Now()
				jobCondition = newCondition(job.Status.State.Phase, &job.Status.State.LastTransitionTime)
				job.Status.Conditions = append(job.Status.Conditions, jobCondition)
			}
		}
		newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(context.TODO(), job, metav1.UpdateOptions{})
		if err != nil {
			klog.Errorf("Failed to update status of Job %v/%v: %v",
				job.Namespace, job.Name, err)
			return err
		}
		if e := cc.cache.Update(newJob); e != nil {
			klog.Errorf("SyncJob - Failed to update Job %v/%v in cache:  %v",
				newJob.Namespace, newJob.Name, e)
			return e
		}
		return nil
	}

	var running, pending, terminating, succeeded, failed, unknown int32
	taskStatusCount := make(map[string]batch.TaskState)

	podToCreate := make(map[string][]*v1.Pod)
	var podToDelete []*v1.Pod
	var creationErrs []error
	var deletionErrs []error
	appendMutex := sync.Mutex{}

	appendError := func(container *[]error, err error) {
		appendMutex.Lock()
		defer appendMutex.Unlock()
		*container = append(*container, err)
	}

	waitCreationGroup := sync.WaitGroup{}

	for _, ts := range job.Spec.Tasks {
		ts.Template.Name = ts.Name
		tc := ts.Template.DeepCopy()
		name := ts.Template.Name

		pods, found := jobInfo.Pods[name]
		if !found {
			pods = map[string]*v1.Pod{}
		}

		var podToCreateEachTask []*v1.Pod
		for i := 0; i < int(ts.Replicas); i++ {
			podName := fmt.Sprintf(jobhelpers.PodNameFmt, job.Name, name, i)  //jobName-taskName-i的方式为pod生成名字
			if pod, found := pods[podName]; !found {
				newPod := createJobPod(job, tc, ts.TopologyPolicy, i, jobForwarding)  //创建一个pod manifest
				if err := cc.pluginOnPodCreate(job, newPod); err != nil {
					return err
				}
				podToCreateEachTask = append(podToCreateEachTask, newPod)
				waitCreationGroup.Add(1) //后面有多个协程一起创建pod
			} else {
				delete(pods, podName)  // scale-down: pods left in this map will be deleted below; removing an entry here means that pod is kept
				if pod.DeletionTimestamp != nil {  // e.g. scaling 2 replicas down to 1: pod 2 goes into podToDelete below; if pod 1 is already being deleted here, count it as terminating
					klog.Infof("Pod <%s/%s> is terminating", pod.Namespace, pod.Name)
					atomic.AddInt32(&terminating, 1)
					continue
				}
					//对已有的pod进行phase判断
				classifyAndAddUpPodBaseOnPhase(pod, &pending, &running, &succeeded, &failed, &unknown)
				calcPodStatus(pod, taskStatusCount)  // accumulate the per-phase counts into the taskStatusCount map
			}
		}
		podToCreate[ts.Name] = podToCreateEachTask
		for _, pod := range pods {
			podToDelete = append(podToDelete, pod)   //scale down剩下的pod要被删除
		}
	}

	for taskName, podToCreateEachTask := range podToCreate {
		if len(podToCreateEachTask) == 0 {
			continue
		}
		go func(taskName string, podToCreateEachTask []*v1.Pod) {  //每个task一个协程
			taskIndex := jobhelpers.GetTasklndexUnderJob(taskName, job)
			if job.Spec.Tasks[taskIndex].DependsOn != nil { //处理有task依赖的情况
				cc.waitDependsOnTaskMeetCondition(taskName, taskIndex, podToCreateEachTask, job)
			}

			for _, pod := range podToCreateEachTask {
				go func(pod *v1.Pod) {
					defer waitCreationGroup.Done()
//然后每个pod一个协程
					newPod, err := cc.kubeClient.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
					if err != nil && !apierrors.IsAlreadyExists(err) {  // creation failed and the error is not AlreadyExists
						// Failed to create Pod, waitCreationGroup a moment and then create it again
						// This is to ensure all podsMap under the same Job created
						// So gang-scheduling could schedule the Job successfully
						klog.Errorf("Failed to create pod %s for Job %s, err %#v",
							pod.Name, job.Name, err)
						appendError(&creationErrs, fmt.Errorf("failed to create pod %s, err: %#v", pod.Name, err))
					} else {
						classifyAndAddUpPodBaseOnPhase(newPod, &pending, &running, &succeeded, &failed, &unknown)
						calcPodStatus(pod, taskStatusCount)
						klog.V(5).Infof("Created Task <%s> of Job <%s/%s>",
							pod.Name, job.Namespace, job.Name)
					}
				}(pod)
			}
		}(taskName, podToCreateEachTask)
	}

	waitCreationGroup.Wait()

	if len(creationErrs) != 0 {  //有任何一个pod创建失败,则退出本次调谐
		cc.recorder.Event(job, v1.EventTypeWarning, FailedCreatePodReason,
			fmt.Sprintf("Error creating pods: %+v", creationErrs))
		return fmt.Errorf("failed to create %d pods of %d", len(creationErrs), len(podToCreate))
	}

	// Delete pods when scale down.
	waitDeletionGroup := sync.WaitGroup{}
	waitDeletionGroup.Add(len(podToDelete))
	for _, pod := range podToDelete {
		go func(pod *v1.Pod) {
			defer waitDeletionGroup.Done()
			err := cc.deleteJobPod(job.Name, pod)  //从apiserver删除pod
			if err != nil {
				// Failed to delete Pod, waitCreationGroup a moment and then create it again
				// This is to ensure all podsMap under the same Job created
				// So gang-scheduling could schedule the Job successfully
				klog.Errorf("Failed to delete pod %s for Job %s, err %#v",
					pod.Name, job.Name, err)
				appendError(&deletionErrs, err)
				cc.resyncTask(pod)
			} else {
				klog.V(3).Infof("Deleted Task <%s> of Job <%s/%s>",
					pod.Name, job.Namespace, job.Name)
				atomic.AddInt32(&terminating, 1)
			}
		}(pod)
	}
	waitDeletionGroup.Wait()

	if len(deletionErrs) != 0 {
		cc.recorder.Event(job, v1.EventTypeWarning, FailedDeletePodReason,
			fmt.Sprintf("Error deleting pods: %+v", deletionErrs))
		return fmt.Errorf("failed to delete %d pods of %d", len(deletionErrs), len(podToDelete))
	}
	job.Status = batch.JobStatus{
		State: job.Status.State,

		Pending:             pending,
		Running:             running,
		Succeeded:           succeeded,
		Failed:              failed,
		Terminating:         terminating,
		Unknown:             unknown,
		Version:             job.Status.Version,
		MinAvailable:        job.Spec.MinAvailable,
		TaskStatusCount:     taskStatusCount,
		ControlledResources: job.Status.ControlledResources,
		Conditions:          job.Status.Conditions,
		RetryCount:          job.Status.RetryCount,
	}

	if updateStatus != nil {
		if updateStatus(&job.Status) {
			job.Status.State.LastTransitionTime = metav1.Now()
			jobCondition = newCondition(job.Status.State.Phase, &job.Status.State.LastTransitionTime)
			job.Status.Conditions = append(job.Status.Conditions, jobCondition)
		}
	}
	newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(context.TODO(), job, metav1.UpdateOptions{})
	if err != nil {
		klog.Errorf("Failed to update status of Job %v/%v: %v",
			job.Namespace, job.Name, err)
		return err
	}
	if e := cc.cache.Update(newJob); e != nil {
		klog.Errorf("SyncJob - Failed to update Job %v/%v in cache:  %v",
			newJob.Namespace, newJob.Name, e)
		return e
	}

	return nil
}
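
classifyAndAddUpPodBaseOnPhase is called in several places above but not listed; a minimal sketch of the phase classification it performs is given below (counters are updated atomically because pods are handled in parallel goroutines):

// Sketch of the per-phase counting done by classifyAndAddUpPodBaseOnPhase;
// simplified, but matching how the counters are used in syncJob above.
package sketch

import (
	"sync/atomic"

	v1 "k8s.io/api/core/v1"
)

func classifyPod(pod *v1.Pod, pending, running, succeeded, failed, unknown *int32) {
	switch pod.Status.Phase {
	case v1.PodPending:
		atomic.AddInt32(pending, 1)
	case v1.PodRunning:
		atomic.AddInt32(running, 1)
	case v1.PodSucceeded:
		atomic.AddInt32(succeeded, 1)
	case v1.PodFailed:
		atomic.AddInt32(failed, 1)
	default:
		atomic.AddInt32(unknown, 1)
	}
}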

If syncJob returns an error, processNextReq decides whether to retry and, if so, requeues the request.

Initializing a job usually happens right after it is created: give it the Pending phase, fill in defaults for other fields, and create the PodGroup, PVCs, and so on.

func (cc *jobcontroller) initiateJob(job *batch.Job) (*batch.Job, error) {
	klog.V(3).Infof("Starting to initiate Job <%s/%s>", job.Namespace, job.Name)
	jobInstance, err := cc.initJobStatus(job)
	if err != nil {
		cc.recorder.Event(job, v1.EventTypeWarning, string(batch.JobStatusError),
			fmt.Sprintf("Failed to initialize job status, err: %v", err))
		return nil, err
	}

	if err := cc.pluginOnJobAdd(jobInstance); err != nil {
		cc.recorder.Event(job, v1.EventTypeWarning, string(batch.PluginError),
			fmt.Sprintf("Execute plugin when job add failed, err: %v", err))
		return nil, err
	}

	newJob, err := cc.createJobIOIfNotExist(jobInstance)  //这里主要是create pvc, job.spec里面可以写一个pvc的名字,如果不写的话,默认会生成一个 pvc
	if err != nil {
		cc.recorder.Event(job, v1.EventTypeWarning, string(batch.PVCError),
			fmt.Sprintf("Failed to create PVC, err: %v", err))
		return nil, err
	}

	if err := cc.createOrUpdatePodGroup(newJob); err != nil {
		cc.recorder.Event(job, v1.EventTypeWarning, string(batch.PodGroupError),
			fmt.Sprintf("Failed to create PodGroup, err: %v", err))
		return nil, err
	}

	return newJob, nil
}
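
createOrUpdatePodGroup is not listed here; its essential output is a PodGroup named after the job plus its UID (the same pgName seen in syncJob), carrying the job's MinAvailable and queue. The sketch below assumes the batch/v1alpha1 and scheduling/v1beta1 types from volcano.sh/apis; the real code also fills MinResources, the priority class and owner references, and handles the update path:

// Rough sketch of the PodGroup built for a vcjob; deliberately simplified.
package sketch

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	batch "volcano.sh/apis/pkg/apis/batch/v1alpha1"
	scheduling "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
)

func podGroupForJob(job *batch.Job) *scheduling.PodGroup {
	return &scheduling.PodGroup{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: job.Namespace,
			// matches pgName := job.Name + "-" + string(job.UID) in syncJob
			Name: job.Name + "-" + string(job.UID),
			// owner reference to the job omitted for brevity
		},
		Spec: scheduling.PodGroupSpec{
			MinMember: job.Spec.MinAvailable,
			Queue:     job.Spec.Queue,
		},
	}
}

The initJobStatus helper called above is listed next: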

func (cc *jobcontroller) initJobStatus(job *batch.Job) (*batch.Job, error) {
	if job.Status.State.Phase != "" {
		return job, nil
	}

	job.Status.State.Phase = batch.Pending
	job.Status.State.LastTransitionTime = metav1.Now()
	job.Status.MinAvailable = job.Spec.MinAvailable
	jobCondition := newCondition(job.Status.State.Phase, &job.Status.State.LastTransitionTime) //把pending这个状态放到condition里
	job.Status.Conditions = append(job.Status.Conditions, jobCondition)
	newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(context.TODO(), job, metav1.UpdateOptions{})
	if err != nil {
		klog.Errorf("Failed to update status of Job %v/%v: %v",
			job.Namespace, job.Name, err)
		return nil, err
	}
	if err := cc.cache.Update(newJob); err != nil {  //更新jobCache缓存
		klog.Errorf("CreateJob - Failed to update Job %v/%v in cache:  %v",
			newJob.Namespace, newJob.Name, err)
		return nil, err
	}

	return newJob, nil
}


// If the job is no longer pending, refresh it; the typical scenario is that the job is already running and its spec was updated externally
func (cc *jobcontroller) initOnJobUpdate(job *batch.Job) error {
	klog.V(3).Infof("Starting to initiate Job <%s/%s> on update", job.Namespace, job.Name)

	if err := cc.pluginOnJobUpdate(job); err != nil {
		cc.recorder.Event(job, v1.EventTypeWarning, string(batch.PluginError),
			fmt.Sprintf("Execute plugin when job add failed, err: %v", err))
		return err
	}

	if err := cc.createOrUpdatePodGroup(job); err != nil {  //创建对应的podgroup
		cc.recorder.Event(job, v1.EventTypeWarning, string(batch.PodGroupError),
			fmt.Sprintf("Failed to create PodGroup, err: %v", err))
		return err
	}

	return nil
}


func (cc *jobcontroller) pluginOnJobUpdate(job *batch.Job) error {
	client := pluginsinterface.PluginClientset{KubeClients: cc.kubeClient}
	if job.Status.ControlledResources == nil {
		job.Status.ControlledResources = make(map[string]string)
	}
	for name, args := range job.Spec.Plugins {
		pb, found := plugins.GetPluginBuilder(name)
		if !found {
			err := fmt.Errorf("failed to get plugin %s", name)
			klog.Error(err)
			return err
		}
		klog.Infof("Starting to execute plugin at <pluginOnJobUpdate>: %s on job: <%s/%s>", name, job.Namespace, job.Name)
		if err := pb(client, args).OnJobUpdate(job); err != nil {  //对应plugin在job update的时候的处理,有的啥都没做,有的会更新cm
			klog.Errorf("Failed to process on job update plugin %s, err %v.", name, err)
			return err
		}
	}

	return nil
}

// This calls into the registered PluginBuilder; the five currently supported plugins are all registered in the init function below
// pkg/controllers/job/plugins/factory.go
func init() {
	RegisterPluginBuilder("ssh", ssh.New)
	RegisterPluginBuilder("env", env.New)
	RegisterPluginBuilder("svc", svc.New)
	RegisterPluginBuilder("tensorflow", tensorflow.New)
	RegisterPluginBuilder("mpi", mpi.New)
}

func RegisterPluginBuilder(name string, pc PluginBuilder) {
	pluginMutex.Lock()
	defer pluginMutex.Unlock()

	pluginBuilders[name] = pc
}

// GetPluginBuilder returns plugin builder for a given plugin name.
func GetPluginBuilder(name string) (PluginBuilder, bool) {
	pluginMutex.Lock()
	defer pluginMutex.Unlock()

	pb, found := pluginBuilders[name]
	return pb, found
}
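
The registered builders return objects that implement a small plugin interface with hooks such as OnPodCreate / OnJobAdd / OnJobUpdate / OnJobDelete; the env, svc and ssh plugins use these hooks to inject environment variables, a headless Service, or SSH keys into the pods. The sketch below is a reduced stand-in for that interface plus a toy plugin; the environment-variable and annotation names are purely hypothetical:

// Simplified sketch of the job-plugin mechanism: builders are registered by
// name and the built plugin is called back on job/pod lifecycle hooks.
// The interface is a reduced stand-in for pluginsinterface.PluginInterface.
package sketch

import (
	"sync"

	v1 "k8s.io/api/core/v1"
)

type Plugin interface {
	Name() string
	OnPodCreate(pod *v1.Pod) error // mutate the pod before it is created
	OnJobUpdate() error            // e.g. refresh a generated ConfigMap
}

type PluginBuilder func(args []string) Plugin

var (
	pluginMutex    sync.Mutex
	pluginBuilders = map[string]PluginBuilder{}
)

func RegisterPluginBuilder(name string, pb PluginBuilder) {
	pluginMutex.Lock()
	defer pluginMutex.Unlock()
	pluginBuilders[name] = pb
}

func init() {
	// same registration pattern as factory.go above
	RegisterPluginBuilder("example", func(args []string) Plugin { return &examplePlugin{} })
}

// examplePlugin is a toy plugin: it copies a (hypothetical) annotation into an
// environment variable of every container, similar in spirit to the env plugin.
type examplePlugin struct{}

func (p *examplePlugin) Name() string       { return "example" }
func (p *examplePlugin) OnJobUpdate() error { return nil }
func (p *examplePlugin) OnPodCreate(pod *v1.Pod) error {
	for i := range pod.Spec.Containers {
		pod.Spec.Containers[i].Env = append(pod.Spec.Containers[i].Env, v1.EnvVar{
			Name:  "EXAMPLE_TASK_INDEX",                     // hypothetical name
			Value: pod.Annotations["example.io/task-index"], // hypothetical annotation
		})
	}
	return nil
}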

The Command resource is another way to feed actions (such as RestartJob) into a job from the outside; in other words, users influence a job's behavior through Commands.

Besides Jobs, the jobcontroller also watches PodGroup updates and enqueues the corresponding job.

It also watches Command creation: a command is first enqueued into commandQueue and then turned into a request in the job queue.

It also watches pods, but only handles a pod when it can be tied to a job, which is determined from annotations and the controlledBy owner check; such pods are likewise wrapped into a Request and enqueued into the job queue.

Pod events therefore trigger job syncs: pod add/update/delete, and updates in particular, lead to checks on whether a task has finished and raise different events; a pod deletion raises PodEvictedEvent. A sketch of this wrapping is given below.
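
// Sketch of the pod handling described above: only pods carrying the
// job-related annotations are considered, and they are wrapped into a Request
// that is pushed to the worker queue of their job. The annotation keys and the
// Request shape here are illustrative assumptions, not the verbatim handlers.
package sketch

import (
	v1 "k8s.io/api/core/v1"
)

type Request struct {
	Namespace string
	JobName   string
	TaskName  string
	Event     string
}

func requestForPod(pod *v1.Pod) (Request, bool) {
	jobName, ok := pod.Annotations["volcano.sh/job-name"] // assumed key
	if !ok {
		return Request{}, false // not a pod managed by a vcjob
	}
	return Request{
		Namespace: pod.Namespace,
		JobName:   jobName,
		TaskName:  pod.Annotations["volcano.sh/task-spec"], // assumed key
		Event:     "PodEvicted",                            // e.g. raised when the pod is deleted
	}, true
}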

 

KillJob

From the earlier table, KillJob is triggered mainly by deleting the Job or by abnormal situations. A Job does not support in-place upgrades, only scaling up and down, so any abnormal situation goes straight to KillJob. Its main logic is:

  • Delete all pods belonging to this job, while counting the pods in each phase
  • Update the Job status
  • Delete the PodGroup that corresponds to the Job

Actions such as RestartJob or AbortJob can be injected through a Command resource, as sketched below.
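
// Sketch: restarting a vcjob from the outside by creating a Command.
// The Command fields (Action, TargetObject) and the BusV1alpha1 client group
// are assumptions based on bus/v1alpha1; exact names may differ by version.
package sketch

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	busv1alpha1 "volcano.sh/apis/pkg/apis/bus/v1alpha1"
	vcclient "volcano.sh/apis/pkg/client/clientset/versioned"
)

func restartJob(cli vcclient.Interface, ns, jobName string, jobUID types.UID) error {
	cmd := &busv1alpha1.Command{
		ObjectMeta: metav1.ObjectMeta{
			GenerateName: jobName + "-restart-",
			Namespace:    ns,
		},
		// Tie the command to the job it should act on; the job controller
		// resolves the owner and enqueues a Request carrying this action.
		TargetObject: &metav1.OwnerReference{
			APIVersion: "batch.volcano.sh/v1alpha1",
			Kind:       "Job",
			Name:       jobName,
			UID:        jobUID,
		},
		Action: string(busv1alpha1.RestartJobAction),
	}
	_, err := cli.BusV1alpha1().Commands(ns).Create(context.TODO(), cmd, metav1.CreateOptions{})
	return err
}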

For example, runningState triggers KillJob when it receives RestartJobAction:

func (ps *runningState) Execute(action v1alpha1.Action) error {
	switch action {
	case v1alpha1.RestartJobAction:
		return KillJob(ps.job, PodRetainPhaseNone, func(status *vcbatch.JobStatus) bool {
			status.State.Phase = vcbatch.Restarting
			status.RetryCount++
			return true
		})



// pkg/controllers/job/job_controller_actions.go
func (cc *jobcontroller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.PhaseMap, updateStatus state.UpdateStatusFn) error {
	job := jobInfo.Job
	if job.DeletionTimestamp != nil {
		// 不处理正在删除中的job(可能有其他协程的worker在处理)
		return nil
	}
	var pending, running, terminating, succeeded, failed, unknown int32
	var errs []error
	var total int
    // 删除该job下的所有pod
	for _, pods := range jobInfo.Pods {
		for _, pod := range pods {
			total++
			if pod.DeletionTimestamp != nil {
				// 跳过已经在删除中的pod
				terminating++
				continue
			}
			_, retain := podRetainPhase[pod.Status.Phase]
			if !retain {
                // 调用kube-apiserver的接口删除状态不是Success或者Failed的pod
				err := cc.deleteJobPod(job.Name, pod)
				if err == nil {
					terminating++
					continue
				}
				errs = append(errs, err)
				cc.resyncTask(pod)
			}

			classifyAndAddUpPodBaseOnPhase(pod, &pending, &running, &succeeded, &failed, &unknown)
		}
	}
	// 错误处理略

	// 更新Job状态,将各种状态的pod数量更新到status中
	newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(context.TODO(), job, metav1.UpdateOptions{})
	// 错误处理略
    // 更新job缓存
	if e := cc.cache.Update(newJob); e != nil {
		klog.Errorf("KillJob - Failed to update Job %v/%v in cache:  %v",
			newJob.Namespace, newJob.Name, e)
		return e
	}
	// 删除这个Job对应的PodGroups
	if err := cc.vcClient.SchedulingV1beta1().PodGroups(job.Namespace).Delete(context.TODO(), job.Name, metav1.DeleteOptions{}); err != nil {
		// 错误处理略
	}
	if err := cc.pluginOnJobDelete(job); err != nil {
		return err
	}
	return nil
}

In the end the job gets reconciled again anyway, because its status is updated (UpdateStatus).

Moving the status into a new phase re-triggers a new state, and from there different actions drive the next step.

For example: running → restart → pending → syncJob (running again).

 

processResyncTask

Another main flow in the jobcontroller is the following:

// pods whose deletion failed during syncJob/killJob are enqueued here
func (cc *jobcontroller) resyncTask(task *v1.Pod) {
	cc.errTasks.AddRateLimited(task)
}

func (cc *jobcontroller) processResyncTask() {
	obj, shutdown := cc.errTasks.Get()
	if shutdown {
		return
	}

	// one task only resync 10 times
	if cc.errTasks.NumRequeues(obj) > 10 {   //addRateLimited会记录一次requeue,失败10次则退出
		cc.errTasks.Forget(obj)
		return
	}

	defer cc.errTasks.Done(obj)

	task, ok := obj.(*v1.Pod)
	if !ok {
		klog.Errorf("failed to convert %v to *v1.Pod", obj)
		return
	}

	if err := cc.syncTask(task); err != nil {
		klog.Errorf("Failed to sync pod <%v/%v>, retry it, err %v", task.Namespace, task.Name, err)
		cc.resyncTask(task)  //如果报错,则重新入队
	}
}

func (cc *jobcontroller) syncTask(oldTask *v1.Pod) error {
	newPod, err := cc.kubeClient.CoreV1().Pods(oldTask.Namespace).Get(context.TODO(), oldTask.Name, metav1.GetOptions{})
	if err != nil {
		if errors.IsNotFound(err) {
			if err := cc.cache.DeletePod(oldTask); err != nil {
				klog.Errorf("failed to delete cache pod <%v/%v>, err %v.", oldTask.Namespace, oldTask.Name, err)
				return err
			}
			klog.V(3).Infof("Pod <%v/%v> was deleted, removed from cache.", oldTask.Namespace, oldTask.Name)

			return nil
		}
		return fmt.Errorf("failed to get Pod <%v/%v>: err %v", oldTask.Namespace, oldTask.Name, err)
	}

	return cc.cache.UpdatePod(newPod)
}

// Note: although entries land in errTasks because pod deletion failed, syncTask does not retry the deletion against the apiserver;
// the job reconcile itself retries the deletion, so here it only needs to refresh the jobCache.
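
newRateLimitingQueue, used for errTasks in Initialize, is essentially a workqueue backed by a rate limiter so retries are spaced out; combined with the 10-retry cap in processResyncTask above, a failing pod is never retried forever. A sketch of that construction, assuming client-go's workqueue package (the concrete backoff parameters are illustrative, not necessarily Volcano's values):

// Sketch of an errTasks-style queue with exponential backoff between retries.
package sketch

import (
	"time"

	"k8s.io/client-go/util/workqueue"
)

func newErrTaskQueue() workqueue.RateLimitingInterface {
	// Retry after 5s, doubling up to 3 minutes between attempts.
	return workqueue.NewRateLimitingQueue(
		workqueue.NewItemExponentialFailureRateLimiter(5*time.Second, 180*time.Second),
	)
}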

 

Summary

The jobcontroller mainly watches four resources:

  1. vcJob add/update/delete: these events enqueue a Request object (containing the Event, ExitCode, Action, etc.); each worker goroutine then pulls requests from the queue it owns and processes them
    1. the processing is essentially syncJob or killJob, chosen from the current job state plus the action derived from the request (i.e. which state we are in and what to do next)
    2. syncJob mostly creates the PodGroup and scales pods up or down
    3. killJob deletes pods and then switches the job to a new phase
  2. pod add/update/delete: these mainly update the pod information of the corresponding job in jobCache, and likewise trigger job reconciliation
  3. pg update: once a PodGroup has been scheduled, it triggers reconciliation of its job
  4. command add: after a Command is created it goes into its own queue; when dequeued, the Command is deleted from the apiserver and the action it specifies is wrapped into a request that reconciles the job
    1. Commands are how users intervene in a job's lifecycle through the command-line tool (vcctl)
  5. pods that fail to be deleted are retried by a dedicated goroutine, but the retry only refreshes the cache; the job reconcile itself will delete those pods again