1、概述
在Kubernetes中,为了实现组件高可用,同一个组件需要部署多个副本,例如多个apiserver、scheduler、controller-manager等,其中apiserver是无状态的,每个组件都可以工作,而scheduler与controller-manager是有状态的,同一时刻只能存在一个活跃的,需要进行选主。
Kubernetes中是通过leaderelection来实现组件的高可用的。在Kubernetes本身的组件中,kube-scheduler和kube-manager-controller两个组件是有leader选举的,这个选举机制是Kubernetes对于这两个组件的高可用保障。即正常情况下kube-scheduler或kube-manager-controller组件的多个副本只有一个是处于业务逻辑运行状态,其它副本则不断的尝试去获取锁,去竞争leader,直到自己成为leader。如果正在运行的leader因某种原因导致当前进程退出,或者锁丢失,则由其它副本去竞争新的leader,获取leader继而执行业务逻辑。
不光是Kubernetes本身组件用到了这个选举策略,我们自己定义的服务同样可以用这个算法去实现选主。在Kubernetes client-go包中就提供了接口供用户使用。代码路径在client-go/tools/leaderelection下。
2、leaderelection使用示例
以下是一个简单使用的例子(例子来源于client-go中的example包中)
,编译完成之后同时启动多个进程,但是只有一个进程在工作,当把leader进程kill掉之后,会重新选举出一个leader进行工作,即执行其中的 run
方法:
//代码路径:client-go/examples/leader-election/main.go package main import ( "context" "flag" "os" "os/signal" "syscall" "time" "github.com/google/uuid" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/klog/v2" ) func buildConfig(kubeconfig string) (*rest.Config, error) { if kubeconfig != "" { cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig) if err != nil { return nil, err } return cfg, nil } cfg, err := rest.InClusterConfig() if err != nil { return nil, err } return cfg, nil } func main() { klog.InitFlags(nil) var kubeconfig string var leaseLockName string var leaseLockNamespace string var id string flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file") flag.StringVar(&id, "id", uuid.New().String(), "the holder identity name") flag.StringVar(&leaseLockName, "lease-lock-name", "", "the lease lock resource name") flag.StringVar(&leaseLockNamespace, "lease-lock-namespace", "", "the lease lock resource namespace") flag.Parse() if leaseLockName == "" { klog.Fatal("unable to get lease lock resource name (missing lease-lock-name flag).") } if leaseLockNamespace == "" { klog.Fatal("unable to get lease lock resource namespace (missing lease-lock-namespace flag).") } // leader election uses the Kubernetes API by writing to a // lock object, which can be a LeaseLock object (preferred), // a ConfigMap, or an Endpoints (deprecated) object. // Conflicting writes are detected and each client handles those actions // independently. config, err := buildConfig(kubeconfig) if err != nil { klog.Fatal(err) } client := clientset.NewForConfigOrDie(config) //业务逻辑 run := func(ctx context.Context) { // complete your controller loop here klog.Info("Controller loop...") select {} } // use a Go context so we can tell the leaderelection code when we // want to step down ctx, cancel := context.WithCancel(context.Background()) defer cancel() // listen for interrupts or the Linux SIGTERM signal and cancel // our context, which the leader election code will observe and // step down ch := make(chan os.Signal, 1) signal.Notify(ch, os.Interrupt, syscall.SIGTERM) go func() { <-ch klog.Info("Received termination, signaling shutdown") cancel() }() // we use the Lease lock type since edits to Leases are less common // and fewer objects in the cluster watch "all Leases". // 指定锁的资源对象,这里使用了Lease资源,还支持configmap,endpoint,或者multilock(即多种配合使用) lock := &resourcelock.LeaseLock{ LeaseMeta: metav1.ObjectMeta{ Name: leaseLockName, Namespace: leaseLockNamespace, }, Client: client.CoordinationV1(), LockConfig: resourcelock.ResourceLockConfig{ Identity: id, }, } // 进行选举 // start the leader election code loop leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ Lock: lock, // IMPORTANT: you MUST ensure that any code you have that // is protected by the lease must terminate **before** // you call cancel. Otherwise, you could have a background // loop still running and another process could // get elected before your background loop finished, violating // the stated goal of the lease. ReleaseOnCancel: true, LeaseDuration: 60 * time.Second, //租约时长,非主候选者用来判断资源锁是否过期 RenewDeadline: 15 * time.Second, //leader刷新资源锁超时时间 RetryPeriod: 5 * time.Second, //调用资源锁间隔 //回调函数,根据选举不同事件触发 Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: func(ctx context.Context) { //变为leader执行的业务代码 // we're notified when we start - this is where you would // usually put your code run(ctx) }, OnStoppedLeading: func() { // 进程退出 // we can do cleanup here klog.Infof("leader lost: %s", id) os.Exit(0) }, OnNewLeader: func(identity string) { //当产生新的leader后执行的方法 // we're notified when new leader elected if identity == id { // I just got the lock return } klog.Infof("new leader elected: %s", identity) }, }, }) }
关键启动参数说明:
kubeconfig: 指定kubeconfig文件地址 lease-lock-name:指定lock的名称 lease-lock-namespace:指定lock的namespace id: 例子中提供的区别参数,用于区分实例 logtostderr:klog提供的参数,指定log输出到控制台 v: 指定日志输出级别
2.1 同时启动三个进程:
启动进程1:
go run main.go -kubeconfig=/Users/apple/.kube/config148 -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=1 -v=4
输出:
apple@appledeMacBook-Pro test$ go run main.go -kubeconfig=/Users/apple/.kube/config148 -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=1 -v=4 I0126 13:46:59.753974 35080 leaderelection.go:243] attempting to acquire leader lease default/example... I0126 13:47:00.660260 35080 leaderelection.go:253] successfully acquired lease default/example I0126 13:47:00.660368 35080 main.go:75] Controller loop...
这里可以看出来id=1的进程持有锁,并且运行的程序。
启动进程2:
go run main.go -kubeconfig=/Users/apple/.kube/config148 -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=2 -v=4
输出:
apple@appledeMacBook-Pro test$ go run main.go -kubeconfig=/Users/apple/.kube/config148 -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=2 -v=4 I0126 13:47:05.066516 35096 leaderelection.go:243] attempting to acquire leader lease default/example... I0126 13:47:05.451887 35096 leaderelection.go:346] lock is held by 1 and has not yet expired I0126 13:47:05.451909 35096 leaderelection.go:248] failed to acquire lease default/example I0126 13:47:05.451918 35096 main.go:145] new leader elected: 1 I0126 13:47:14.188160 35096 leaderelection.go:346] lock is held by 1 and has not yet expired I0126 13:47:14.188188 35096 leaderelection.go:248] failed to acquire lease default/example I0126 13:47:24.929607 35096 leaderelection.go:346] lock is held by 1 and has not yet expired I0126 13:47:24.929636 35096 leaderelection.go:248] failed to acquire lease default/example .......
这里可以看出来id=1的进程持有锁,并且运行的程序,而id=2的进程表示无法获取到锁,在不断的进行尝试。
启动进程3:
go run main.go -kubeconfig=/Users/apple/.kube/config148 -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=3 -v=4
输出:
apple@appledeMacBook-Pro test$ go run main.go -kubeconfig=/Users/apple/.kube/config148 -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=3 -v=4 I0126 13:47:12.431518 35112 leaderelection.go:243] attempting to acquire leader lease default/example... I0126 13:47:12.776614 35112 leaderelection.go:346] lock is held by 1 and has not yet expired I0126 13:47:12.776649 35112 leaderelection.go:248] failed to acquire lease default/example I0126 13:47:12.776663 35112 main.go:145] new leader elected: 1 I0126 13:47:21.499295 35112 leaderelection.go:346] lock is held by 1 and has not yet expired I0126 13:47:21.499325 35112 leaderelection.go:248] failed to acquire lease default/example I0126 13:47:32.241544 35112 leaderelection.go:346] lock is held by 1 and has not yet expired I0126 13:47:32.241572 35112 leaderelection.go:248] failed to acquire lease default/example .......
这里可以看出来id=1的进程持有锁,并且运行的程序,而id=3的进程表示无法获取到锁,在不断的进行尝试。
2.2 停掉进程1并观察进程2和进程3竞争新的leader
apple@appledeMacBook-Pro test$ go run main.go -kubeconfig=/Users/apple/.kube/config148 -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=1 -v=4 I0126 13:46:59.753974 35080 leaderelection.go:243] attempting to acquire leader lease default/example... I0126 13:47:00.660260 35080 leaderelection.go:253] successfully acquired lease default/example I0126 13:47:00.660368 35080 main.go:75] Controller loop... ^CI0126 13:53:16.629114 35080 main.go:92] Received termination, signaling shutdown I0126 13:53:17.057999 35080 main.go:135] leader lost: 1
现在kill掉id=1进程,在等待lock释放之后(有个LeaseDuration时间),观察进程2和进程3的输出,看哪个进程成为新的leader。
id=2的进程输出:
...... I0126 13:53:11.208487 35096 leaderelection.go:346] lock is held by 1 and has not yet expired I0126 13:53:11.208512 35096 leaderelection.go:248] failed to acquire lease default/example I0126 13:53:18.189514 35096 leaderelection.go:253] successfully acquired lease default/example I0126 13:53:18.189587 35096 main.go:75] Controller loop...
这里可以看出来id=2的进程持有锁,并且运行的程序。
id=3的进程输出:
...... I0126 13:53:04.675216 35112 leaderelection.go:248] failed to acquire lease default/example I0126 13:53:12.918706 35112 leaderelection.go:346] lock is held by 1 and has not yet expired I0126 13:53:12.918736 35112 leaderelection.go:248] failed to acquire lease default/example I0126 13:53:19.544314 35112 leaderelection.go:346] lock is held by 2 and has not yet expired I0126 13:53:19.544372 35112 leaderelection.go:248] failed to acquire lease default/example I0126 13:53:19.544387 35112 main.go:145] new leader elected: 2 I0126 13:53:26.346561 35112 leaderelection.go:346] lock is held by 2 and has not yet expired I0126 13:53:26.346591 35112 leaderelection.go:248] failed to acquire lease default/example ......
这里可以看出来id=2的进程持有锁,并且运行的程序,而id=3的进程表示无法获取到锁,在不断的进行尝试。
2.3 查看资源锁对象
[root@master1 ~]# kubectl get leases.coordination.k8s.io example -o yaml apiVersion: coordination.k8s.io/v1 kind: Lease metadata: creationTimestamp: "2022-01-26T05:46:38Z" managedFields: ....... manager: main operation: Update time: "2022-01-26T06:05:43Z" name: example namespace: default resourceVersion: "314956587" selfLink: /apis/coordination.k8s.io/v1/namespaces/default/leases/example uid: 5ce63489-c754-42b4-9e6c-a0a0a8442c3f spec: acquireTime: "2022-01-26T05:53:17.905076Z" //获得锁时间 holderIdentity: "2" //持有锁进程的标识 leaseDurationSeconds: 60 //lease租约时长 leaseTransitions: 1 //leader更换次数 renewTime: "2022-01-26T06:06:06.248393Z" //更新租约的时间
锁已经被进程2获得, 此时如果进程1再启动的话, 也只能一直尝试获取锁。
3、leaderelection源码分析
leaderelection基本原理其实就是利用通过Kubernetes中lease、configmap 、endpoints资源实现一个分布式锁,获取到锁的进程成为leader,并且定期更新租约(renew)。其他进程也在不断的尝试进行抢占,抢占不到则继续等待下次循环。当leader节点挂掉之后,租约到期,其他节点就成为新的leader。
代码路径在client-go/tools/leaderelection下.逻辑结构如下图:
注意: 请注意client-go的版本,不同版本对应LeaderElection的逻辑架构图也略微有所不同。
3.1、Interface接口
Interface
: 中定义了一系列方法, 包括增加、修改、获取一个LeaderElectionRecord
, 说白了就是一个客户端, 而且每个客户端实例都要有自己分布式唯一的id。
// tools/leaderelection/resourcelock/interface.go // 资源占有者的描述信息 type LeaderElectionRecord struct { // 持有锁进程的标识 也就是leader的id HolderIdentity string `json:"holderIdentity"` // 一个租约多长时间 LeaseDurationSeconds int `json:"leaseDurationSeconds"` // 获得leader的时间 AcquireTime metav1.Time `json:"acquireTime"` // 续约的时间 RenewTime metav1.Time `json:"renewTime"` // leader变更的次数 LeaderTransitions int `json:"leaderTransitions"` } type Interface interface { // 返回当前资源LeaderElectionRecord Get() (*LeaderElectionRecord, error) // 创建一个资源LeaderElectionRecord Create(ler LeaderElectionRecord) error // 更新资源 Update(ler LeaderElectionRecord) error // 记录事件 RecordEvent(string) // 返回当前该应用的id Identity() string // 描述信息(namespace/name) Describe() string }
Interface有四个实现类, 分别为EndpointLock
, ConfigMapLock、
LeaseLock和
MultiLock(一般不用),分别可以操作Kubernetes中的endpoint
, configmap
和lease。
这里以LeaseLock为例子说明。
// tools/leaderelection/resourcelock/leaselock.go type LeaseLock struct { // LeaseMeta should contain a Name and a Namespace of a // LeaseMeta object that the LeaderElector will attempt to lead. LeaseMeta metav1.ObjectMeta // 访问api-server的客户端 Client coordinationv1client.LeasesGetter // 该LeaseLock的分布式唯一身份id LockConfig ResourceLockConfig // 资源锁对应的lease资源对象 lease *coordinationv1.Lease } // tools/leaderelection/resourcelock/interface.go type ResourceLockConfig struct { // 分布式唯一id Identity string EventRecorder EventRecorder }
LeaseLock类型对应函数详解:Create
, Update
, Get
方法都是利用client
去访问kubernetes的api-server。
// tools/leaderelection/resourcelock/leaselock.go // 通过访问apiserver获取当前资源锁对象ll.lease,并组织返回对应的LeaderElectionRecord对象和LeaderElectionRecord序列化值 // Get returns the election record from a Lease spec func (ll *LeaseLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) { var err error // 获取资源锁对应的资源对象ll.lease ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Get(ctx, ll.LeaseMeta.Name, metav1.GetOptions{}) if err != nil { return nil, nil, err } // 利用lease资源对象spec生成对应LeaderElectionRecord资源对象 record := LeaseSpecToLeaderElectionRecord(&ll.lease.Spec) // 序列化LeaderElectionRecord资源对象(byte[]) recordByte, err := json.Marshal(*record) if err != nil { return nil, nil, err } return record, recordByte, nil } // 根据LeaderElectionRecord创建对应资源锁对象 ll.lease // Create attempts to create a Lease func (ll *LeaseLock) Create(ctx context.Context, ler LeaderElectionRecord) error { var err error ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Create(ctx, &coordinationv1.Lease{ ObjectMeta: metav1.ObjectMeta{ Name: ll.LeaseMeta.Name, Namespace: ll.LeaseMeta.Namespace, }, // 利用ElectionRecord资源对象生成对应lease资源对象spec Spec: LeaderElectionRecordToLeaseSpec(&ler), }, metav1.CreateOptions{}) return err } // Update will update an existing Lease spec. func (ll *LeaseLock) Update(ctx context.Context, ler LeaderElectionRecord) error { if ll.lease == nil { return errors.New("lease not initialized, call get or create first") } // 利用ElectionRecord资源对象生成对应lease资源对象spec ll.lease.Spec = LeaderElectionRecordToLeaseSpec(&ler) lease, err := ll.Client.Leases(ll.LeaseMeta.Namespace).Update(ctx, ll.lease, metav1.UpdateOptions{}) if err != nil { return err } ll.lease = lease return nil } // RecordEvent in leader election while adding meta-data func (ll *LeaseLock) RecordEvent(s string) { if ll.LockConfig.EventRecorder == nil { return } events := fmt.Sprintf("%v %v", ll.LockConfig.Identity, s) ll.LockConfig.EventRecorder.Eventf(&coordinationv1.Lease{ObjectMeta: ll.lease.ObjectMeta}, corev1.EventTypeNormal, "LeaderElection", events) } // Describe is used to convert details on current resource lock // into a string func (ll *LeaseLock) Describe() string { return fmt.Sprintf("%v/%v", ll.LeaseMeta.Namespace, ll.LeaseMeta.Name) } // Identity returns the Identity of the lock func (ll *LeaseLock) Identity() string { return ll.LockConfig.Identity } // 利用lease资源对象spec生成对应LeaderElectionRecord资源对象 func LeaseSpecToLeaderElectionRecord(spec *coordinationv1.LeaseSpec) *LeaderElectionRecord { var r LeaderElectionRecord if spec.HolderIdentity != nil { r.HolderIdentity = *spec.HolderIdentity } if spec.LeaseDurationSeconds != nil { r.LeaseDurationSeconds = int(*spec.LeaseDurationSeconds) } if spec.LeaseTransitions != nil { r.LeaderTransitions = int(*spec.LeaseTransitions) } if spec.AcquireTime != nil { r.AcquireTime = metav1.Time{spec.AcquireTime.Time} } if spec.RenewTime != nil { r.RenewTime = metav1.Time{spec.RenewTime.Time} } return &r } // 利用ElectionRecord资源对象生成对应lease资源对象spec func LeaderElectionRecordToLeaseSpec(ler *LeaderElectionRecord) coordinationv1.LeaseSpec { leaseDurationSeconds := int32(ler.LeaseDurationSeconds) leaseTransitions := int32(ler.LeaderTransitions) return coordinationv1.LeaseSpec{ HolderIdentity: &ler.HolderIdentity, LeaseDurationSeconds: &leaseDurationSeconds, AcquireTime: &metav1.MicroTime{ler.AcquireTime.Time}, RenewTime: &metav1.MicroTime{ler.RenewTime.Time}, LeaseTransitions: &leaseTransitions, } }
3.2 LeaderElector
LeaderElectionConfig
:
定义了一些竞争资源的参数,用于保存当前应用的一些配置,包括资源锁、持有锁的时间等,
LeaderElectionConfig.lock
支持保存在以下三种资源中:
configmap
endpoint
lease
包中还提供了一个 multilock
,即可以进行选择两种,当其中一种保存失败时,选择第二种。
//client-go/tools/leaderelection/leaderelection.go type LeaderElectionConfig struct { // Lock 的类型 Lock rl.Interface //持有锁的时间 LeaseDuration time.Duration //在更新租约的超时时间 RenewDeadline time.Duration //竞争获取锁的时间 RetryPeriod time.Duration //需要用户配置的状态变化时执行的函数,支持三种: //1、OnStartedLeading 启动是执行的业务代码 //2、OnStoppedLeading leader停止执行的方法 //3、OnNewLeader 当产生新的leader后执行的方法 Callbacks LeaderCallbacks //进行监控检查 // WatchDog is the associated health checker // WatchDog may be null if its not needed/configured. WatchDog *HealthzAdaptor //leader退出时,是否执行release方法 ReleaseOnCancel bool // Name is the name of the resource lock for debugging Name string }
LeaderElector:
是一个竞争资源的实体。
//client-go/tools/leaderelection/leaderelection.go
// LeaderElector is a leader election client. type LeaderElector struct { // 用于保存当前应用的一些配置 config LeaderElectionConfig // 通过apiserver远程获取的资源锁对象 (不一定自己是leader) 所有想竞争此资源的应用获取的是同一份 // internal bookkeeping observedRecord rl.LeaderElectionRecord //资源锁对象spec,用于和远程获取的资源锁对象值比较 observedRawRecord []byte // 获取的时间 observedTime time.Time // used to implement OnNewLeader(), may lag slightly from the // value observedRecord.HolderIdentity if the transition has // not yet been reported. reportedLeader string // clock is wrapper around time to allow for less flaky testing clock clock.Clock metrics leaderMetricsAdapter }
这里着重要关注以下几个属性:
config: 该LeaderElectionConfig对象配置了当前应用的客户端, 以及此客户端的唯一id等等。
observedRecord: 该LeaderElectionRecord就是保存着从api-server中获得的leader的信息。
observedTime: 获得的时间。
很明显判断当前进程是不是leader只需要判断config中的id和observedRecord中的id是不是一致即可.
func (le *LeaderElector) GetLeader() string { return le.observedRecord.HolderIdentity } // IsLeader returns true if the last observed leader was this client else returns false. func (le *LeaderElector) IsLeader() bool { return le.observedRecord.HolderIdentity == le.config.Lock.Identity() }
3.3 LeaderElector运行逻辑
3.3.1 run
func (le *LeaderElector) Run(ctx context.Context) { defer func() { runtime.HandleCrash() le.config.Callbacks.OnStoppedLeading() }() // 如果获取成功 那就是ctx signalled done // 不然即使失败, 该client也会一直去尝试获得leader位置 if !le.acquire(ctx) { return // ctx signalled done } // 如果获得leadership 以goroutine和回调的形式启动用户自己的逻辑方法OnStartedLeading ctx, cancel := context.WithCancel(ctx) defer cancel() go le.config.Callbacks.OnStartedLeading(ctx) // 一直去续约 这里也是一个循环操作 // 如果失去了leadership 该方法才会返回 // 该方法返回 整个Run方法就返回了 le.renew(ctx) }
1. 该client(也就是le这个实例)首先会调用acquire方法一直尝试去竞争leadership(如果竞争失败, 继续竞争, 不会进入2. 竞争成功, 进入2)。
2. 异步启动用户自己的逻辑程序(OnStartedLeading)(进入3)。
3. 通过调用renew方法续约自己的leadership. 续约成功, 继续续约,续约失败, 整个Run就结束了。
3.3.2 acquire
func (le *LeaderElector) maybeReportTransition() { // 如果没有变化 则不需要更新 if le.observedRecord.HolderIdentity == le.reportedLeader { return } // 更新reportedLeader为最新的leader的id le.reportedLeader = le.observedRecord.HolderIdentity if le.config.Callbacks.OnNewLeader != nil { // 调用当前应用的回调函数OnNewLeader报告新的leader产生 go le.config.Callbacks.OnNewLeader(le.reportedLeader) } } // 一旦获得leadership 立马返回true // 返回false的唯一情况是ctx signals done func (le *LeaderElector) acquire(ctx context.Context) bool { ctx, cancel := context.WithCancel(ctx) defer cancel() succeeded := false desc := le.config.Lock.Describe() klog.Infof("attempting to acquire leader lease %v...", desc) wait.JitterUntil(func() { // 尝试获得或者更新资源 succeeded = le.tryAcquireOrRenew() // 有可能会产生新的leader // 所以调用maybeReportTransition检查是否需要广播新产生的leader le.maybeReportTransition() if !succeeded { // 如果获得leadership失败 则返回后继续竞争 klog.V(4).Infof("failed to acquire lease %v", desc) return } // 自己成为leader // 可以调用cancel方法退出JitterUntil进而从acquire中返回 le.config.Lock.RecordEvent("became leader") le.metrics.leaderOn(le.config.Name) klog.Infof("successfully acquired lease %v", desc) cancel() }, le.config.RetryPeriod, JitterFactor, true, ctx.Done()) return succeeded }
acquire的作用如下:
1. 一旦获得leadership,
立马返回true,
否则会隔RetryPeriod
时间尝试一次。
2. 一旦有ctx signals done
, 会返回false。
这里的逻辑比较简单, 主要的逻辑是在tryAcquireOrRenew
方法中。
3.3.3 renew
// RenewDeadline=15s RetryPeriod=5s // renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done. func (le *LeaderElector) renew(ctx context.Context) { ctx, cancel := context.WithCancel(ctx) defer cancel() // 每隔RetryPeriod会调用 除非cancel()方法被调用才会退出 wait.Until(func() { timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline) defer timeoutCancel() // 每隔5s调用该方法直到该方法返回true为止 // 如果超时了也会退出该方法 并且err中有错误信息 err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) { return le.tryAcquireOrRenew(timeoutCtx), nil }, timeoutCtx.Done()) // 有可能会产生新的leader 如果有会广播新产生的leader le.maybeReportTransition() desc := le.config.Lock.Describe() if err == nil { // 如果err == nil, 表明上面PollImmediateUntil中返回true了 续约成功 依然处于leader位置 // 返回后 继续运行wait.Until的逻辑 klog.V(4).Infof("successfully renewed lease %v", desc) return } // err != nil 表明超时了 试的总时间超过了RenewDeadline 失去了leader位置 续约失败 // 调用cancel方法退出wait.Until le.config.Lock.RecordEvent("stopped leading") le.metrics.leaderOff(le.config.Name) klog.Infof("failed to renew lease %v: %v", desc, err) cancel() }, le.config.RetryPeriod, ctx.Done()) // if we hold the lease, give it up if le.config.ReleaseOnCancel { le.release() } }
可以看到该client的base条件是它自己是当前的leader, 然后来续约操作。
这里来说一下RenewDeadline和RetryPeriod的作用。
每隔RetryPeriod时间会通过tryAcquireOrRenew续约, 如果续约失败, 还会进行再次尝试. 一直到尝试的总时间超过RenewDeadline后该client就会失去leadership。
3.3.4 tryAcquireOrRenew
// 竞争或者更新leadership // 成功返回true 失败返回false func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool { now := metav1.Now() leaderElectionRecord := rl.LeaderElectionRecord{ HolderIdentity: le.config.Lock.Identity(), LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second), RenewTime: now, AcquireTime: now, } // 1. obtain or create the ElectionRecord // 从client端中获得ElectionRecord oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx) if err != nil { if !errors.IsNotFound(err) { // 失败直接退出 klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err) return false } // 因为没有获取到, 因此创建一个新的进去 if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil { klog.Errorf("error initially creating leader election record: %v", err) return false } // 然后设置observedRecord为刚刚加入进去的leaderElectionRecord le.observedRecord = leaderElectionRecord le.observedTime = le.clock.Now() return true } // 2. Record obtained, check the Identity & Time // 从远端获取到record(资源)成功存到oldLeaderElectionRecord // 如果oldLeaderElectionRecord与observedRecord不相同 更新observedRecord // 因为observedRecord代表是从远端存在Record // 需要注意的是每个client都在竞争leadership, 而leader一直在续约, leader会更新它的RenewTime字段 // 所以一旦leader续约成功 每个non-leader候选者都需要更新其observedTime和observedRecord if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) { le.observedRecord = *oldLeaderElectionRecord le.observedRawRecord = oldLeaderElectionRawRecord le.observedTime = le.clock.Now() } // 如果leader已经被占有并且不是当前自己这个应用, 而且时间还没有到期 // 那就直接返回false, 因为已经无法抢占 时间没有过期 if len(oldLeaderElectionRecord.HolderIdentity) > 0 && le.observedTime.Add(le.config.LeaseDuration).After(now.Time) && !le.IsLeader() { klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity) return false } // 3. We're going to try to update. The leaderElectionRecord is set to it's default // here. Let's correct it before updating. if le.IsLeader() { // 如果当前服务就是以前的占有者 leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions } else { // 如果当前服务不是以前的占有者 LeaderTransitions加1 leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1 } // update the lock itself // 当前client占有该资源 成为leader if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil { klog.Errorf("Failed to update lock: %v", err) return false } le.observedRecord = leaderElectionRecord le.observedTime = le.clock.Now() return true }
这里需要注意的是当前client不是leader的时候, 如何去判断一个leader是否已经expired了?
- le.observedTime.Add(le.config.LeaseDuration).After(now.Time);
- le.observedTime: 代表的是获得leader(截止当前时间为止的最后一次renew)对象的时间;
- le.config.LeaseDuration: 自己(当前client)获得leadership需要的等待时间;
- le.observedTime.Add(le.config.LeaseDuration): 就是自己(当前client)被允许获得leadership的时间。
如果le.observedTime.Add(le.config.LeaseDuration).before(now.Time)为true的话, 就表明leader过期了. 白话文的意思就是从leader上次续约完, 已经超过le.config.LeaseDuration的时间没有续约了, 所以被认为该leader过期了. 把before换成after就是表明没有过期。
4、总结
leaderelection 主要是利用了k8s API操作的原子性实现了一个分布式锁,在不断的竞争中进行选举。选中为leader的进行才会执行具体的业务代码,这在k8s中非常的常见,而且我们很方便的利用这个包完成组件的编写,从而实现组件的高可用,比如部署为一个多副本的Deployment,当leader的pod退出后会重新启动,可能锁就被其他pod获取继续执行。
当应用在k8s上部署时,使用k8s的资源锁,可方便的实现高可用,但需要注意以下几点:
- 推荐使用
configmap
作为资源锁,原因是某些组件(如kube-proxy)
会去监听endpoints
来更新节点iptables规则,当有大量资源锁时,势必会对性能有影响。 - 当选举结束时调用
OnStoppedLeading
需要退出程序(例如os.Exit(0)
),若不退出程序,所有副本选举结束不会去竞争资源锁,就没有leader,造成服务不可用而这时程序并没有异常。需要执行退出逻辑,让Daemon程序k8s/systemd等重启服务来重新参与选主。
参考:https://www.jianshu.com/p/6e6f1d97d635 (endpoints类型资源锁
)
参考:https://tangqing.blog.csdn.net/article/details/110729620?spm=1001.2014.3001.5502
参考:https://silenceper.com/blog/202002/kubernetes-leaderelection/