Watch操作通过 HTTP与 KubernetesAPIServer建立长链接, 接收 KubernetesAPIServer发来的变更时间,Watch操作的实现机制使用的是 HTTP的分块传输编码。当 Client-go调 用 KubernetesAPIServer 时, 在 Response的 HTTPHeader 中 设 置Transfer-Encoding的值为 Chunked。r.listerWatcher.Watch实际调用了 PodInformer的 watchfunc函数。通过 ClientSet客户端与 APIServer 建立长链接,监控指定资源的变更事件。r.watchHandler用于处理资源的变更时间,当初发增删改AddedUpdated等事件时,将对应的资源对象更新到本地缓存 DeltaFIFO中,并更新 ResouceVersion。至此实现了 Reflctor组件的功能。
1. Client-goDeltaFIFO
DeltaFIFO是一个 FIFO队列,记录了资源对象的变化过程。作为一个 FIFO队列,它的生产者就是 Reflector组件,前面讲过 Reflector将监听对象同步到 DeltaFIFO中,DeltaFIFO对这些资源对象做了什么,见代码清单2-40。
typeDeltaFIFOstruct{
locksync.RWMutex
condsync.Cond//条件变量,唤醒等待的协程
itemsmap[string]Deltas//Delta存储桶
queue[]string//队列存储对象键实际就是和items⼀起形成了⼀个有序Map
//true通过Replace() 第⼀批元素被插⼊队列或者Delete/Add/Update⾸次被调⽤
populatedbool
//Replace() 被⾸次调⽤时插⼊的元素数⽬
initialPopulationCountint
//函数计算元素Key值
keyFuncKeyFunc
//列出已知的对象
knownObjectsKeyListerGetter
//队列是否被关闭,关闭互斥锁
closedboolclosedLocksync.Mutex
}
FIFO接收 Reflector的 Adds/Updates 添加和更新事件,并将它们按照顺序放入队列。元素在队列中被处理之前,如果有多个Adds/Updates 事件发生,事件只会被处理一次。
使用场景:(1)仅处理对象一次;(2)处理完当前事件后才能处理最新版本的对象;(3)删除对象之后不会处理;(4)不能周期性重新处理对象。这里的Delta对象就是 Kubernetes系统中对象的变化。Delta有 Type和 Object两个属性,DeltaType就是资源变化的类型, 比如 Add、Update等,DeltaObject就是具体的 Kubernetes资源对象,见代码清单 2-41。例如,此时 Reflector中监听了一个PodA的 Add事件,那么此时 DeltaType就是Added,DeltaObject就是 PodA,DeltaFIFO中的数据是什么样的呢?此时 Items中会有 Add类型的 Delta,Queue中也会有这个事件的 Key。这个 Key由 KeyFunc生成。Client-go中默认的 KeyFunc是 MetaNamespaceKeyFunc,可以在tools/cache/store.go:76中找到。由 MetaNamespaceKeyFunc生成的 Key格式为/,用来标识不同命名空间下的不同资源。
typeDeltastruct{
Type DeltaType //Delta类型,⽐如增、减,后⾯有详细说明
Objectinterface{} //对象,Delta的粒度是⼀个对象
}
typeDeltaTypestring //Delta的类型⽤字符串表达
const(
AddedDeltaType= "Added" //增加UpdatedDeltaType= "Updated" //更新DeletedDeltaType= "Deleted" //删除SyncDeltaType= "Sync" //同步
)
typeDeltas[]Delta //Delta数组
既然 DeltaFIFO是一个 FIFO,那么它就应该有基本的 FIFO功能,这里 DeltaFIFO实现了 Queue接口。下面看一下 Queue接口功能的定义。我们可以看出 Queue扩展了Store接口的功能,附加了 Pop、AddIfNotPresent、HasSynced、Close方法。Store是一个通用的对象存储和处理的接口,本身提供了 Add、Update、List、Get等方法,Queue接口增加了 Pop方法,实现了一个基本 FIFO队列,具体见代码清单 2-42。
typeQueueinterface{Store
Pop(PopProcessFunc)(interface{},error)AddIfNotPresent(interface{})errorHasSynced()bool
Close()
}
下面我们来看一下FIFO队列的基本功能是怎么实现的。首先是 Add方法,我们可以看到 Add方法会先根据 KeyFunc计算出对象的 Key,如果队列中没有这个对象,我们就在这个队列尾部增补对象的Key,并且将这个对象存入 Map,具体见代码清单2-43。
func(f*FIFO)Add(objinterface{})error{id,err:=f.keyFunc(obj)
iferr!=nil{
returnKeyError{obj,err}
}
f.lock.Lock()
deferf.lock.Unlock()f.populated=true
if _,exists:=f.items[id];!exists{f.queue=append(f.queue,id)
}
f.items[id]=objf.cond.Broadcast()returnnil
}
接下来我们看一下 Pop方法, 在 Queue中至少有一个资源时才会进行 Pop操作。在处理资源之前,资源会从队列(和存储)中移除,如果未成功处理资源,应该用AddIfNotPresent()函数将资源添加回队列。处理逻辑由 PopProcessFunc进行执行,具体见代码清单2-44。
func(f*DeltaFIFO)Pop(processPopProcessFunc)(interface{},error){
f.lock.Lock()
deferf.lock.Unlock()for{
forlen(f.queue)==0{
//Whenthequeueisempty,invocationofPop()isblockeduntilnewitemisenqueued.
//WhenClose()iscalled,thef.closedissetandtheconditionisbroadcasted.
//Whichcausesthislooptocontinueandreturnfromthe
Pop().
iff.IsClosed(){
returnnil,FIFOClosedError
}
f.cond.Wait()
}
id:=f.queue[0]
f.queue=f.queue[1:]item,ok:=f.items[id]
iff.initialPopulationCount>0{f.initialPopulationCount--
}
if!ok{
//Itemmayhavebeendeletedsubsequently.continue
}
delete(f.items,id)err:=process(item)
if e,ok:=err.(ErrRequeue);ok{f.addIfNotPresent(id,item)err=e.Err
}
//Don'tneedtocopyDeltashere,becausewe'retransferring
//ownershiptothecaller.returnitem,err
}
}
func(f*DeltaFIFO)KeyOf(objinterface{})(string,error)
if d,ok:=obj.(Deltas);ok{iflen(d)==0{
return"",KeyError{obj,ErrZeroLengthDeltasObject}
}
obj=d.Newest().Object
}
ifd,ok:=obj.(DeletedFinalStateUnknown);ok{
returnd.Key,nil
}
returnf.keyFunc(obj)
}
值得注意的是,DeltaFIFO中用于计算对象键的函数KeyOf为什么要先进行一次Deltas的类型转换呢?是因为Pop 出去的对象很可能还要再添加进来(比如处理失败需要再放进来),此时添加的对象就是已经封装好的Delta对象了。至此,已实现DeltaFIFO的基本功能。