Controller-manager
// Run the controller. cmd/controller-manager/app/server.go
func Run(opt *options.ServerOption) error {
config, err := kube.BuildConfig(opt.KubeClientOptions)
if err != nil {
return err
}
if opt.EnableHealthz {
if err := helpers.StartHealthz(opt.HealthzBindAddress, "volcano-controller"); err != nil {
return err
}
}
job.SetDetectionPeriodOfDependsOntask(opt.DetectionPeriodOfDependsOntask) // detection period for "depends on" task checks, stored as a package-level (global) setting
run := startControllers(config, opt)
if !opt.EnableLeaderElection {
run(context.TODO())
return fmt.Errorf("finished without leader elect")
}
leaderElectionClient, err := kubeclientset.NewForConfig(rest.AddUserAgent(config, "leader-election"))
if err != nil {
return err
}
// Prepare event clients (event broadcaster).
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: leaderElectionClient.CoreV1().Events(opt.LockObjectNamespace)})
eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "vc-controller-manager"})
hostname, err := os.Hostname()
if err != nil {
return fmt.Errorf("unable to get hostname: %v", err)
}
// add a uniquifier so that two processes on the same host don't accidentally both become active
id := hostname + "_" + string(uuid.NewUUID())
rl, err := resourcelock.New(resourcelock.ConfigMapsResourceLock,
opt.LockObjectNamespace,
"vc-controller-manager",
leaderElectionClient.CoreV1(),
leaderElectionClient.CoordinationV1(),
resourcelock.ResourceLockConfig{
Identity: id,
EventRecorder: eventRecorder,
})
if err != nil {
return fmt.Errorf("couldn't create resource lock: %v", err)
}
leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: leaseDuration,
RenewDeadline: renewDeadline,
RetryPeriod: retryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
klog.Fatalf("leaderelection lost")
},
},
})
return fmt.Errorf("lost lease")
}
func startControllers(config *rest.Config, opt *options.ServerOption) func(ctx context.Context) {
controllerOpt := &framework.ControllerOption{}
controllerOpt.SchedulerNames = opt.SchedulerNames
controllerOpt.WorkerNum = opt.WorkerThreads
controllerOpt.MaxRequeueNum = opt.MaxRequeueNum
// TODO: add user agent for different controllers. The clients are packed into the option so each controller can use them.
controllerOpt.KubeClient = kubeclientset.NewForConfigOrDie(config)
controllerOpt.VolcanoClient = vcclientset.NewForConfigOrDie(config)
controllerOpt.SharedInformerFactory = informers.NewSharedInformerFactory(controllerOpt.KubeClient, 0)
return func(ctx context.Context) {
framework.ForeachController(func(c framework.Controller) {
if err := c.Initialize(controllerOpt); err != nil {
klog.Errorf("Failed to initialize controller <%s>: %v", c.Name(), err)
return
}
go c.Run(ctx.Done())
})
<-ctx.Done()
}
}
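For orientation, framework.ControllerOption can be sketched from the fields assigned above; this is a reconstruction, not the exact definition, and the field types are approximations:
// Sketch of framework.ControllerOption, reconstructed from startControllers above.
// The real struct in pkg/controllers/framework may declare additional fields.
type ControllerOption struct {
	KubeClient            kubeclientset.Interface
	VolcanoClient         vcclientset.Interface
	SharedInformerFactory informers.SharedInformerFactory
	SchedulerNames        []string
	WorkerNum             uint32 // type approximate
	MaxRequeueNum         int    // type approximate
}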
// pkg/controllers/framework/factory.go
var controllers = map[string]Controller{} // all built-in controllers register themselves here
// ForeachController is helper function to operator all controllers.
func ForeachController(fn func(controller Controller)) {
for _, ctrl := range controllers {
fn(ctrl)
}
}
// RegisterController register controller to the controller manager.
func RegisterController(ctrl Controller) error {
if ctrl == nil {
return fmt.Errorf("controller is nil")
}
if _, found := controllers[ctrl.Name()]; found {
return fmt.Errorf("duplicated controller")
}
klog.V(3).Infof("Controller <%s> is registered.", ctrl.Name())
controllers[ctrl.Name()] = ctrl
return nil
}
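The Controller interface that ForeachController and RegisterController operate on can be inferred from how startControllers uses it (Name, Initialize, Run); the sketch below follows that usage and may differ slightly from the real definition in the framework package:
// Sketch of the framework.Controller interface, inferred from its usage above.
type Controller interface {
	Name() string
	Initialize(opt *ControllerOption) error
	Run(stopCh <-chan struct{})
}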
// RegisterController is called from each controller's init function.
// pkg/controllers/job/job_controller.go
func init() {
framework.RegisterController(&jobcontroller{})
}
// The controllers' init functions run because their packages are imported in cmd/controller-manager/main.go.
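A minimal sketch of what those blank imports might look like in cmd/controller-manager/main.go; the import paths are illustrative, so check the actual file for the exact list:
// Blank imports trigger each controller package's init(), which calls
// framework.RegisterController. Paths are illustrative, not an exact copy of the file.
import (
	_ "volcano.sh/volcano/pkg/controllers/garbagecollector"
	_ "volcano.sh/volcano/pkg/controllers/job"
	_ "volcano.sh/volcano/pkg/controllers/podgroup"
	_ "volcano.sh/volcano/pkg/controllers/queue"
)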
gc-controller
The following excerpts illustrate how it works:
// Initialize and Run of the gc controller
func (gc *gccontroller) Initialize(opt *framework.ControllerOption) error {
gc.vcClient = opt.VolcanoClient
jobInformer := informerfactory.NewSharedInformerFactory(gc.vcClient, 0).Batch().V1alpha1().Jobs()
gc.jobInformer = jobInformer
gc.jobLister = jobInformer.Lister()
gc.jobSynced = jobInformer.Informer().HasSynced
gc.queue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: gc.addJob,
UpdateFunc: gc.updateJob,
})
return nil
}
// gccontroller essentially just holds the Job lister and informer.
// Run starts the worker to clean up Jobs.
func (gc *gccontroller) Run(stopCh <-chan struct{}) {
defer gc.queue.ShutDown()
klog.Infof("Starting garbage collector")
defer klog.Infof("Shutting down garbage collector")
go gc.jobInformer.Informer().Run(stopCh)
if !cache.WaitForCacheSync(stopCh, gc.jobSynced) {
return
}
go wait.Until(gc.worker, time.Second, stopCh)
<-stopCh
}
// The gc controller keeps a workqueue; the functions below enqueue items into it.
func (gc *gccontroller) addJob(obj interface{}) {
job := obj.(*v1alpha1.Job)
klog.V(4).Infof("Adding job %s/%s", job.Namespace, job.Name)
if job.DeletionTimestamp == nil && needsCleanup(job) {
gc.enqueue(job)
}
}
func (gc *gccontroller) updateJob(old, cur interface{}) {
job := cur.(*v1alpha1.Job)
klog.V(4).Infof("Updating job %s/%s", job.Namespace, job.Name)
if job.DeletionTimestamp == nil && needsCleanup(job) {
gc.enqueue(job)
}
}
func (gc *gccontroller) enqueue(job *v1alpha1.Job) {
klog.V(4).Infof("Add job %s/%s to cleanup", job.Namespace, job.Name)
key, err := cache.MetaNamespaceKeyFunc(job)
if err != nil {
klog.Errorf("couldn't get key for object %#v: %v", job, err)
return
}
gc.queue.Add(key)
}
// enqueueAfter puts the key into the delaying queue: for a Job whose TTL has not yet expired, the key is re-added when the TTL is expected to expire, so it can be picked up promptly, re-checked, and then cleaned up.
func (gc *gccontroller) enqueueAfter(job *v1alpha1.Job, after time.Duration) {
key, err := cache.MetaNamespaceKeyFunc(job)
if err != nil {
klog.Errorf("couldn't get key for object %#v: %v", job, err)
return
}
gc.queue.AddAfter(key, after)
}
func (gc *gccontroller) worker() {
for gc.processNextWorkItem() {
}
}
func (gc *gccontroller) processNextWorkItem() bool {
key, quit := gc.queue.Get()
if quit {
return false
}
defer gc.queue.Done(key)
err := gc.processJob(key.(string))
gc.handleErr(err, key)
return true
}
func (gc *gccontroller) handleErr(err error, key interface{}) {
if err == nil {
gc.queue.Forget(key)
return
}
klog.Errorf("error cleaning up Job %v, will retry: %v", key, err)
gc.queue.AddRateLimited(key) // on error, re-add the key through the rate limiter so retries are backed off
}
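AddRateLimited delegates the backoff decision to the queue's rate limiter, which was built with workqueue.DefaultControllerRateLimiter() in Initialize. A rough equivalent of that default, assuming client-go's usual parameters (which may change between versions), looks like this; the helper name is hypothetical:
// Rough equivalent of workqueue.DefaultControllerRateLimiter(): per-item
// exponential backoff (5ms base, capped at 1000s) combined with an overall
// 10 qps / burst 100 token bucket shared by all items.
import (
	"time"

	"golang.org/x/time/rate"
	"k8s.io/client-go/util/workqueue"
)

func defaultStyleRateLimiter() workqueue.RateLimiter { // hypothetical helper
	return workqueue.NewMaxOfRateLimiter(
		workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
		&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
	)
}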
// processJob will check the Job's state and TTL and delete the Job when it
// finishes and its TTL after finished has expired. If the Job hasn't finished or
// its TTL hasn't expired, it will be added to the queue after the TTL is expected
// to expire.
// This function is not meant to be invoked concurrently with the same key.
func (gc *gccontroller) processJob(key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
klog.V(4).Infof("Checking if Job %s/%s is ready for cleanup", namespace, name)
// Ignore the Jobs that are already deleted or being deleted, or the ones that don't need clean up.
job, err := gc.jobLister.Jobs(namespace).Get(name)
if errors.IsNotFound(err) {
return nil
}
if err != nil {
return err
}
// Check whether the Job's TTL has expired; note this Job object comes from the informer cache.
if expired, err := gc.processTTL(job); err != nil {
return err
} else if !expired {
return nil
}
// The Job's TTL is assumed to have expired, but the Job TTL might be stale.
// Before deleting the Job, do a final sanity check.
// If TTL is modified before we do this check, we cannot be sure if the TTL truly expires.
// The latest Job may have a different UID, but it's fine because the checks will be run again.
// To avoid the corner case of deleting by mistake (e.g. the TTL was concurrently increased), fetch the latest Job from the API server and re-check its TTL.
fresh, err := gc.vcClient.BatchV1alpha1().Jobs(namespace).Get(context.TODO(), name, metav1.GetOptions{})
if errors.IsNotFound(err) {
return nil
}
if err != nil {
return err
}
// Use the latest Job TTL to see if the TTL truly expires.
if expired, err := gc.processTTL(fresh); err != nil {
return err
} else if !expired {
return nil
}
// Cascade deletes the Jobs if TTL truly expires.
policy := metav1.DeletePropagationForeground // foreground deletion policy
options := metav1.DeleteOptions{
PropagationPolicy: &policy,
Preconditions: &metav1.Preconditions{UID: &fresh.UID}, // only delete if the UID still matches
}
// Preconditions in DeleteOptions is an optional field that states conditions the resource must satisfy for the delete to proceed. If the resource's current state does not match the Preconditions, the delete request is rejected.
klog.V(4).Infof("Cleaning up Job %s/%s", namespace, name)
return gc.vcClient.BatchV1alpha1().Jobs(fresh.Namespace).Delete(context.TODO(), fresh.Name, options)
}
// processTTL checks whether a given Job's TTL has expired, and add it to the queue after the TTL is expected to expire
// if the TTL will expire later.
func (gc *gccontroller) processTTL(job *v1alpha1.Job) (expired bool, err error) {
// We don't care about the Jobs that are going to be deleted, or the ones that don't need clean up.
if job.DeletionTimestamp != nil || !needsCleanup(job) { // GC only applies when the Job is not being deleted, has a TTL configured, and has finished
return false, nil
}
now := time.Now()
t, err := timeLeft(job, &now) // compute the remaining time (finishTime + TTL - now) to decide whether the TTL has expired
if err != nil {
return false, err
}
// TTL has expired: a non-positive remaining duration means the expiration time is at or before now.
if *t <= 0 {
return true, nil
}
gc.enqueueAfter(job, *t) // TTL not reached yet; re-enqueue after the remaining duration and decide again then
return false, nil
}
// needsCleanup checks whether a Job has finished and has a TTL set.
func needsCleanup(j *v1alpha1.Job) bool {
return j.Spec.TTLSecondsAfterFinished != nil && isJobFinished(j)
}
func isJobFinished(job *v1alpha1.Job) bool {
return job.Status.State.Phase == v1alpha1.Completed ||
job.Status.State.Phase == v1alpha1.Failed ||
job.Status.State.Phase == v1alpha1.Terminated
}
func timeLeft(j *v1alpha1.Job, since *time.Time) (*time.Duration, error) {
finishAt, expireAt, err := getFinishAndExpireTime(j)
if err != nil {
return nil, err
}
if finishAt.UTC().After(since.UTC()) {
klog.Warningf("Warning: Found Job %s/%s finished in the future. This is likely due to time skew in the cluster. Job cleanup will be deferred.", j.Namespace, j.Name)
}
remaining := expireAt.UTC().Sub(since.UTC()) // remaining = expiration time - now; if positive, no cleanup is needed yet
klog.V(4).Infof("Found Job %s/%s finished at %v, remaining TTL %v since %v, TTL will expire at %v", j.Namespace, j.Name, finishAt.UTC(), remaining, since.UTC(), expireAt.UTC())
return &remaining, nil
}
func getFinishAndExpireTime(j *v1alpha1.Job) (*time.Time, *time.Time, error) {
if !needsCleanup(j) {
return nil, nil, fmt.Errorf("job %s/%s should not be cleaned up", j.Namespace, j.Name)
}
finishAt, err := jobFinishTime(j) // derive the finish time from the Job's status
if err != nil {
return nil, nil, err
}
finishAtUTC := finishAt.UTC()
expireAtUTC := finishAtUTC.Add(time.Duration(*j.Spec.TTLSecondsAfterFinished) * time.Second)
return &finishAtUTC, &expireAtUTC, nil // return the finish time and the expiration time (finish time + TTL)
}
// jobFinishTime takes an already finished Job and returns the time it finishes.
// The finish time is taken from Job.Status.State.LastTransitionTime: since the Job has already finished, its last state transition time is the finish time.
func jobFinishTime(finishedJob *v1alpha1.Job) (metav1.Time, error) {
if finishedJob.Status.State.LastTransitionTime.IsZero() {
return metav1.Time{}, fmt.Errorf("unable to find the time when the Job %s/%s finished", finishedJob.Namespace, finishedJob.Name)
}
return finishedJob.Status.State.LastTransitionTime, nil
}
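To close the loop, here is a hypothetical Job that this controller would eventually clean up: ttlSecondsAfterFinished is set, and once the Job reaches Completed/Failed/Terminated, needsCleanup returns true and deletion is scheduled roughly 300 seconds after LastTransitionTime. Field names follow the batch/v1alpha1 API used above; treat the snippet as a sketch, not an exact manifest.
// Hypothetical example: a Job with a 5-minute TTL after it finishes.
ttl := int32(300)
job := &v1alpha1.Job{
	ObjectMeta: metav1.ObjectMeta{
		Namespace: "default",
		Name:      "example-job",
	},
	Spec: v1alpha1.JobSpec{
		TTLSecondsAfterFinished: &ttl,
		// tasks, scheduler name, etc. omitted
	},
}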