(4) 具体处理 WorkerQueue中对象的流程,见代码清单 2-57
func(c*Controller)runWorker(){
//启动⽆限循环,接收并处理消息
forc.processNextItem(){
}
}
//从Workqueue中获取对象,并打印信息。
func(c*Controller)processNextItem()bool{key,shutdown:=c.queue.Get()
//退出
ifshutdown{
returnfalse
}
//标记此Key已经处理
deferc.queue.Done(key)
//打印Key对应的Object的信息
err:=c.syncToStdout(key.(string))c.handleError(err,key)
returntrue
}
//获取Key对应的Object,并打印相关信息
func(c*Controller)syncToStdout(keystring)error{obj,exists,err:=c.indexer.GetByKey(key)
iferr!=nil{
klog.Errorf("Fetchingobjectwithkey%sfromstorefailedwith%v",key,err)
returnerr
}
if!exists{
fmt.Printf("Pod%sdoesnotexistanymore\n",key)
}else{
fmt.Printf("Sync/Add/UpdateforPod%s\n",obj.(*core_v1.Pod).
GetName())
}
returnnil
}
(5) Main 函数逻辑,见代码清单2-58
funcmain(){
varkubeconfigstring
varmasterstring
//从外部获取集群信息 (kube.config)
flag.StringVar(&kubeconfig,"kubeconfig","","kubeconfigfile")
//获取集群master的url
flag.StringVar(&master,"master","","masterurl")
//读取构建 config
config,err:=clientcmd.BuildConfigFromFlags(master,kubeconfig)
iferr!=nil{
klog.Fatal(err)
}
//创建k8sClient
clientset,err:=kubernetes.NewForConfig(config)
iferr!=nil{
klog.Fatal(err)
}
//从指定的客户端、资源、命名空间和字段选择器创建⼀个新的List-Watch
podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(),"pods",v1.NamespaceDefault,fields.Everything())
//构造⼀个具有速率限制排队功能的新的Workqueue
queue:=workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
//创建Indexer和Informer
indexer,informer:=cache.NewIndexerInformer(podListWatcher,&v1.Pod{},
0,cache.ResourceEventHandlerFuncs{
//当有Pod创建时,根据DeltaQueue弹出的Object⽣成对应的Key,并加⼊Workqueue中。此处可以根据Object的⼀些属性进⾏过滤
AddFunc:func(objinterface{}){
key,err:=cache.MetaNamespaceKeyFunc(new)
iferr==nil{queue.Add(key)
}
},
//Pod删除操作
DeleteFunc:func(objinterface{}){
//在⽣成Key之前检查对象。因为资源删除后有可能会进⾏重建等操作,如果监听时错过了删除信息,会导致该条记录是陈旧的
key,err:=cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
iferr==nil{queue.Add(key)
}
},
},cache.Indexers{})
//创建新的Controller
controller:=NewController(queue,indexer,informer)stop:=make(chanstruct{})
deferclose(stop)
//启动 Controller
gocontroller.Run(1,stop)select{}
}
至此一个简单的Controller就完成了,然后我们从已有的k8s环境中复制Config文件,将 Config文件存放在/root/.kube/目录下,配置运行代码,运行结果见代码清单 2-59。
I031215:46:38.849495 25524main.go:125]StartingPodcontrollerSync/Add/UpdateforPodcurl-666-6f68d49784-r2gln
Sync/Add/UpdateforPodbusybox
Poddefault/mypoddoesnotexistanymore
结果显示:程序启动了一个 PodController,Controller监听到在 Default命名空间下有两个 Pod:busybox和 curl-666-6f68d49784-r2gln,缓存中的 mypod已经不存在了。