带你读《云原生应用开发 Operator原理与实践》第二章 Operator 原理2.2Client-go 原理(十七)

(4)  具体处理 WorkerQueue中对象的流程,见代码清单 2-57

func(c*Controller)runWorker(){

//启动⽆限循环,接收并处理消息

forc.processNextItem(){

}

}

//Workqueue中获取对象,并打印信息。

func(c*Controller)processNextItem()bool{key,shutdown:=c.queue.Get()

//退出

ifshutdown{

returnfalse

}

//标记此Key已经处理

deferc.queue.Done(key)

//打印Key对应的Object的信息

err:=c.syncToStdout(key.(string))c.handleError(err,key)

returntrue

}

 

//获取Key对应的Object,并打印相关信息

func(c*Controller)syncToStdout(keystring)error{obj,exists,err:=c.indexer.GetByKey(key)

iferr!=nil{

klog.Errorf("Fetchingobjectwithkey%sfromstorefailedwith%v",key,err)

returnerr

}

if!exists{

fmt.Printf("Pod%sdoesnotexistanymore\n",key)

}else{

fmt.Printf("Sync/Add/UpdateforPod%s\n",obj.(*core_v1.Pod).

GetName())

}


 

returnnil


}

 

(5)  Main 函数逻辑,见代码清单2-58

funcmain(){

varkubeconfigstring

varmasterstring

//从外部获取集群信息 (kube.config)

flag.StringVar(&kubeconfig,"kubeconfig","","kubeconfigfile")

//获取集群masterurl

flag.StringVar(&master,"master","","masterurl")

//读取构建 config

config,err:=clientcmd.BuildConfigFromFlags(master,kubeconfig)

iferr!=nil{

klog.Fatal(err)

}

//创建k8sClient

clientset,err:=kubernetes.NewForConfig(config)

iferr!=nil{

klog.Fatal(err)

}

//从指定的客户端、资源、命名空间和字段选择器创建⼀个新的List-Watch

podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(),"pods",v1.NamespaceDefault,fields.Everything())

//构造⼀个具有速率限制排队功能的新的Workqueue

queue:=workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

//创建IndexerInformer

indexer,informer:=cache.NewIndexerInformer(podListWatcher,&v1.Pod{},

0,cache.ResourceEventHandlerFuncs{

//当有Pod创建时,根据DeltaQueue弹出的Object⽣成对应的Key,并加⼊Workqueue中。此处可以根据Object的⼀些属性进⾏过滤

AddFunc:func(objinterface{}){

key,err:=cache.MetaNamespaceKeyFunc(new)

iferr==nil{queue.Add(key)

}

},

//Pod删除操作

DeleteFunc:func(objinterface{}){

//在⽣成Key之前检查对象。因为资源删除后有可能会进⾏重建等操作,如果监听时错过了删除信息,会导致该条记录是陈旧的

key,err:=cache.DeletionHandlingMetaNamespaceKeyFunc(obj)


 

 

iferr==nil{queue.Add(key)

}

},

},cache.Indexers{})

//创建新的Controller

controller:=NewController(queue,indexer,informer)stop:=make(chanstruct{})

deferclose(stop)

//启动 Controller

gocontroller.Run(1,stop)select{}

}

 

至此一个简单的Controller就完成了,然后我们从已有的k8s环境中复制Config文件,将 Config文件存放在/root/.kube/目录下,配置运行代码,运行结果见代码清单 2-59。

I031215:46:38.849495     25524main.go:125]StartingPodcontrollerSync/Add/UpdateforPodcurl-666-6f68d49784-r2gln

Sync/Add/UpdateforPodbusybox

Poddefault/mypoddoesnotexistanymore

 

结果显示:程序启动了一个 PodController,Controller监听到在 Default命名空间下有两个 Pod:busybox和 curl-666-6f68d49784-r2gln,缓存中的 mypod已经不存在了。

上一篇:Android系统匿名共享内存(Anonymous Shared Memory)C++调用接口分析(1)


下一篇:带你读《云原生应用开发 Operator原理与实践》第二章 Operator 原理2.2Client-go 原理(十三)