爆款云主机2核4G限时秒杀,88元/年起!
查看详情

活动

天翼云最新优惠活动,涵盖免费试用,产品折扣等,助您降本增效!
热门活动
  • 618智算钜惠季 爆款云主机2核4G限时秒杀,88元/年起!
  • 免费体验DeepSeek,上天翼云息壤 NEW 新老用户均可免费体验2500万Tokens,限时两周
  • 云上钜惠 HOT 爆款云主机全场特惠,更有万元锦鲤券等你来领!
  • 算力套餐 HOT 让算力触手可及
  • 天翼云脑AOne NEW 连接、保护、办公,All-in-One!
  • 中小企业应用上云专场 产品组合下单即享折上9折起,助力企业快速上云
  • 息壤高校钜惠活动 NEW 天翼云息壤杯高校AI大赛,数款产品享受线上订购超值特惠
  • 天翼云电脑专场 HOT 移动办公新选择,爆款4核8G畅享1年3.5折起,快来抢购!
  • 天翼云奖励推广计划 加入成为云推官,推荐新用户注册下单得现金奖励
免费活动
  • 免费试用中心 HOT 多款云产品免费试用,快来开启云上之旅
  • 天翼云用户体验官 NEW 您的洞察,重塑科技边界

智算服务

打造统一的产品能力,实现算网调度、训练推理、技术架构、资源管理一体化智算服务
智算云(DeepSeek专区)
科研助手
  • 算力商城
  • 应用商城
  • 开发机
  • 并行计算
算力互联调度平台
  • 应用市场
  • 算力市场
  • 算力调度推荐
一站式智算服务平台
  • 模型广场
  • 体验中心
  • 服务接入
智算一体机
  • 智算一体机
大模型
  • DeepSeek-R1-昇腾版(671B)
  • DeepSeek-R1-英伟达版(671B)
  • DeepSeek-V3-昇腾版(671B)
  • DeepSeek-R1-Distill-Llama-70B
  • DeepSeek-R1-Distill-Qwen-32B
  • Qwen2-72B-Instruct
  • StableDiffusion-V2.1
  • TeleChat-12B

应用商城

天翼云精选行业优秀合作伙伴及千余款商品,提供一站式云上应用服务
进入甄选商城进入云市场创新解决方案
办公协同
  • WPS云文档
  • 安全邮箱
  • EMM手机管家
  • 智能商业平台
财务管理
  • 工资条
  • 税务风控云
企业应用
  • 翼信息化运维服务
  • 翼视频云归档解决方案
工业能源
  • 智慧工厂_生产流程管理解决方案
  • 智慧工地
建站工具
  • SSL证书
  • 新域名服务
网络工具
  • 翼云加速
灾备迁移
  • 云管家2.0
  • 翼备份
资源管理
  • 全栈混合云敏捷版(软件)
  • 全栈混合云敏捷版(一体机)
行业应用
  • 翼电子教室
  • 翼智慧显示一体化解决方案

合作伙伴

天翼云携手合作伙伴,共创云上生态,合作共赢
天翼云生态合作中心
  • 天翼云生态合作中心
天翼云渠道合作伙伴
  • 天翼云代理渠道合作伙伴
天翼云服务合作伙伴
  • 天翼云集成商交付能力认证
天翼云应用合作伙伴
  • 天翼云云市场合作伙伴
  • 天翼云甄选商城合作伙伴
天翼云技术合作伙伴
  • 天翼云OpenAPI中心
  • 天翼云EasyCoding平台
天翼云培训认证
  • 天翼云学堂
  • 天翼云市场商学院
天翼云合作计划
  • 云汇计划
天翼云东升计划
  • 适配中心
  • 东升计划
  • 适配互认证

开发者

开发者相关功能入口汇聚
技术社区
  • 专栏文章
  • 互动问答
  • 技术视频
资源与工具
  • OpenAPI中心
开放能力
  • EasyCoding敏捷开发平台
培训与认证
  • 天翼云学堂
  • 天翼云认证
魔乐社区
  • 魔乐社区

支持与服务

为您提供全方位支持与服务,全流程技术保障,助您轻松上云,安全无忧
文档与工具
  • 文档中心
  • 新手上云
  • 自助服务
  • OpenAPI中心
定价
  • 价格计算器
  • 定价策略
基础服务
  • 售前咨询
  • 在线支持
  • 在线支持
  • 工单服务
  • 建议与反馈
  • 用户体验官
  • 服务保障
  • 客户公告
  • 会员中心
增值服务
  • 红心服务
  • 首保服务
  • 客户支持计划
  • 专家技术服务
  • 备案管家

了解天翼云

天翼云秉承央企使命,致力于成为数字经济主力军,投身科技强国伟大事业,为用户提供安全、普惠云服务
品牌介绍
  • 关于天翼云
  • 智算云
  • 天翼云4.0
  • 新闻资讯
  • 天翼云APP
基础设施
  • 全球基础设施
  • 信任中心
最佳实践
  • 精选案例
  • 超级探访
  • 云杂志
  • 分析师和白皮书
  • 天翼云·创新直播间
市场活动
  • 2025智能云生态大会
  • 2024智算云生态大会
  • 2023云生态大会
  • 2022云生态大会
  • 天翼云中国行
天翼云
  • 活动
  • 智算服务
  • 产品
  • 解决方案
  • 应用商城
  • 合作伙伴
  • 开发者
  • 支持与服务
  • 了解天翼云
      • 文档
      • 控制中心
      • 备案
      • 管理中心

      Kubernetes基于leaderelection选举策略实现组件高可用

      首页 知识中心 其他 文章详情页

      Kubernetes基于leaderelection选举策略实现组件高可用

      2024-11-21 09:56:12 阅读次数:24

      id,组件

      1、概述

      在Kubernetes中,为了实现组件高可用,同一个组件需要部署多个副本,例如多个apiserver、scheduler、controller-manager等,其中apiserver是无状态的,每个组件都可以工作,而scheduler与controller-manager是有状态的,同一时刻只能存在一个活跃的,需要进行选主。

      Kubernetes中是通过leaderelection来实现组件的高可用的。在Kubernetes本身的组件中,kube-scheduler和kube-manager-controller两个组件是有leader选举的,这个选举机制是Kubernetes对于这两个组件的高可用保障。即正常情况下kube-scheduler或kube-manager-controller组件的多个副本只有一个是处于业务逻辑运行状态,其它副本则不断的尝试去获取锁,去竞争leader,直到自己成为leader。如果正在运行的leader因某种原因导致当前进程退出,或者锁丢失,则由其它副本去竞争新的leader,获取leader继而执行业务逻辑。

      不光是Kubernetes本身组件用到了这个选举策略,我们自己定义的服务同样可以用这个算法去实现选主。在Kubernetes client-go包中就提供了接口供用户使用。代码路径在client-go/tools/leaderelection下。

      2、leaderelection使用示例

      以下是一个简单使用的例子(例子来源于client-go中的example包中),编译完成之后同时启动多个进程,但是只有一个进程在工作,当把leader进程kill掉之后,会重新选举出一个leader进行工作,即执行其中的 run 方法:

      //代码路径:client-go/examples/leader-election/main.go
      
      package main
      
      import (
      	"context"
      	"flag"
      	"os"
      	"os/signal"
      	"syscall"
      	"time"
      
      	"/google/uuid"
      	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
      	clientset "k8s.io/client-go/kubernetes"
      	"k8s.io/client-go/rest"
      	"k8s.io/client-go/tools/clientcmd"
      	"k8s.io/client-go/tools/leaderelection"
      	"k8s.io/client-go/tools/leaderelection/resourcelock"
      	"k8s.io/klog/v2"
      )
      
      func buildConfig(kubeconfig string) (*rest.Config, error) {
      	if kubeconfig != "" {
      		cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
      		if err != nil {
      			return nil, err
      		}
      		return cfg, nil
      	}
      
      	cfg, err := rest.InClusterConfig()
      	if err != nil {
      		return nil, err
      	}
      	return cfg, nil
      }
      
      func main() {
      	klog.InitFlags(nil)
      
      	var kubeconfig string
      	var leaseLockName string
      	var leaseLockNamespace string
      	var id string
      
      	flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
      	flag.StringVar(&id, "id", uuid.New().String(), "the holder identity name")
      	flag.StringVar(&leaseLockName, "lease-lock-name", "", "the lease lock resource name")
      	flag.StringVar(&leaseLockNamespace, "lease-lock-namespace", "", "the lease lock resource namespace")
      	flag.Parse()
      
      	if leaseLockName == "" {
      		klog.Fatal("unable to get lease lock resource name (missing lease-lock-name flag).")
      	}
      	if leaseLockNamespace == "" {
      		klog.Fatal("unable to get lease lock resource namespace (missing lease-lock-namespace flag).")
      	}
      
      	// leader election uses the Kubernetes API by writing to a
      	// lock object, which can be a LeaseLock object (preferred),
      	// a ConfigMap, or an Endpoints (deprecated) object.
      	// Conflicting writes are detected and each client handles those actions
      	// independently.
      	config, err := buildConfig(kubeconfig)
      	if err != nil {
      		klog.Fatal(err)
      	}
      
      	client := clientset.NewForConfigOrDie(config)
      
      	//业务逻辑
      	run := func(ctx context.Context) {
      		// complete your controller loop here
      		klog.Info("Controller loop...")
      
      		select {}
      	}
      
      	// use a Go context so we can tell the leaderelection code when we
      	// want to step down
      	ctx, cancel := context.WithCancel(context.Background())
      	defer cancel()
      
      	// listen for interrupts or the Linux SIGTERM signal and cancel
      	// our context, which the leader election code will observe and
      	// step down
      	ch := make(chan os.Signal, 1)
      	signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
      	go func() {
      		<-ch
      		klog.Info("Received termination, signaling shutdown")
      		cancel()
      	}()
      
      	// we use the Lease lock type since edits to Leases are less common
      	// and fewer objects in the cluster watch "all Leases".
      	// 指定锁的资源对象,这里使用了Lease资源,还支持configmap,endpoint,或者multilock(即多种配合使用)
      	lock := &resourcelock.LeaseLock{
      		LeaseMeta: metav1.ObjectMeta{
      			Name:      leaseLockName,
      			Namespace: leaseLockNamespace,
      		},
      		Client: client.CoordinationV1(),
      		LockConfig: resourcelock.ResourceLockConfig{
      			Identity: id,
      		},
      	}
      
      	// 进行选举
      	// start the leader election code loop
      	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
      		Lock: lock,
      		// IMPORTANT: you MUST ensure that any code you have that
      		// is protected by the lease must terminate **before**
      		// you call cancel. Otherwise, you could have a background
      		// loop still running and another process could
      		// get elected before your background loop finished, violating
      		// the stated goal of the lease.
      		ReleaseOnCancel: true,
      		LeaseDuration:   60 * time.Second, //租约时长,非主候选者用来判断资源锁是否过期
      		RenewDeadline:   15 * time.Second, //leader刷新资源锁超时时间
      		RetryPeriod:     5 * time.Second,  //调用资源锁间隔
      		//回调函数,根据选举不同事件触发
      		Callbacks: leaderelection.LeaderCallbacks{
      			OnStartedLeading: func(ctx context.Context) {
      				//变为leader执行的业务代码
      				// we're notified when we start - this is where you would
      				// usually put your code
      				run(ctx)
      			},
      			OnStoppedLeading: func() {
      				// 进程退出
      				// we can do cleanup here
      				klog.Infof("leader lost: %s", id)
      				os.Exit(0)
      			},
      			OnNewLeader: func(identity string) {
      				//当产生新的leader后执行的方法
      				// we're notified when new leader elected
      				if identity == id {
      					// I just got the lock
      					return
      				}
      				klog.Infof("new leader elected: %s", identity)
      			},
      		},
      	})
      }

      关键启动参数说明:  

      kubeconfig: 指定kubeconfig文件地址
      lease-lock-name:指定lock的名称
      lease-lock-namespace:指定lock的namespace
      id: 例子中提供的区别参数,用于区分实例
      logtostderr:klog提供的参数,指定log输出到控制台
      v: 指定日志输出级别

      2.1 同时启动三个进程:

      启动进程1:  

      go run main.go -kubeconfig=/Users/apple/.kube/config148 -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=1 -v=4

      输出:

      apple@appledeMacBook-Pro test$ go run main.go -kubeconfig=/Users/apple/.kube/config148 -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=1 -v=4
      I0126 13:46:59.753974   35080 leaderelection.go:243] attempting to acquire leader lease default/example...
      I0126 13:47:00.660260   35080 leaderelection.go:253] successfully acquired lease default/example
      I0126 13:47:00.660368   35080 main.go:75] Controller loop...

      这里可以看出来id=1的进程持有锁,并且运行的程序。

      启动进程2:

      go run main.go -kubeconfig=/Users/apple/.kube/config148 -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=2 -v=4

      输出:  

      apple@appledeMacBook-Pro test$ go run main.go -kubeconfig=/Users/apple/.kube/config148 -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=2 -v=4
      I0126 13:47:05.066516   35096 leaderelection.go:243] attempting to acquire leader lease default/example...
      I0126 13:47:05.451887   35096 leaderelection.go:346] lock is held by 1 and has not yet expired
      I0126 13:47:05.451909   35096 leaderelection.go:248] failed to acquire lease default/example
      I0126 13:47:05.451918   35096 main.go:145] new leader elected: 1
      I0126 13:47:14.188160   35096 leaderelection.go:346] lock is held by 1 and has not yet expired
      I0126 13:47:14.188188   35096 leaderelection.go:248] failed to acquire lease default/example
      I0126 13:47:24.929607   35096 leaderelection.go:346] lock is held by 1 and has not yet expired
      I0126 13:47:24.929636   35096 leaderelection.go:248] failed to acquire lease default/example
      .......

      这里可以看出来id=1的进程持有锁,并且运行的程序,而id=2的进程表示无法获取到锁,在不断的进行尝试。

      启动进程3:  

      go run main.go -kubeconfig=/Users/apple/.kube/config148 -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=3 -v=4

      输出: 

      apple@appledeMacBook-Pro test$ go run main.go -kubeconfig=/Users/apple/.kube/config148 -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=3 -v=4
      I0126 13:47:12.431518   35112 leaderelection.go:243] attempting to acquire leader lease default/example...
      I0126 13:47:12.776614   35112 leaderelection.go:346] lock is held by 1 and has not yet expired
      I0126 13:47:12.776649   35112 leaderelection.go:248] failed to acquire lease default/example
      I0126 13:47:12.776663   35112 main.go:145] new leader elected: 1
      I0126 13:47:21.499295   35112 leaderelection.go:346] lock is held by 1 and has not yet expired
      I0126 13:47:21.499325   35112 leaderelection.go:248] failed to acquire lease default/example
      I0126 13:47:32.241544   35112 leaderelection.go:346] lock is held by 1 and has not yet expired
      I0126 13:47:32.241572   35112 leaderelection.go:248] failed to acquire lease default/example
      .......

      这里可以看出来id=1的进程持有锁,并且运行的程序,而id=3的进程表示无法获取到锁,在不断的进行尝试。

      2.2 停掉进程1并观察进程2和进程3竞争新的leader

      apple@appledeMacBook-Pro test$ go run main.go -kubeconfig=/Users/apple/.kube/config148 -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=1 -v=4
      I0126 13:46:59.753974   35080 leaderelection.go:243] attempting to acquire leader lease default/example...
      I0126 13:47:00.660260   35080 leaderelection.go:253] successfully acquired lease default/example
      I0126 13:47:00.660368   35080 main.go:75] Controller loop...
      ^CI0126 13:53:16.629114   35080 main.go:92] Received termination, signaling shutdown
      I0126 13:53:17.057999   35080 main.go:135] leader lost: 1

      现在kill掉id=1进程,在等待lock释放之后(有个LeaseDuration时间),观察进程2和进程3的输出,看哪个进程成为新的leader。   

      id=2的进程输出:

      ......
      I0126 13:53:11.208487   35096 leaderelection.go:346] lock is held by 1 and has not yet expired
      I0126 13:53:11.208512   35096 leaderelection.go:248] failed to acquire lease default/example
      I0126 13:53:18.189514   35096 leaderelection.go:253] successfully acquired lease default/example
      I0126 13:53:18.189587   35096 main.go:75] Controller loop...

      这里可以看出来id=2的进程持有锁,并且运行的程序。

      id=3的进程输出:

      ......
      I0126 13:53:04.675216   35112 leaderelection.go:248] failed to acquire lease default/example
      I0126 13:53:12.918706   35112 leaderelection.go:346] lock is held by 1 and has not yet expired
      I0126 13:53:12.918736   35112 leaderelection.go:248] failed to acquire lease default/example
      I0126 13:53:19.544314   35112 leaderelection.go:346] lock is held by 2 and has not yet expired
      I0126 13:53:19.544372   35112 leaderelection.go:248] failed to acquire lease default/example
      I0126 13:53:19.544387   35112 main.go:145] new leader elected: 2
      I0126 13:53:26.346561   35112 leaderelection.go:346] lock is held by 2 and has not yet expired
      I0126 13:53:26.346591   35112 leaderelection.go:248] failed to acquire lease default/example
      ......

      这里可以看出来id=2的进程持有锁,并且运行的程序,而id=3的进程表示无法获取到锁,在不断的进行尝试。

      2.3 查看资源锁对象 

      [root@master1 ~]# kubectl get leases.coordination.k8s.io example -o yaml
      apiVersion: coordination.k8s.io/v1
      kind: Lease
      metadata:
        creationTimestamp: "2022-01-26T05:46:38Z"
        managedFields:
        .......
          manager: main
          operation: Update
          time: "2022-01-26T06:05:43Z"
        name: example
        namespace: default
        resourceVersion: "314956587"
        selfLink: /apis/coordination.k8s.io/v1/namespaces/default/leases/example
        uid: 5ce63489-c754-42b4-9e6c-a0a0a8442c3f
      spec:
        acquireTime: "2022-01-26T05:53:17.905076Z" //获得锁时间
        holderIdentity: "2" //持有锁进程的标识
        leaseDurationSeconds: 60 //lease租约时长
        leaseTransitions: 1 //leader更换次数
        renewTime: "2022-01-26T06:06:06.248393Z" //更新租约的时间

      锁已经被进程2获得, 此时如果进程1再启动的话, 也只能一直尝试获取锁。

      3、leaderelection源码分析

      leaderelection基本原理其实就是利用通过Kubernetes中lease、configmap 、endpoints资源实现一个分布式锁,获取到锁的进程成为leader,并且定期更新租约(renew)。其他进程也在不断的尝试进行抢占,抢占不到则继续等待下次循环。当leader节点挂掉之后,租约到期,其他节点就成为新的leader。

      代码路径在client-go/tools/leaderelection下.逻辑结构如下图:

      Kubernetes基于leaderelection选举策略实现组件高可用

      注意: 请注意client-go的版本,不同版本对应LeaderElection的逻辑架构图也略微有所不同。

      3.1、Interface接口

      Interface: 中定义了一系列方法, 包括增加、修改、获取一个LeaderElectionRecord, 说白了就是一个客户端, 而且每个客户端实例都要有自己分布式唯一的id。

      // tools/leaderelection/resourcelock/interface.go
       
      // 资源占有者的描述信息
      type LeaderElectionRecord struct {
          // 持有锁进程的标识 也就是leader的id
          HolderIdentity       string      `json:"holderIdentity"`
          // 一个租约多长时间
          LeaseDurationSeconds int         `json:"leaseDurationSeconds"`
          // 获得leader的时间
          AcquireTime          metav1.Time `json:"acquireTime"`
          // 续约的时间
          RenewTime            metav1.Time `json:"renewTime"`
          // leader变更的次数
          LeaderTransitions    int         `json:"leaderTransitions"`
      }
       
      type Interface interface {
          // 返回当前资源LeaderElectionRecord 
          Get() (*LeaderElectionRecord, error)
          // 创建一个资源LeaderElectionRecord
          Create(ler LeaderElectionRecord) error
          // 更新资源
          Update(ler LeaderElectionRecord) error
          // 记录事件
          RecordEvent(string)
          // 返回当前该应用的id
          Identity() string
          // 描述信息(namespace/name)
          Describe() string
      }

      Interface有四个实现类, 分别为EndpointLock, ConfigMapLock、LeaseLock和MultiLock(一般不用),分别可以操作Kubernetes中的endpoint, configmap和lease。这里以LeaseLock为例子说明。

      // tools/leaderelection/resourcelock/leaselock.go
      
      type LeaseLock struct {
      	// LeaseMeta should contain a Name and a Namespace of a
      	// LeaseMeta object that the LeaderElector will attempt to lead.
      	LeaseMeta  metav1.ObjectMeta
      	// 访问api-server的客户端
      	Client     coordinationv1client.LeasesGetter
      	// 该LeaseLock的分布式唯一身份id
      	LockConfig ResourceLockConfig
      	// 资源锁对应的lease资源对象
      	lease      *coordinationv1.Lease
      }
      
      // tools/leaderelection/resourcelock/interface.go
      type ResourceLockConfig struct {
          // 分布式唯一id
          Identity string
          EventRecorder EventRecorder
      }

      LeaseLock类型对应函数详解:Create, Update, Get方法都是利用client去访问kubernetes的api-server。

      // tools/leaderelection/resourcelock/leaselock.go
      
      // 通过访问apiserver获取当前资源锁对象ll.lease,并组织返回对应的LeaderElectionRecord对象和LeaderElectionRecord序列化值
      // Get returns the election record from a Lease spec
      func (ll *LeaseLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
      	var err error
      	// 获取资源锁对应的资源对象ll.lease
      	ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Get(ctx, ll.LeaseMeta.Name, metav1.GetOptions{})
      	if err != nil {
      		return nil, nil, err
      	}
      	// 利用lease资源对象spec生成对应LeaderElectionRecord资源对象
      	record := LeaseSpecToLeaderElectionRecord(&ll.lease.Spec)
      	// 序列化LeaderElectionRecord资源对象(byte[])
      	recordByte, err := json.Marshal(*record)
      	if err != nil {
      		return nil, nil, err
      	}
      	return record, recordByte, nil
      }
      
      // 根据LeaderElectionRecord创建对应资源锁对象 ll.lease
      // Create attempts to create a Lease
      func (ll *LeaseLock) Create(ctx context.Context, ler LeaderElectionRecord) error {
      	var err error
      	ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Create(ctx, &coordinationv1.Lease{
      		ObjectMeta: metav1.ObjectMeta{
      			Name:      ll.LeaseMeta.Name,
      			Namespace: ll.LeaseMeta.Namespace,
      		},
      		// 利用ElectionRecord资源对象生成对应lease资源对象spec
      		Spec: LeaderElectionRecordToLeaseSpec(&ler),
      	}, metav1.CreateOptions{})
      	return err
      }
      
      // Update will update an existing Lease spec.
      func (ll *LeaseLock) Update(ctx context.Context, ler LeaderElectionRecord) error {
      	if ll.lease == nil {
      		return errors.New("lease not initialized, call get or create first")
      	}
      	// 利用ElectionRecord资源对象生成对应lease资源对象spec
      	ll.lease.Spec = LeaderElectionRecordToLeaseSpec(&ler)
      
      	lease, err := ll.Client.Leases(ll.LeaseMeta.Namespace).Update(ctx, ll.lease, metav1.UpdateOptions{})
      	if err != nil {
      		return err
      	}
      
      	ll.lease = lease
      	return nil
      }
      
      // RecordEvent in leader election while adding meta-data
      func (ll *LeaseLock) RecordEvent(s string) {
      	if ll.LockConfig.EventRecorder == nil {
      		return
      	}
      	events := fmt.Sprintf("%v %v", ll.LockConfig.Identity, s)
      	ll.LockConfig.EventRecorder.Eventf(&coordinationv1.Lease{ObjectMeta: ll.lease.ObjectMeta}, corev1.EventTypeNormal, "LeaderElection", events)
      }
      
      // Describe is used to convert details on current resource lock
      // into a string
      func (ll *LeaseLock) Describe() string {
      	return fmt.Sprintf("%v/%v", ll.LeaseMeta.Namespace, ll.LeaseMeta.Name)
      }
      
      // Identity returns the Identity of the lock
      func (ll *LeaseLock) Identity() string {
      	return ll.LockConfig.Identity
      }
      
      
      // 利用lease资源对象spec生成对应LeaderElectionRecord资源对象
      func LeaseSpecToLeaderElectionRecord(spec *coordinationv1.LeaseSpec) *LeaderElectionRecord {
      	var r LeaderElectionRecord
      	if spec.HolderIdentity != nil {
      		r.HolderIdentity = *spec.HolderIdentity
      	}
      	if spec.LeaseDurationSeconds != nil {
      		r.LeaseDurationSeconds = int(*spec.LeaseDurationSeconds)
      	}
      	if spec.LeaseTransitions != nil {
      		r.LeaderTransitions = int(*spec.LeaseTransitions)
      	}
      	if spec.AcquireTime != nil {
      		r.AcquireTime = metav1.Time{spec.AcquireTime.Time}
      	}
      	if spec.RenewTime != nil {
      		r.RenewTime = metav1.Time{spec.RenewTime.Time}
      	}
      	return &r
      
      }
      
      // 利用ElectionRecord资源对象生成对应lease资源对象spec
      func LeaderElectionRecordToLeaseSpec(ler *LeaderElectionRecord) coordinationv1.LeaseSpec {
      	leaseDurationSeconds := int32(ler.LeaseDurationSeconds)
      	leaseTransitions := int32(ler.LeaderTransitions)
      	return coordinationv1.LeaseSpec{
      		HolderIdentity:       &ler.HolderIdentity,
      		LeaseDurationSeconds: &leaseDurationSeconds,
      		AcquireTime:          &metav1.MicroTime{ler.AcquireTime.Time},
      		RenewTime:            &metav1.MicroTime{ler.RenewTime.Time},
      		LeaseTransitions:     &leaseTransitions,
      	}
      }

      3.2  LeaderElector

      LeaderElectionConfig:

      定义了一些竞争资源的参数,用于保存当前应用的一些配置,包括资源锁、持有锁的时间等,LeaderElectionConfig.lock 支持保存在以下三种资源中:

      • configmap
      • endpoint
      • lease

      包中还提供了一个 multilock ,即可以进行选择两种,当其中一种保存失败时,选择第二种。

      //client-go/tools/leaderelection/leaderelection.go
      
      type LeaderElectionConfig struct {
      	// Lock 的类型
      	Lock rl.Interface
      	//持有锁的时间
      	LeaseDuration time.Duration
      	//在更新租约的超时时间
      	RenewDeadline time.Duration
      	//竞争获取锁的时间
      	RetryPeriod time.Duration
      	//需要用户配置的状态变化时执行的函数,支持三种:
      	//1、OnStartedLeading 启动是执行的业务代码
      	//2、OnStoppedLeading leader停止执行的方法
      	//3、OnNewLeader 当产生新的leader后执行的方法
      	Callbacks LeaderCallbacks
      
      	//进行监控检查
      	// WatchDog is the associated health checker
      	// WatchDog may be null if its not needed/configured.
      	WatchDog *HealthzAdaptor
      	//leader退出时,是否执行release方法
      	ReleaseOnCancel bool
      
      	// Name is the name of the resource lock for debugging
      	Name string
      }

      LeaderElector:

      是一个竞争资源的实体。 

      //client-go/tools/leaderelection/leaderelection.go
      // LeaderElector is a leader election client.
      type LeaderElector struct {
      	// 用于保存当前应用的一些配置
      	config LeaderElectionConfig
      	// 通过apiserver远程获取的资源锁对象 (不一定自己是leader) 所有想竞争此资源的应用获取的是同一份
      	// internal bookkeeping
      	observedRecord    rl.LeaderElectionRecord
      	//资源锁对象spec,用于和远程获取的资源锁对象值比较
      	observedRawRecord []byte
      	// 获取的时间
      	observedTime      time.Time
      	// used to implement OnNewLeader(), may lag slightly from the
      	// value observedRecord.HolderIdentity if the transition has
      	// not yet been reported.
      	reportedLeader string
      
      	// clock is wrapper around time to allow for less flaky testing
      	clock clock.Clock
      
      	metrics leaderMetricsAdapter
      }

      这里着重要关注以下几个属性:

      config: 该LeaderElectionConfig对象配置了当前应用的客户端, 以及此客户端的唯一id等等。
      observedRecord: 该LeaderElectionRecord就是保存着从api-server中获得的leader的信息。
      observedTime: 获得的时间。

      很明显判断当前进程是不是leader只需要判断config中的id和observedRecord中的id是不是一致即可.

      func (le *LeaderElector) GetLeader() string {
          return le.observedRecord.HolderIdentity
      }
       
      // IsLeader returns true if the last observed leader was this client else returns false.
      func (le *LeaderElector) IsLeader() bool {
          return le.observedRecord.HolderIdentity == le.config.Lock.Identity()
      }

      3.3 LeaderElector运行逻辑

      3.3.1 run

      func (le *LeaderElector) Run(ctx context.Context) {
      	defer func() {
      		runtime.HandleCrash()
      		le.config.Callbacks.OnStoppedLeading()
      	}()
      	// 如果获取成功 那就是ctx signalled done
      	// 不然即使失败, 该client也会一直去尝试获得leader位置
      	if !le.acquire(ctx) {
      		return // ctx signalled done
      	}
      	// 如果获得leadership 以goroutine和回调的形式启动用户自己的逻辑方法OnStartedLeading
      	ctx, cancel := context.WithCancel(ctx)
      	defer cancel()
      	go le.config.Callbacks.OnStartedLeading(ctx)
      	// 一直去续约 这里也是一个循环操作
      	// 如果失去了leadership 该方法才会返回
      	// 该方法返回 整个Run方法就返回了
      	le.renew(ctx)
      }

      1. 该client(也就是le这个实例)首先会调用acquire方法一直尝试去竞争leadership(如果竞争失败, 继续竞争, 不会进入2. 竞争成功, 进入2)。
      2. 异步启动用户自己的逻辑程序(OnStartedLeading)(进入3)。
      3. 通过调用renew方法续约自己的leadership. 续约成功, 继续续约,续约失败, 整个Run就结束了。

      3.3.2 acquire

      //检查是否需要广播新产生的leader
      func (le *LeaderElector) maybeReportTransition() {
          // 如果没有变化 则不需要更新
          if le.observedRecord.HolderIdentity == le.reportedLeader {
              return
          }
          // 更新reportedLeader为最新的leader的id
          le.reportedLeader = le.observedRecord.HolderIdentity
          if le.config.Callbacks.OnNewLeader != nil {
              // 调用当前应用的回调函数OnNewLeader报告新的leader产生
              go le.config.Callbacks.OnNewLeader(le.reportedLeader)
          }
      }
       
      // 一旦获得leadership 立马返回true,那就是ctx signalled done
      // 失败的话,该client会一直去尝试获得leader位置
      func (le *LeaderElector) acquire(ctx context.Context) bool {
          ctx, cancel := context.WithCancel(ctx)
          defer cancel()
          succeeded := false
          desc := le.config.Lock.Describe()
          klog.Infof("attempting to acquire leader lease  %v...", desc)
          wait.JitterUntil(func() {
              // 尝试获得或者更新资源
              succeeded = le.tryAcquireOrRenew()
              // 有可能会产生新的leader
              // 所以调用maybeReportTransition检查是否需要广播新产生的leader
              le.maybeReportTransition()
              if !succeeded {
                  // 如果获得leadership失败 则返回后继续竞争
                  klog.V(4).Infof("failed to acquire lease %v", desc)
                  return
              }
              // 自己成为leader
              // 可以调用cancel方法退出JitterUntil进而从acquire中返回
              le.config.Lock.RecordEvent("became leader")
              le.metrics.leaderOn(le.config.Name)
              klog.Infof("successfully acquired lease %v", desc)
              cancel()
          }, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
          return succeeded
      }

      acquire的作用如下:
      1. 一旦获得leadership,立马返回true,否则会隔RetryPeriod时间尝试一次。

      这里的逻辑比较简单, 主要的逻辑是在tryAcquireOrRenew方法中。

      3.3.3 renew and release

      // RenewDeadline=15s RetryPeriod=5s
      // renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done.
      func (le *LeaderElector) renew(ctx context.Context) {
      	ctx, cancel := context.WithCancel(ctx)
      	defer cancel()
      	// 每隔RetryPeriod会调用 除非cancel()方法被调用才会退出
      	wait.Until(func() {
      		timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
      		defer timeoutCancel()
      		// 每隔5s调用该方法直到该方法返回true为止
      		// 如果超时了也会退出该方法 并且err中有错误信息
      		err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
      			return le.tryAcquireOrRenew(timeoutCtx), nil
      		}, timeoutCtx.Done())
      
      		// 有可能会产生新的leader 如果有会广播新产生的leader
      		le.maybeReportTransition()
      		desc := le.config.Lock.Describe()
      		if err == nil {
      			// 如果err == nil, 表明上面PollImmediateUntil中返回true了 续约成功 依然处于leader位置
      			// 返回后 继续运行wait.Until的逻辑
      			klog.V(4).Infof("successfully renewed lease %v", desc)
      			return
      		}
      		// err != nil 表明超时了 试的总时间超过了RenewDeadline 失去了leader位置 续约失败
      		// 调用cancel方法退出wait.Until
      		le.config.Lock.RecordEvent("stopped leading")
      		le.metrics.leaderOff(le.config.Name)
      		klog.Infof("failed to renew lease %v: %v", desc, err)
      		cancel()
      	}, le.config.RetryPeriod, ctx.Done())
      
      	// if we hold the lease, give it up
      	if le.config.ReleaseOnCancel {
      		le.release()
      	}
      }
      
      // leader续约cancel()的时候释放资源锁对象holderIdentity字段的值
      // release attempts to release the leader lease if we have acquired it.
      func (le *LeaderElector) release() bool {
      	if !le.IsLeader() {
      		return true
      	}
      	now := metav1.Now()
      	leaderElectionRecord := rl.LeaderElectionRecord{
      		LeaderTransitions:    le.observedRecord.LeaderTransitions,
      		LeaseDurationSeconds: 1,
      		RenewTime:            now,
      		AcquireTime:          now,
      	}
      	if err := le.config.Lock.Update(context.TODO(), leaderElectionRecord); err != nil {
      		klog.Errorf("Failed to release lock: %v", err)
      		return false
      	}
      	le.observedRecord = leaderElectionRecord
      	le.observedTime = le.clock.Now()
      	return true
      }

      可以看到该client的base条件是它自己是当前的leader, 然后来续约操作。

      这里来说一下RenewDeadline和RetryPeriod的作用。
      每隔RetryPeriod时间会通过tryAcquireOrRenew续约, 如果续约失败, 还会进行再次尝试. 一直到尝试的总时间超过RenewDeadline后该client就会失去leadership。

      3.3.4 tryAcquireOrRenew

      // 竞争或者更新leadership
      // 成功返回true 失败返回false
      func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
      	now := metav1.Now()
      	leaderElectionRecord := rl.LeaderElectionRecord{
      		HolderIdentity:       le.config.Lock.Identity(),
      		LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
      		RenewTime:            now,
      		AcquireTime:          now,
      	}
      
      	// 1. obtain or create the ElectionRecord
      	// client通过apiserver获得ElectionRecord和ElectionRecord序列化值
      	oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
      	if err != nil {
      		if !errors.IsNotFound(err) {
      			// 失败直接退出
      			klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
      			return false
      		}
      		// 因为没有获取到, 因此创建一个新的进去
      		if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
      			klog.Errorf("error initially creating leader election record: %v", err)
      			return false
      		}
      		// 然后设置observedRecord为刚刚加入进去的leaderElectionRecord
      		le.observedRecord = leaderElectionRecord
      		le.observedTime = le.clock.Now()
      		return true
      	}
      
      	// 2. Record obtained, check the Identity & Time
      	// 从远端获取到record(资源)成功存到oldLeaderElectionRecord
      	// 如果oldLeaderElectionRecord与observedRecord不相同 更新observedRecord
      	// 因为observedRecord代表是从远端存在Record
      
      	// 需要注意的是每个client都在竞争leadership, 而leader一直在续约, leader会更新它的RenewTime字段
      	// 所以一旦leader续约成功 每个non-leader候选者都需要更新其observedTime和observedRecord
      	if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
      		le.observedRecord = *oldLeaderElectionRecord
      		le.observedRawRecord = oldLeaderElectionRawRecord
      		le.observedTime = le.clock.Now()
      	}
      	// 如果leader已经被占有并且不是当前自己这个应用, 而且时间还没有到期
      	// 那就直接返回false, 因为已经无法抢占 时间没有过期
      	if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
      		le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
      		!le.IsLeader() {
      		klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
      		return false
      	}
      
      	// 3. We're going to try to update. The leaderElectionRecord is set to it's default
      	// here. Let's correct it before updating.
      	if le.IsLeader() {
      		// 如果当前服务就是以前的占有者
      		leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
      		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
      	} else {
      		// 如果当前服务不是以前的占有者 LeaderTransitions加1
      		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
      	}
      
      	// update the lock itself
      	// 当前client占有该资源 成为leader
      	if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
      		klog.Errorf("Failed to update lock: %v", err)
      		return false
      	}
      
      	le.observedRecord = leaderElectionRecord
      	le.observedTime = le.clock.Now()
      	return true
      }

      这里需要注意的是当前client不是leader的时候, 如何去判断一个leader是否已经expired了?

           通过le.observedTime.Add(le.config.LeaseDuration).After(now.Time);

      • le.observedTime: 代表的是获得leader(截止当前时间为止的最后一次renew)对象的时间;
      • le.config.LeaseDuration: 当前进程获得leadership需要的等待时间;
      • le.observedTime.Add(le.config.LeaseDuration): 就是自己(当前进程)被允许获得leadership的时间。

      如果le.observedTime.Add(le.config.LeaseDuration).before(now.Time)为true的话, 就表明leader过期了。白话文的意思就是从leader上次续约完, 已经超过le.config.LeaseDuration的时间没有续约了, 所以被认为该leader过期了,这时候non-leader就可以抢占leader了。

      4、总结

      leaderelection 主要是利用了k8s API操作的原子性实现了一个分布式锁,在不断的竞争中进行选举。选中为leader的进行才会执行具体的业务代码,这在k8s中非常的常见,而且我们很方便的利用这个包完成组件的编写,从而实现组件的高可用,比如部署为一个多副本的Deployment,当leader的pod退出后会重新启动,可能锁就被其他pod获取继续执行。

      当应用在k8s上部署时,使用k8s的资源锁,可方便的实现高可用,但需要注意:

      • 推荐使用lease或configmap作为资源锁,原因是某些组件(如kube-proxy)会去监听endpoints来更新节点iptables规则,当有大量资源锁时,势必会对性能有影响。
      版权声明:本文内容来自第三方投稿或授权转载,原文地址:https://blog.51cto.com/u_15481067/11748728,作者:人艰不拆_zmc,版权归原作者所有。本网站转在其作品的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如因作品内容、版权等问题需要同本网站联系,请发邮件至ctyunbbs@chinatelecom.cn沟通。

      上一篇:grep命令的使用方法及实用技巧详解

      下一篇:银联iso8583协议报文解析

      相关文章

      2025-05-09 08:51:09

      elasticsearch删除脏数据(根据指定字段删除数据)

      elasticsearch删除脏数据(根据指定字段删除数据)

      2025-05-09 08:51:09
      id , 删除 , 数据 , 查询
      2025-05-09 08:50:35

      Element学习(布局组件、案例操作)(4)

      Element学习(布局组件、案例操作)(4)

      2025-05-09 08:50:35
      展示 , 布局 , 效果 , 查询 , 组件 , 表单 , 表示
      2025-05-07 09:08:08

      数据库Sql题目 : 不用id,每隔10行取一条数据

      数据库Sql题目 : 不用id,每隔10行取一条数据

      2025-05-07 09:08:08
      id , 题目
      2025-04-23 08:18:27

      结构型模式---装饰模式

      装饰模式是一种结构形模式,允许通过将对象放入包含行为的特殊封装对象(装饰器)中来为原来对象(组件)绑定新的行为。

      2025-04-23 08:18:27
      对象 , 模式 , 组件 , 行为 , 装饰
      2025-04-22 09:40:08

      【ETL工具】Kettle 调优 (使用阻塞组件的同时数据量大)

      【ETL工具】Kettle 调优 (使用阻塞组件的同时数据量大)

      2025-04-22 09:40:08
      组件 , 缓存 , 队列
      2025-04-22 09:28:19

      Vue3中如何实现动态菜单递归

      Vue3中如何实现动态菜单递归

      2025-04-22 09:28:19
      实现 , 组件 , 菜单 , 路由 , 递归
      2025-04-18 07:11:32

      golang实战项目log2metrics架构说明

      golang实战项目log2metrics架构说明

      2025-04-18 07:11:32
      code , 日志 , 组件
      2025-04-18 07:10:38

      设计模式-适配器模式

      适配器模式(Adapter Pattern)是一种结构型设计模式,它允许不兼容的接口能够一起工作。这种模式通常用于使现有的类与其他类一起工作,而无需修改它们的源代码。在 JavaScript 中,适配器模式可以用来确保不同的类或对象之间能够通过一个统一的接口进行交互。

      2025-04-18 07:10:38
      接口 , 模式 , 组件 , 适配 , 适配器
      2025-04-18 07:10:38

      设计模式-组合模式

      组合模式(Composite Pattern)在设计中的应用是为了简化对复杂树形结构的管理和操作。这个模式能够让客户端通过一个统一的接口操作单个对象和组合对象。我们可以通过一个具体的例子来进一步理解组合模式在实际应用中的作用和实现。

      2025-04-18 07:10:38
      客户端 , 接口 , 操作 , 模式 , 组件 , 组合
      2025-04-18 07:10:30

      React之withRouter的作用和应用

      React之withRouter的作用和应用

      2025-04-18 07:10:30
      history , math , props , 组件 , 路由 , 跳转
      查看更多
      推荐标签

      作者介绍

      天翼云小翼
      天翼云用户

      文章

      33561

      阅读量

      5229718

      查看更多

      最新文章

      Element学习(布局组件、案例操作)(4)

      2025-05-09 08:50:35

      结构型模式---装饰模式

      2025-04-23 08:18:27

      Vue3中如何实现动态菜单递归

      2025-04-22 09:28:19

      React之withRouter的作用和应用

      2025-04-18 07:10:30

      prometheus为k8s做的4大适配工作

      2025-04-16 09:26:39

      【JAVA】-- GUI用户界面设计(面板组件、文本组件、标签组件、按钮组件、下拉框组件)

      2025-04-11 07:11:40

      查看更多

      热门文章

      【DockerImage】修复Docker镜像的组件安全漏洞(原创)

      2024-07-01 01:32:37

      Salesforce之18位id与15位id

      2023-04-06 09:56:07

      ssh-copy-id使用及非默认22端口时报错

      2023-05-11 06:38:28

      Vue封装组件库-input组件

      2023-06-08 06:21:43

      React-组件-setState

      2023-06-16 06:09:26

      Window组件

      2023-05-25 14:43:15

      查看更多

      热门标签

      linux java python javascript 数组 前端 docker Linux vue 函数 shell git 节点 容器 示例
      查看更多

      相关产品

      弹性云主机

      随时自助获取、弹性伸缩的云服务器资源

      天翼云电脑(公众版)

      便捷、安全、高效的云电脑服务

      对象存储

      高品质、低成本的云上存储服务

      云硬盘

      为云上计算资源提供持久性块存储

      查看更多

      随机文章

      Vue学习笔记:v-model组件绑定

      结构型模式---装饰模式

      批量删除docker过期停止的容器(全)

      Vue学习笔记:$refs与ref

      如何对 React 函数式组件进行优化

      vue-area-linkage 省市区Vue组件

      • 7*24小时售后
      • 无忧退款
      • 免费备案
      • 专家服务
      售前咨询热线
      400-810-9889转1
      关注天翼云
      • 旗舰店
      • 天翼云APP
      • 天翼云微信公众号
      服务与支持
      • 备案中心
      • 售前咨询
      • 智能客服
      • 自助服务
      • 工单管理
      • 客户公告
      • 涉诈举报
      账户管理
      • 管理中心
      • 订单管理
      • 余额管理
      • 发票管理
      • 充值汇款
      • 续费管理
      快速入口
      • 天翼云旗舰店
      • 文档中心
      • 最新活动
      • 免费试用
      • 信任中心
      • 天翼云学堂
      云网生态
      • 甄选商城
      • 渠道合作
      • 云市场合作
      了解天翼云
      • 关于天翼云
      • 天翼云APP
      • 服务案例
      • 新闻资讯
      • 联系我们
      热门产品
      • 云电脑
      • 弹性云主机
      • 云电脑政企版
      • 天翼云手机
      • 云数据库
      • 对象存储
      • 云硬盘
      • Web应用防火墙
      • 服务器安全卫士
      • CDN加速
      热门推荐
      • 云服务备份
      • 边缘安全加速平台
      • 全站加速
      • 安全加速
      • 云服务器
      • 云主机
      • 智能边缘云
      • 应用编排服务
      • 微服务引擎
      • 共享流量包
      更多推荐
      • web应用防火墙
      • 密钥管理
      • 等保咨询
      • 安全专区
      • 应用运维管理
      • 云日志服务
      • 文档数据库服务
      • 云搜索服务
      • 数据湖探索
      • 数据仓库服务
      友情链接
      • 中国电信集团
      • 189邮箱
      • 天翼企业云盘
      • 天翼云盘
      ©2025 天翼云科技有限公司版权所有 增值电信业务经营许可证A2.B1.B2-20090001
      公司地址:北京市东城区青龙胡同甲1号、3号2幢2层205-32室
      • 用户协议
      • 隐私政策
      • 个人信息保护
      • 法律声明
      备案 京公网安备11010802043424号 京ICP备 2021034386号