带你读《云原生应用开发 Operator原理与实践》第二章 Operator 原理2.2Client-go 原理(十三)

1. Client-goIndexer

 

资源对象从 DeltaFIFOPop 出去后又经过了哪些处理呢。这要从一开始的 sharedIndexInformer说起。注意,在 sharedIndexInformer的 Run 方法中,初始化了它的配置,并执行了 s.controller.Run方法。我们可以看到s.controller.Run中初始化了 Reflector,开始了指定资源的List-Watch 操作,并且同步到了DeltaFIFO中,同时执行了processLoop方法。此时我们可以看到 processLoop方法不断从DeltaFIFO中将资源对象 Pop出来, 并且交给了之前的 c.config.Process方法进行处理。而c.config.Process方法就是sharedIndexInformer的 HandleDeltas方法,具体见代码清单 2-45。



func(s*sharedIndexInformer)Run(stopCh<-chanstruct{}){

...

cfg:=&Config{

Queue:              fifo,

ListerWatcher:             s.listerWatcher,ObjectType:          s.objectType,FullResyncPeriod:s.resyncCheckPeriod,RetryOnError:                 false,

ShouldResync:       s.processor.shouldResync,

 

Process:         s.HandleDeltas,WatchErrorHandler:s.watchErrorHandler,

}

 

func(){

s.startedLock.Lock()

defers.startedLock.Unlock()

 

...

}()


s.controller=New(cfg)s.controller.(*controller).clock=s.clocks.started=true

s.controller.Run(stopCh)

}

 

func(c*controller)Run(stopCh<-chanstruct{}){deferutilruntime.HandleCrash()

gofunc(){

<-stopCh

c.config.Queue.Close()

}()

r:=NewReflector(


c.config.ListerWatcher,c.config.ObjectType,c.config.Queue,c.config.FullResyncPeriod,

)

r.ShouldResync=c.config.ShouldResync

r.clock=c.clock

...

}

c.reflectorMutex.Lock()c.reflector=rc.reflectorMutex.Unlock()

 

wait.Until(c.processLoop,time.Second,stopCh)

func(c*controller)processLoop(){for{

obj,err:=c.config.Queue.Pop(PopProcessFunc(c.config.Process))

iferr!=nil{

iferr==FIFOClosedError{

return

}

ifc.config.RetryOnError{

//Thisisthesafewaytore-enqueue.

c.config.Queue.AddIfNotPresent(obj)

}

}

}

}

 

 

综上可知,由 DeltaFIFO中Pop出来的对象最后交给了 HandleDeltas进行处理,而在 HandleDeltas中,将资源对象同步到了 Indexer中,至此我们引出了 Informer模块中的第 3个组件 Indexer。Indexer是 Client-go 中实现的一个本地存储,它可以建立索引并存储 Resource的对象。Reflector通过 DeltaFIFOQueue将资源对象存储到Indexer中。需要注意的是,Indexer中的数据与 ETCD中的数据是完全一致的,当 Client-go需要数据时,无须每次都从 APIServer中获取,从而减轻了请求过多造成的对 APIServer的压力, 具体见代码清单 2-46。

func(s*sharedIndexInformer)HandleDeltas(objinterface{})error{

s.blockDeltas.Lock()

defers.blockDeltas.Unlock()

 

//fromoldesttonewest

for_,d:=rangeobj.(Deltas){switchd.Type{

caseSync,Replaced,Added,Updated:s.cacheMutationDetector.AddObject(d.Object)

ifold,exists,err:=s.indexer.Get(d.Object);err==nil&&

exists{

if err:=s.indexer.Update(d.Object);err!=nil{returnerr


}

 

requestedresync 

nil{

 

==nil{


isSync:=falseswitch{

cased.Type==Sync:

//Synceventsareonlypropagatedtolistenersthat

 

isSync=true

cased.Type==Replaced:

ifaccessor,err:=meta.Accessor(d.Object);err==ifoldAccessor,err:=meta.Accessor(old);err

//Replacedeventsthatdidn'tchange


resourceVersionaretreatedasresyncevents

//andonlypropagatedtolisteners


thatrequestedresync

 

==oldAccessor.GetResourceVersion()

}


isSync=accessor.GetResourceVersion()

}

}

s.processor.distribute(updateNotification{oldObj:old,

newObj:d.Object},isSync)

}else{

if err:=s.indexer.Add(d.Object);err!=nil{returnerr

 

false)

}

s.processor.distribute(addNotification{newObj:d.Object},

 

}

caseDeleted:

iferr:=s.indexer.Delete(d.Object);err!=nil{


 

returnerr

}

s.processor.distribute(deleteNotification{oldObj:d.Object},false)

}

}

returnnil

}

 

Indexer   是如何实现存储并快速查找资源的呢?我们先看一下 Indexer接口提供的功能。Cache是 Indexer的一种非常经典的实现,所有的对象缓存在内存中,而且从Cache 这个类型的名称来看它属于包内私有类型,外部无法直接使用,只能通过专用的函数创建。 这里的 Store、Indexer使用了一个 threadSafeMap来保证并发安全的存储。它拥有存储相关的增、删、改、查等方法。threadSafeMap继承了 Store接口,而 Indexer扩展了threadSafeMap, 为 threadSafeMap提供了索引操作。threadSafeMap其实只能够存储和索引。存储即将runtime.object存储到 Items的 Map中;索引即为Items的 Map建立三层索引:IndicesMap类型索引(如 namespace、nodeName等);IndexMap 类型索引(如 namespace1、namespace2……);runtime.object类型索引,实现见代码清 单2-47。


typeIndexerinterface{Store

//indexName索引类,obj是对象,计算objindexName索引类中的索引键,通过索引键

获取所有的对象

//基本就是获取符合obj特征的所有对象,所谓的特征就是对象在索引类中的索引键

Index(indexNamestring,objinterface{})([]interface{},error)

//indexKey是 indexName索引类中的⼀个索引键,函数返回indexKey指定的所有对象键

IndexKeys(indexName,indexedValuestring)([]string,error)

//获取indexName索引类中的所有索引键

ListIndexFuncValues(indexNamestring)[]string

//这个函数和 Index类似,只是返回值不是对象键,⽽是所有对象

ByIndex(indexName,indexedValuestring)([]interface{},error)

//返回Indexers

GetIndexers()Indexers

//添加Indexers,就是增加更多的索引分类

AddIndexers(newIndexersIndexers)error

}

 

Kubernetes中使用的比较多的索引函数是MetaNamespaceIndexFunc() 代码位置:

 

client-go/tools/cache/index.go),Indexer索引的实现是通过index.ByIndex来完成的, index.ByIndex的实现见代码清单 2-48。这个函数返回了符合索引函数的值的对象列表。



func(c*threadSafeMap)ByIndex(indexName,indexKeystring)([]interface{},error){c.lock.RLock()

deferc.lock.RUnlock()

 

indexFunc:=c.indexers[indexName]ifindexFunc==nil{

returnnil,fmt.Errorf("Indexwithname%sdoesnotexist",indexName)

}

index:=c.indices[indexName]set:=index[indexKey]

list:=make([]interface{},0,set.Len())for_,key:=rangeset.List(){

list=append(list,c.items[key])

}

 

returnlist,nil


}

 

上述方法接收两个参数:indexName(索引器的名称)indexedValue需要索引的 Key。首先根据索引器名称查找指定的索引器函数c.indexers[indexName]);然后根据索引器名称查找相应的缓存器函数(c.indices[indexName]) ;最后根据索引 Key

(indexedValue)从缓存中进行数据查询,并返回查询结果。

上一篇:带你读《云原生应用开发 Operator原理与实践》第二章 Operator 原理2.2Client-go 原理(十七)


下一篇:带你读《云原生应用开发 Operator原理与实践》第二章 Operator 原理2.3 Kube-APIServer 介绍(一)