带你读《云原生应用开发 Operator原理与实践》第二章 Operator 原理2.2Client-go 原理(十一)

Watch操作通过 HTTP与 KubernetesAPIServer建立长链接, 接收 KubernetesAPIServer发来的变更时间,Watch操作的实现机制使用的是 HTTP的分块传输编码。当 Client-go调 用 KubernetesAPIServer 时, 在 Response的 HTTPHeader 中 设 置Transfer-Encoding的值为 Chunked。r.listerWatcher.Watch实际调用了 PodInformer的 watchfunc函数。通过 ClientSet客户端与 APIServer 建立长链接,监控指定资源的变更事件。r.watchHandler用于处理资源的变更时间,当初发增删改AddedUpdated等事件时,将对应的资源对象更新到本地缓存 DeltaFIFO中,并更新 ResouceVersion。至此实现了 Reflctor组件的功能。

 

1. Client-goDeltaFIFO


DeltaFIFO是一个 FIFO队列,记录了资源对象的变化过程。作为一个 FIFO队列,它的生产者就是 Reflector组件,前面讲过 Reflector将监听对象同步到 DeltaFIFO中,DeltaFIFO对这些资源对象做了什么,见代码清单2-40。

typeDeltaFIFOstruct{

locksync.RWMutex

condsync.Cond//条件变量,唤醒等待的协程

itemsmap[string]Deltas//Delta存储桶

queue[]string//队列存储对象键实际就是和items⼀起形成了⼀个有序Map

//true通过Replace() 第⼀批元素被插⼊队列或者Delete/Add/Update⾸次被调⽤

populatedbool

//Replace() 被⾸次调⽤时插⼊的元素数⽬

initialPopulationCountint

 

//函数计算元素Key

keyFuncKeyFunc

 

//列出已知的对象

knownObjectsKeyListerGetter

//队列是否被关闭,关闭互斥锁

closedboolclosedLocksync.Mutex


}

FIFO接收 Reflector的 Adds/Updates 添加和更新事件,并将它们按照顺序放入队列。元素在队列中被处理之前,如果有多个Adds/Updates 事件发生,事件只会被处理一次。

使用场景(1仅处理对象一次(2处理完当前事件后才能处理最新版本的对象;(3删除对象之后不会处理(4)不能周期性重新处理对象。这里的Delta对象就是 Kubernetes系统中对象的变化。Delta有 Type和 Object两个属性,DeltaType就是资源变化的类型, 比如 Add、Update等,DeltaObject就是具体的 Kubernetes资源对象,见代码清单 2-41。例如,此时 Reflector中监听了一个PodA的 Add事件,那么此时 DeltaType就是Added,DeltaObject就是 PodA,DeltaFIFO中的数据是什么样的呢?此时 Items中会有 Add类型的 Delta,Queue中也会有这个事件的 Key。这个 Key由 KeyFunc生成。Client-go中默认的 KeyFunc是 MetaNamespaceKeyFunc,可以在tools/cache/store.go:76中找到。由 MetaNamespaceKeyFunc生成的 Key格式为/,用来标识不同命名空间下的不同资源。

typeDeltastruct{

Type    DeltaType                   //Delta类型,⽐如增、减,后⾯有详细说明

Objectinterface{}                  //对象,Delta的粒度是⼀个对象

}

typeDeltaTypestring                  //Delta的类型⽤字符串表达

const(

AddedDeltaType= "Added" //增加UpdatedDeltaType= "Updated" //更新DeletedDeltaType= "Deleted" //删除SyncDeltaType= "Sync" //同步

)

typeDeltas[]Delta                    //Delta数组

 


既然 DeltaFIFO是一个 FIFO,那么它就应该有基本的 FIFO功能,这里 DeltaFIFO实现了 Queue接口。下面看一下 Queue接口功能的定义。我们可以看出 Queue扩展了Store接口的功能,附加了 Pop、AddIfNotPresent、HasSynced、Close方法。Store是一个通用的对象存储和处理的接口,本身提供了 Add、Update、List、Get等方法,Queue接口增加了 Pop方法,实现了一个基本 FIFO队列,具体见代码清单 2-42。

typeQueueinterface{Store

Pop(PopProcessFunc)(interface{},error)AddIfNotPresent(interface{})errorHasSynced()bool

Close()

}

 

下面我们来看一下FIFO队列的基本功能是怎么实现的。首先是 Add方法,我们可以看到 Add方法会先根据 KeyFunc计算出对象的 Key,如果队列中没有这个对象,我们就在这个队列尾部增补对象的Key,并且将这个对象存入 Map,具体见代码清2-43

带你读《云原生应用开发 Operator原理与实践》第二章 Operator 原理2.2Client-go 原理(十一)func(f*FIFO)Add(objinterface{})error{id,err:=f.keyFunc(obj)

iferr!=nil{

returnKeyError{obj,err}

}

f.lock.Lock()

deferf.lock.Unlock()f.populated=true

if _,exists:=f.items[id];!exists{f.queue=append(f.queue,id)

}

f.items[id]=objf.cond.Broadcast()returnnil


}

 

接下来我们看一下 Pop方法, 在 Queue中至少有一个资源时才会进行 Pop作。在处理资源之前,资源会从队列(和存储)中移除,如果未成功处理资源,应该用AddIfNotPresent()函数将资源添加回队列。处理逻辑由 PopProcessFunc进行执行,具体见代码清单2-44。

func(f*DeltaFIFO)Pop(processPopProcessFunc)(interface{},error){

f.lock.Lock()

deferf.lock.Unlock()for{

forlen(f.queue)==0{

//Whenthequeueisempty,invocationofPop()isblockeduntilnewitemisenqueued.

//WhenClose()iscalled,thef.closedissetandtheconditionisbroadcasted.

//Whichcausesthislooptocontinueandreturnfromthe

Pop().


iff.IsClosed(){

returnnil,FIFOClosedError


}

 

f.cond.Wait()

}

id:=f.queue[0]

 

f.queue=f.queue[1:]item,ok:=f.items[id]

iff.initialPopulationCount>0{f.initialPopulationCount--

}

if!ok{

//Itemmayhavebeendeletedsubsequently.continue

}

delete(f.items,id)err:=process(item)

if e,ok:=err.(ErrRequeue);ok{f.addIfNotPresent(id,item)err=e.Err

}

//Don'tneedtocopyDeltashere,becausewe'retransferring

//ownershiptothecaller.returnitem,err

}

}

 

func(f*DeltaFIFO)KeyOf(objinterface{})(string,error)

if d,ok:=obj.(Deltas);ok{iflen(d)==0{

return"",KeyError{obj,ErrZeroLengthDeltasObject}

}

obj=d.Newest().Object

}

ifd,ok:=obj.(DeletedFinalStateUnknown);ok{

returnd.Key,nil

}

returnf.keyFunc(obj)

}

 

值得注意的是,DeltaFIFO中用于计算对象键的函数KeyOf为什么要先进行一次Deltas的类型转换呢?是因为Pop 出去的对象很可能还要再添加进来(比如处理失败需要再放进来),此时添加的对象就是已经封装好的Delta对象了。至此,已实现DeltaFIFO基本功能。

上一篇:带你读《云原生应用开发 Operator原理与实践》第二章 Operator 原理2.2Client-go 原理(十一)


下一篇:.Net 程序员能力划分