informer用途
获取 kubernetes 中某个资源,同步k8s中的数据到本地缓存,并watch各种资源变化,触发相应的eventHandler.
- 在访问 k8s apiserver 的客户端作为一个 client 缓存对象使用
- 在一些自定义 controller 中使用,比如 operator 的开发
informer定义
informers是一个代码库,实现了一种类似通知的功能,k8s.io/client-go/informers,Informer 是 client-go 中的核心工具包。
informer用法
作为 client 的使用示例
package main
import (
"flag"
"fmt"
"log"
"path/filepath"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
)
func main() {
var kubeconfig *string
if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
flag.Parse()
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
panic(err)
}
// 初始化 client
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Panic(err.Error())
}
stopper := make(chan struct{})
defer close(stopper)
// 初始化 informer
factory := informers.NewSharedInformerFactory(clientset, 0)
nodeInformer := factory.Core().V1().Nodes()
informer := nodeInformer.Informer()
defer runtime.HandleCrash()
// 启动 informer,list & watch
go factory.Start(stopper)
// 从 apiserver 同步资源,必不可少
if !cache.WaitForCacheSync(stopper, informer.HasSynced) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
return
}
// 使用自定义 handler
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: onAdd,
UpdateFunc: func(interface{}, interface{}) { fmt.Println("update not implemented") }, // 此处省略 workqueue 的使用
DeleteFunc: func(interface{}) { fmt.Println("delete not implemented") },
})
// 创建 lister
nodeLister := nodeInformer.Lister()
// 从 lister 中获取所有 items
nodeList, err := nodeLister.List(labels.Everything())
if err != nil {
fmt.Println(err)
}
fmt.Println("nodelist:", nodeList)
<-stopper
}
func onAdd(obj interface{}) {
node := obj.(*corev1.Node)
fmt.Println("add a node:", node.Name)
}
另外,还可以使用带消息队列的处理方法。如下所示,监测pod上的annotation标签。如果带宽注解变化则调用接口设置新的带宽:
func main() {
...
stopChan := make(chan struct{})
go NewPodInformer().Run(1, stopChan)
...
}
func NewPodInformer() *PodInformer {
k8sclient := k8smgmt.GetK8sAPIClient()
if k8sclient == nil {
log.Fatal("k8s client can not be null")
}
factory := informers.NewSharedInformerFactory(k8sclient.Client, 0)
podif := &PodInformer{
kubeClient: k8sclient,
informerFactory: factory,
informer: factory.Core().V1().Pods().Informer(),
lister: factory.Core().V1().Pods().Lister(),
listerSynced: factory.Core().V1().Pods().Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "podUpdateQueue"),
}
// use customized handler
podif.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: podif.addPod,
UpdateFunc: podif.updatePod,
DeleteFunc: podif.deletePod,
})
podif.syncHandler = podif.syncPod
// create lister, you can get pod from the lister
cacher.SetPodLister(podif.lister)
return podif
}
// Run begins watching and syncing.
func (podinf *PodInformer) Run(workers int, stopCh <-chan struct{}) {
defer runtime.HandleCrash()
defer podinf.queue.ShutDown()
log.Infof("Starting sync pod bandwidth...")
defer log.Infof("Shutting down sync pod bandwidth")
go podinf.informerFactory.Start(stopCh)
if !WaitForCacheSync("pod", stopCh, podinf.listerSynced) {
return
}
for i := 0; i < workers; i++ {
// Why does it not matter if I change the period parameter?
go wait.Until(podinf.worker, time.Second, stopCh)
}
<-stopCh
}
// WaitForCacheSync is a wrapper around cache.WaitForCacheSync that generates log messages
// indicating that the controller identified by controllerName is waiting for syncs, followed by
// either a successful or failed sync.
func WaitForCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...cache.InformerSynced) bool {
klog.Infof("Waiting for caches to sync for %s controller", controllerName)
if !cache.WaitForCacheSync(stopCh, cacheSyncs...) {
runtime.HandleError(fmt.Errorf("unable to sync caches for %s controller", controllerName))
return false
}
klog.Infof("Caches are synced for %s controller", controllerName)
return true
}
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (podinf *PodInformer) worker() {
for podinf.processNextWorkItem() {
}
}
func (podinf *PodInformer) processNextWorkItem() bool {
key, quit := podinf.queue.Get()
if quit {
return false
}
defer podinf.queue.Done(key)
err := podinf.syncHandler(key.(string))
if err == nil {
podinf.queue.Forget(key)
return true
}
runtime.HandleError(fmt.Errorf("Sync %q failed with %v", key, err))
podinf.queue.AddRateLimited(key)
return true
}
// syncPod will sync the Pod with the given key if it has had its expectations fulfilled,
// This function is not meant to be invoked concurrently with the same key.
func (podinf *PodInformer) syncPod(key string) error {
startTime := time.Now()
defer func() {
log.Infof("Finished syncing pod %s bandwidth (%v)", key, time.Since(startTime))
}()
namespace, name, err := cacher.SplitPodKey(key)
if err != nil {
log.Errorf("fail to get pod key: %v", err)
return err
}
pod, err := podinf.lister.Pods(namespace).Get(name)
if errors.IsNotFound(err) {
log.Infof("pod %v has been deleted", key)
return nil
}
if err != nil {
return err
}
// Always updates pod bandwidth as pods come up or die.
bwinfo, err := cacher.GetPodBandwidth(pod)
if err != nil {
return err
}
if bwinfo != nil {
// set new bandwidth...
}
return nil
}
func (podinf *PodInformer) updatePod(old interface{}, cur interface{}) {
if k8sutils.IsLeader() == false {
return
}
enabelBw := os.Getenv(VOYAGE_SERVER_ENABLE_PODLIMIT)
if strings.ToLower(enabelBw) != "true" {
return
}
curPod := cur.(*v1.Pod)
oldPod := old.(*v1.Pod)
if curPod.ResourceVersion == oldPod.ResourceVersion {
// Periodic resync will send update events for all known pods.
// Two different versions of the same pod will always have different RVs.
return
}
podid := cacher.CaculateID(curPod.Namespace, curPod.Name)
bwinfo, err := cacher.GetUpdatedBandwidth(oldPod, curPod)
if err != nil {
log.WithField("pod-update-inform", podid).Errorf("fail to handle update pod: %v", err)
return
} else if bwinfo != nil {
log.Infof("enqueue update pod bandwidth event %s: %v", podid, bwinfo)
podinf.enqueuePod(podid)
}
return
}
// obj could be an *apps.ReplicaSet, or a DeletionFinalStateUnknown marker item.
func (podinf *PodInformer) enqueuePod(key string) {
podinf.queue.Add(key)
}