
Volcano (k8s Scheduler) Source Code Walkthrough (1)

2023-08-29 02:55:07

Controller-manager

// Run the controller.  cmd/controller-manager/app/server.go
func Run(opt *options.ServerOption) error {
	config, err := kube.BuildConfig(opt.KubeClientOptions)
	if err != nil {
		return err
	}

	if opt.EnableHealthz {
		if err := helpers.StartHealthz(opt.HealthzBindAddress, "volcano-controller"); err != nil {
			return err
		}
	}

	job.SetDetectionPeriodOfDependsOntask(opt.DetectionPeriodOfDependsOntask)  // period for detecting "depends on task" relations; stored in a package-level variable

	run := startControllers(config, opt)

	if !opt.EnableLeaderElection {
		run(context.TODO())
		return fmt.Errorf("finished without leader elect")
	}

	leaderElectionClient, err := kubeclientset.NewForConfig(rest.AddUserAgent(config, "leader-election"))
	if err != nil {
		return err
	}

	// Prepare event clients (event broadcaster).
	broadcaster := record.NewBroadcaster()
	broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: leaderElectionClient.CoreV1().Events(opt.LockObjectNamespace)})
	eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "vc-controller-manager"})

	hostname, err := os.Hostname()
	if err != nil {
		return fmt.Errorf("unable to get hostname: %v", err)
	}
	// add a uniquifier so that two processes on the same host don't accidentally both become active
	id := hostname + "_" + string(uuid.NewUUID())

	rl, err := resourcelock.New(resourcelock.ConfigMapsResourceLock,
		opt.LockObjectNamespace,
		"vc-controller-manager",
		leaderElectionClient.CoreV1(),
		leaderElectionClient.CoordinationV1(),
		resourcelock.ResourceLockConfig{
			Identity:      id,
			EventRecorder: eventRecorder,
		})
	if err != nil {
		return fmt.Errorf("couldn't create resource lock: %v", err)
	}

	leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
		Lock:          rl,
		LeaseDuration: leaseDuration,
		RenewDeadline: renewDeadline,
		RetryPeriod:   retryPeriod,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: run,
			OnStoppedLeading: func() {
				klog.Fatalf("leaderelection lost")
			},
		},
	})
	return fmt.Errorf("lost lease")
}

func startControllers(config *rest.Config, opt *options.ServerOption) func(ctx context.Context) {
	controllerOpt := &framework.ControllerOption{}

	controllerOpt.SchedulerNames = opt.SchedulerNames
	controllerOpt.WorkerNum = opt.WorkerThreads
	controllerOpt.MaxRequeueNum = opt.MaxRequeueNum

	// TODO: add user agent for different controllers. Build the clients here and hand them to every controller.
	controllerOpt.KubeClient = kubeclientset.NewForConfigOrDie(config)
	controllerOpt.VolcanoClient = vcclientset.NewForConfigOrDie(config)
	controllerOpt.SharedInformerFactory = informers.NewSharedInformerFactory(controllerOpt.KubeClient, 0)

	return func(ctx context.Context) {
		framework.ForeachController(func(c framework.Controller) {
			if err := c.Initialize(controllerOpt); err != nil {
				klog.Errorf("Failed to initialize controller <%s>: %v", c.Name(), err)
				return
			}

			go c.Run(ctx.Done())
		})

		<-ctx.Done()
	}
}
// pkg/controllers/framework/factory.go
var controllers = map[string]Controller{}  // all built-in controllers register themselves here

// ForeachController is a helper function to operate on all controllers.
func ForeachController(fn func(controller Controller)) {
	for _, ctrl := range controllers {
		fn(ctrl)
	}
}

// RegisterController registers a controller to the controller manager.
func RegisterController(ctrl Controller) error {
	if ctrl == nil {
		return fmt.Errorf("controller is nil")
	}

	if _, found := controllers[ctrl.Name()]; found {
		return fmt.Errorf("duplicated controller")
	}

	klog.V(3).Infof("Controller <%s> is registered.", ctrl.Name())
	controllers[ctrl.Name()] = ctrl
	return nil
}
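The Controller interface itself is not part of the excerpt, but it can be inferred from how the factory uses it (c.Name(), c.Initialize(controllerOpt), c.Run(ctx.Done())). A minimal sketch, not verbatim from the source:

// pkg/controllers/framework (sketch inferred from usage above; the real interface may carry more methods)
type Controller interface {
	// Name is the key under which the controller is registered.
	Name() string
	// Initialize wires clients, informers and queues from the shared options.
	Initialize(opt *ControllerOption) error
	// Run starts the controller's workers and blocks until stopCh is closed.
	Run(stopCh <-chan struct{})
}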


// RegisterController is called by each controller in its own init function.
// pkg/controllers/job/job_controller.go
func init() {
	framework.RegisterController(&jobcontroller{})
}


// The controllers' init functions are triggered by imports in the controller-manager's main.go, located at cmd/controller-manager/main.go (see the sketch below).
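A hedged sketch of what those imports roughly look like (the package paths are assumptions for illustration, not copied from the repository): blank imports whose only effect is to run each controller package's init(), which in turn calls framework.RegisterController.

import (
	// Blank imports trigger each package's init(), registering the controller.
	_ "volcano.sh/volcano/pkg/controllers/gc"
	_ "volcano.sh/volcano/pkg/controllers/job"
	_ "volcano.sh/volcano/pkg/controllers/podgroup"
	_ "volcano.sh/volcano/pkg/controllers/queue"
)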

 

gc-controller

The following code excerpts illustrate how it works:

// Initialization and Run functions of the gc controller
func (gc *gccontroller) Initialize(opt *framework.ControllerOption) error {
	gc.vcClient = opt.VolcanoClient
	jobInformer := informerfactory.NewSharedInformerFactory(gc.vcClient, 0).Batch().V1alpha1().Jobs()

	gc.jobInformer = jobInformer
	gc.jobLister = jobInformer.Lister()
	gc.jobSynced = jobInformer.Informer().HasSynced
	gc.queue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

	jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    gc.addJob,
		UpdateFunc: gc.updateJob,
	})

	return nil
}
// gccontroller essentially just holds a Job lister/informer and a work queue (see the sketch below).
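From the assignments in Initialize, the gccontroller struct can be sketched roughly as follows (type names are assumptions for illustration; the actual definition lives in the gc controller package):

type gccontroller struct {
	vcClient vcclientset.Interface

	jobInformer batchinformer.JobInformer
	jobLister   batchlister.JobLister
	jobSynced   cache.InformerSynced

	// queue holds the keys of Jobs whose TTL has expired or will expire soon.
	queue workqueue.RateLimitingInterface
}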

// Run starts the worker to clean up Jobs.
func (gc *gccontroller) Run(stopCh <-chan struct{}) {
	defer gc.queue.ShutDown()

	klog.Infof("Starting garbage collector")
	defer klog.Infof("Shutting down garbage collector")

	go gc.jobInformer.Informer().Run(stopCh)
	if !cache.WaitForCacheSync(stopCh, gc.jobSynced) {
		return
	}

	go wait.Until(gc.worker, time.Second, stopCh)

	<-stopCh
}
// The gc controller keeps a work queue; the handlers below are its enqueue paths.
func (gc *gccontroller) addJob(obj interface{}) {
	job := obj.(*v1alpha1.Job)
	klog.V(4).Infof("Adding job %s/%s", job.Namespace, job.Name)

	if job.DeletionTimestamp == nil && needsCleanup(job) {
		gc.enqueue(job)
	}
}

func (gc *gccontroller) updateJob(old, cur interface{}) {
	job := cur.(*v1alpha1.Job)
	klog.V(4).Infof("Updating job %s/%s", job.Namespace, job.Name)

	if job.DeletionTimestamp == nil && needsCleanup(job) {
		gc.enqueue(job)
	}
}

func (gc *gccontroller) enqueue(job *v1alpha1.Job) {
	klog.V(4).Infof("Add job %s/%s to cleanup", job.Namespace, job.Name)
	key, err := cache.MetaNamespaceKeyFunc(job)
	if err != nil {
		klog.Errorf("couldn't get key for object %#v: %v", job, err)
		return
	}

	gc.queue.Add(key)
}

// enqueueAfter adds to the delaying queue: once a Job with a TTL is spotted, its key is re-added right when the TTL is expected to expire, so it can be dequeued promptly, re-checked for expiry, and then cleaned up.
func (gc *gccontroller) enqueueAfter(job *v1alpha1.Job, after time.Duration) {
	key, err := cache.MetaNamespaceKeyFunc(job)
	if err != nil {
		klog.Errorf("couldn't get key for object %#v: %v", job, err)
		return
	}

	gc.queue.AddAfter(key, after)
}

 

func (gc *gccontroller) worker() {
	for gc.processNextWorkItem() {
	}
}

func (gc *gccontroller) processNextWorkItem() bool {
	key, quit := gc.queue.Get()
	if quit {
		return false
	}
	defer gc.queue.Done(key)

	err := gc.processJob(key.(string))
	gc.handleErr(err, key)

	return true
}

func (gc *gccontroller) handleErr(err error, key interface{}) {
	if err == nil {
		gc.queue.Forget(key)
		return
	}

	klog.Errorf("error cleaning up Job %v, will retry: %v", key, err)
	gc.queue.AddRateLimited(key)  // on error, re-add the key through the rate limiter so retries back off
}
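For context, AddRateLimited delegates the retry delay to the queue's rate limiter. A small self-contained sketch (not from the Volcano source) of how the default controller rate limiter backs off for a key that keeps failing:

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	rl := workqueue.DefaultControllerRateLimiter()
	key := "default/my-job"
	// With client-go defaults the per-item delay grows exponentially
	// (roughly 5ms, 10ms, 20ms, ...) up to a cap.
	for i := 0; i < 5; i++ {
		fmt.Printf("failure %d -> retry after %v\n", i+1, rl.When(key))
	}
	rl.Forget(key) // called on success (queue.Forget); resets the backoff for this key
}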

// processJob will check the Job's state and TTL and delete the Job when it
// finishes and its TTL after finished has expired. If the Job hasn't finished or
// its TTL hasn't expired, it will be added to the queue after the TTL is expected
// to expire.
// This function is not meant to be invoked concurrently with the same key.
func (gc *gccontroller) processJob(key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}

	klog.V(4).Infof("Checking if Job %s/%s is ready for cleanup", namespace, name)
	// Ignore the Jobs that are already deleted or being deleted, or the ones that don't need clean up.
	job, err := gc.jobLister.Jobs(namespace).Get(name)
	if errors.IsNotFound(err) {
		return nil
	}
	if err != nil {
		return err
	}

	// Check whether the Job's TTL has expired; this object comes from the (possibly stale) informer cache.
	if expired, err := gc.processTTL(job); err != nil {
		return err
	} else if !expired {
		return nil
	}

	// The Job's TTL is assumed to have expired, but the Job TTL might be stale.
	// Before deleting the Job, do a final sanity check.
	// If TTL is modified before we do this check, we cannot be sure if the TTL truly expires.
	// The latest Job may have a different UID, but it's fine because the checks will be run again.
	// To avoid the edge case of deleting by mistake (a concurrent update may have enlarged the TTL), fetch the latest Job and re-check its TTL.
	fresh, err := gc.vcClient.BatchV1alpha1().Jobs(namespace).Get(context.TODO(), name, metav1.GetOptions{})
	if errors.IsNotFound(err) {
		return nil
	}
	if err != nil {
		return err
	}
	// Use the latest Job TTL to see if the TTL truly expires.
	if expired, err := gc.processTTL(fresh); err != nil {
		return err
	} else if !expired {
		return nil
	}
	// Cascade deletes the Jobs if TTL truly expires.
	policy := metav1.DeletePropagationForeground  // foreground cascading deletion
	options := metav1.DeleteOptions{
		PropagationPolicy: &policy,
		Preconditions:     &metav1.Preconditions{UID: &fresh.UID},  // delete only if the UID still matches
	}
	// Preconditions in DeleteOptions is an optional field specifying prerequisites for the deletion: the delete is executed only if the resource's current state matches the given preconditions, otherwise the request is rejected (see the sketch after this function).
	klog.V(4).Infof("Cleaning up Job %s/%s", namespace, name)
	return gc.vcClient.BatchV1alpha1().Jobs(fresh.Namespace).Delete(context.TODO(), fresh.Name, options)
}
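As a hypothetical illustration of the precondition behaviour (the helper name and error handling here are mine, not Volcano's): if the Job was deleted and recreated in the meantime, its UID no longer matches, the API server rejects the delete, typically as a Conflict, and the new Job is left alone.

// Assumed imports: k8s.io/apimachinery/pkg/types and k8s.io/apimachinery/pkg/api/errors (as apierrors).
func deleteIfSameUID(ctx context.Context, c vcclientset.Interface, ns, name string, uid types.UID) error {
	policy := metav1.DeletePropagationForeground
	err := c.BatchV1alpha1().Jobs(ns).Delete(ctx, name, metav1.DeleteOptions{
		PropagationPolicy: &policy,
		Preconditions:     &metav1.Preconditions{UID: &uid},
	})
	if apierrors.IsConflict(err) {
		// The UID changed: this key now points at a different Job object, so skip it.
		return nil
	}
	return err
}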

// processTTL checks whether a given Job's TTL has expired, and add it to the queue after the TTL is expected to expire
// if the TTL will expire later.
func (gc *gccontroller) processTTL(job *v1alpha1.Job) (expired bool, err error) {
	// We don't care about the Jobs that are going to be deleted, or the ones that don't need clean up.
	if job.DeletionTimestamp != nil || !needsCleanup(job) {  // gc only proceeds if the Job is not being deleted, has a TTL set, and has finished
		return false, nil
	}

	now := time.Now()
	t, err := timeLeft(job, &now)  // compute (finishTime + TTL) - now to decide whether the TTL has expired
	if err != nil {
		return false, err
	}

	// TTL has expired: a returned duration <= 0 means the expiry time is not later than now.
	if *t <= 0 {
		return true, nil
	}

	gc.enqueueAfter(job, *t)  // not yet expired: re-enqueue after the remaining time and decide again then
	return false, nil
}

// needsCleanup checks whether a Job has finished and has a TTL set.
func needsCleanup(j *v1alpha1.Job) bool {
	return j.Spec.TTLSecondsAfterFinished != nil && isJobFinished(j)
}

func isJobFinished(job *v1alpha1.Job) bool {
	return job.Status.State.Phase == v1alpha1.Completed ||
		job.Status.State.Phase == v1alpha1.Failed ||
		job.Status.State.Phase == v1alpha1.Terminated
}

 

func timeLeft(j *v1alpha1.Job, since *time.Time) (*time.Duration, error) {
	finishAt, expireAt, err := getFinishAndExpireTime(j)
	if err != nil {
		return nil, err
	}
	if finishAt.UTC().After(since.UTC()) {
		klog.Warningf("Warning: Found Job %s/%s finished in the future. This is likely due to time skew in the cluster. Job cleanup will be deferred.", j.Namespace, j.Name)
	}
	remaining := expireAt.UTC().Sub(since.UTC())  // time left until expiry; if positive, no cleanup yet
	klog.V(4).Infof("Found Job %s/%s finished at %v, remaining TTL %v since %v, TTL will expire at %v", j.Namespace, j.Name, finishAt.UTC(), remaining, since.UTC(), expireAt.UTC())
	return &remaining, nil
}


func getFinishAndExpireTime(j *v1alpha1.Job) (*time.Time, *time.Time, error) {
	if !needsCleanup(j) {
		return nil, nil, fmt.Errorf("job %s/%s should not be cleaned up", j.Namespace, j.Name)
	}
	finishAt, err := jobFinishTime(j)  // derive the finish time from the Job's status
	if err != nil {
		return nil, nil, err
	}
	finishAtUTC := finishAt.UTC()
	expireAtUTC := finishAtUTC.Add(time.Duration(*j.Spec.TTLSecondsAfterFinished) * time.Second)
	return &finishAtUTC, &expireAtUTC, nil  // return the finish time and the expiry time (finish time + TTL)
}


// jobFinishTime takes an already finished Job and returns the time it finishes.
// The finish time is taken from Job.Status.State.LastTransitionTime: since the Job has already finished, the last transition marks the moment it finished.
func jobFinishTime(finishedJob *v1alpha1.Job) (metav1.Time, error) {
	if finishedJob.Status.State.LastTransitionTime.IsZero() {
		return metav1.Time{}, fmt.Errorf("unable to find the time when the Job %s/%s finished", finishedJob.Namespace, finishedJob.Name)
	}
	return finishedJob.Status.State.LastTransitionTime, nil
}
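Putting it together, the gc controller only ever touches Jobs that set spec.ttlSecondsAfterFinished. A minimal usage sketch (the import paths and kubeconfig handling are assumptions, not from the original post) that creates such a Job and then lets the controller clean it up five minutes after it finishes:

package main

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/clientcmd"

	"volcano.sh/apis/pkg/apis/batch/v1alpha1"
	vcclientset "volcano.sh/apis/pkg/client/clientset/versioned"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := vcclientset.NewForConfigOrDie(config)

	ttl := int32(300) // clean the Job up 5 minutes after it finishes
	job := &v1alpha1.Job{
		ObjectMeta: metav1.ObjectMeta{Name: "ttl-demo", Namespace: "default"},
		Spec: v1alpha1.JobSpec{
			TTLSecondsAfterFinished: &ttl,
			// tasks, scheduler name, etc. omitted for brevity
		},
	}
	if _, err := client.BatchV1alpha1().Jobs("default").Create(context.TODO(), job, metav1.CreateOptions{}); err != nil {
		panic(err)
	}
}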