带你读《云原生应用开发 Operator原理与实践》第二章 Operator 原理2.2Client-go 原理(十一)

2.2.6        Client-go Informer 解析

 

1. Client-goInformer模块

 

Informer可以对 KubernetesAPIServer的资源执行 Watch操作 , 类型可以是Kubernetes内置资源,也可以是 CRD。其中最核心的模块是 Reflector、DeltaFIFO、Indexer。接下来我们逐个进行分析。


首先分析   Reflector,Reflector   用于监控指定资源的    Kubernetes。当资源发生变化时,如发生了资源添加(Added)、资源更新(Updated)等事件,Reflector会将其资源对象存放在本地缓存   DeltaFIFO   中。它的作用就是获取 APIServer中对象数据并实时地更新到本地,使得本地数据和ETCD数据完全一样。它的数据结构见代码清单2-37

typeReflectorstruct{

namestring//这个 Reflector的名称,默认为⽂件 ⾏数metrics*reflectorMetrics//⽤于保存 Reflector的⼀些监控指标expectedTypereflect.Type//期望放到 Store中的类型名称storeStore//与 Watch源同步的⽬标Store

listerWatcherListerWatcher//ListerWatcher接⼝,⽤于指定 List-Watch⽅法

period           time.Duration//Watch周期resyncPeriodtime.Duration//重新同步周期ShouldResyncfunc() bool

//clockallowsteststomanipulatetimeclockclock.Clock

lastSyncResourceVersionstring//最后同步的资源的版本号

lastSyncResourceVersionMutexsync.RWMutex//lastSyncResourceVersion的读写锁


}

 

通过 NewRefector实例化 Reflector对象,实例化过程中必须传入 ListerWatcher数据接口对象,它拥有List和 Watch方法,用于获取及监控资源列表,只要是实现了 List和 Watch方法的对象都可以成为 ListerWatcher,Reflector对象通过 run函数启动监控并处理事件,而在 Reflector源码实现中最主要的是List-Watch函数,它负责 List/Watch指定的 KubernetesAPIServer资源,见代码清单 2-38。

//NewNamedReflectorsameasNewReflector,butwithaspecifiednameforloggingfunc NewNamedReflector(name string, lw ListerWatcher, expectedType interface{},storeStore,resyncPeriodtime.Duration)*Reflector{

reflectorSuffix := atomic.AddInt64(&reflectorDisambiguator, 1)r:=&Reflector{

name:name,

//weneedthistobeuniqueperprocess(somenamesarestillthesame)butobviouswhoitbelongsto

metrics:        newReflectorMetrics(makeValidPrometheusMetricLabel(fmt.

Sprintf("reflector_"+name+"_%d",reflectorSuffix))),

listerWatcher:lw,store:store,

expectedType:reflect.TypeOf(expectedType),

period:    time.Second,resyncPeriod:resyncPeriod,clock:&clock.RealClock{},

}

returnr

}


List-Watch是怎么实现的? List-Watch主要分为 List和 Watch两部分。List负责获取对应资源的全量列表,Watch负责获取变化的部分。首先进行 List操作,这里把ResourceVersion设置为 0,因为要获取同步的对象的全部版本,所以从 0开始 List,主要流程如下(见代码清单2-39)。

(1)r.listerWatcher.List 用于获取资源下的所有对象的数据。

(2)  listMetaInterface.GetResourceVersion   用于获取资源版本号(ResouceVersion),资源版本号非常重要,Kubernetes中所有的资源都拥有该字段,它标识当前资源对象的版本号。每次修改当前资源对象时,KubernetesAPIServer都会更改 ResouceVersion,使得 Client-go执行 Watch操作时可以根据 ResourceVersion 来确定当前资源对象是否发生过变化。

(3)   meta.ExtractList用于将资源数据转换成资源对象列表,将runtime.Object转换成[]runtime.Object,因为 r.listerWatcher.List只是获取一个列表。

(4)r.syncWith 用于将资源对象列表中的资源对象和资源版本号存储至DeltaFIFO中,并替换已存在的对象。

(5)r.setLastSyncResourceVersion 用于设置最新的资源版本号。


func(r*Reflector)ListAndWatch(stopCh<-chanstruct{})error{glog.V(3).Infof("Listingandwatching%vfrom%s",r.expectedType,r.name)varresourceVersionstring

options:=metav1.ListOptions{ResourceVersion:"0"}r.metrics.numberOfLists.Inc()

start:=r.clock.Now()

list,err:=r.listerWatcher.List(options)iferr!=nil{

returnfmt.Errorf("%s:Failedtolist%v:%v",r.name,r.expectedType,

err)

}

r.metrics.listDuration.Observe(time.Since(start).Seconds())listMetaInterface,err:=meta.ListAccessor(list)

iferr!=nil{

returnfmt.Errorf("%s:Unabletounderstandlistresult%#v:%v",


r.name,list,err)

}

resourceVersion=listMetaInterface.GetResourceVersion()items,err:=meta.ExtractList(list)

iferr!=nil{

returnfmt.Errorf("%s:Unabletounderstandlistresult%#v(%v)",r.name,list,err)

}

r.metrics.numberOfItemsInList.Observe(float64(len(items)))

iferr:=r.syncWith(items,resourceVersion);err!=nil{

returnfmt.Errorf("%s:Unabletosynclistresult:%v",r.name,err)

}

r.setLastSyncResourceVersion(resourceVersion)

 

resyncerrc:=make(chanerror,1)


cancelCh:=make(chanstruct{})deferclose(cancelCh)

gofunc(){

resyncCh,cleanup:=r.resyncChan()deferfunc(){

cleanup()//Callthelastonewrittenintocleanup

}()

for{

select{

case<-resyncCh:case<-stopCh:

returncase<-cancelCh:

return

}

ifr.ShouldResync==nil||r.ShouldResync(){glog.V(4).Infof("%s:forcingresync",r.name)iferr:=r.store.Resync();err!=nil{

resyncerrc<-errreturn

}

}

}()

 

for{


cleanup()

resyncCh,cleanup=r.resyncChan()

}

//givethestopChachancetostoptheloop,evenincaseofcontinue


statementsfurtherdownonerrorsselect{

case<-stopCh:

returnnildefault:

}

 

timeoutSeconds:=int64(minWatchTimeout.Seconds()* (rand.Float64()+


1.0))


options=metav1.ListOptions{ResourceVersion:resourceVersion,TimeoutSeconds:&timeoutSeconds,


}

 

r.metrics.numberOfWatches.Inc()

w,err:=r.listerWatcher.Watch(options)


 

 

iferr!=nil{

switcherr{

caseio.EOF:

//watchclosednormally

caseio.ErrUnexpectedEOF:

glog.V(1).Infof("%s:Watchfor%vclosedwithunexpected

EOF:%v",r.name, r.expectedType, err)

default:

utilruntime.HandleError(fmt.Errorf("%s:Failedtowatch

%v:%v",r.name,r.expectedType,err))

}

ifurlError,ok:=err.(*url.Error);ok{

ifopError,ok:=urlError.Err.(*net.OpError);ok{

iferrno,ok:=opError.Err.(syscall.Errno);ok&&

errno==syscall.ECONNREFUSED{

time.Sleep(time.Second)continue

}

}

}

returnnil

}

 


 

err!=nil{


iferr:=r.watchHandler(w,&resourceVersion,resyncerrc,stopCh);

 

iferr!=errorStopRequested{

glog.Warningf("%s:watchof%vendedwith:%v",r.name,


r.expectedType,err)

}

returnnil

}

}

}

 

上一篇:带你读《云原生应用开发 Operator原理与实践》第二章 Operator 原理2.2Client-go 原理(十)


下一篇:带你读《云原生应用开发 Operator原理与实践》第二章 Operator 原理2.2Client-go 原理(十一)