Using Informers in Kubernetes

Informer purpose

An informer fetches a given Kubernetes resource, syncs the data from the apiserver into a local cache, and watches the resource for changes, triggering the corresponding event handlers. Typical uses:

  1. As a client-side cache when accessing the Kubernetes apiserver
  2. In custom controllers, for example when developing an operator

Informer definition

informers is a library, k8s.io/client-go/informers, that implements a notification-like mechanism; Informer is one of the core toolkits in client-go.

Informer usage

Usage example as a client

package main

import (
    "flag"
    "fmt"
    "log"
    "path/filepath"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/apimachinery/pkg/util/runtime"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/homedir"
)

func main() {
    var kubeconfig *string
    if home := homedir.HomeDir(); home != "" {
        kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
    } else {
        kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
    }
    flag.Parse()

    config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
    if err != nil {
        panic(err)
    }

    // initialize the clientset
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        log.Panic(err.Error())
    }

    stopper := make(chan struct{})
    defer close(stopper)
    
    // initialize the shared informer factory and get a Node informer
    factory := informers.NewSharedInformerFactory(clientset, 0)
    nodeInformer := factory.Core().V1().Nodes()
    informer := nodeInformer.Informer()
    defer runtime.HandleCrash()
    
    // start the informer's list & watch; factory.Start is non-blocking and
    // runs each informer in its own goroutine
    factory.Start(stopper)
    
    // wait for the initial sync from the apiserver; this step is essential
    if !cache.WaitForCacheSync(stopper, informer.HasSynced) {
        runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
        return
    }

    // register custom event handlers
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    onAdd,
        UpdateFunc: func(interface{}, interface{}) { fmt.Println("update not implemented") }, // workqueue usage is omitted here
        DeleteFunc: func(interface{}) { fmt.Println("delete not implemented") },
    })
    
    // create a lister
    nodeLister := nodeInformer.Lister()
    // fetch all items from the lister's local cache
    nodeList, err := nodeLister.List(labels.Everything())
    if err != nil {
        fmt.Println(err)
    }
    fmt.Println("nodelist:", nodeList)
    <-stopper
}

func onAdd(obj interface{}) {
    node := obj.(*corev1.Node)
    fmt.Println("add a node:", node.Name)
}
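
If the cache only needs a subset of objects, the factory can be scoped at construction time with client-go's NewSharedInformerFactoryWithOptions. A sketch (the namespace and label selector here are illustrative, not from the original):

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
)

// newScopedFactory builds a factory whose informers only list & watch
// objects in one namespace that match a label selector, reducing memory
// use and apiserver load.
func newScopedFactory(clientset kubernetes.Interface) informers.SharedInformerFactory {
    return informers.NewSharedInformerFactoryWithOptions(
        clientset,
        0, // resync period; 0 disables periodic resync
        informers.WithNamespace("default"), // illustrative namespace
        informers.WithTweakListOptions(func(opts *metav1.ListOptions) {
            opts.LabelSelector = "app=demo" // illustrative selector
        }),
    )
}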

In addition, events can be processed through a work queue. The example below watches the annotations on pods; if the bandwidth annotation changes, it calls an interface to set the new bandwidth:

func main() {
...
    stopChan := make(chan struct{})
    go NewPodInformer().Run(1, stopChan)
...
}
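
The PodInformer type itself is not shown in the original. A minimal sketch of what it could look like, inferred from the fields assigned in NewPodInformer below (k8smgmt, cacher, and k8sutils are the author's own packages, so the kubeClient field type is an assumption):

import (
	corelisters "k8s.io/client-go/listers/core/v1"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

// PodInformer bundles the informer machinery with a rate-limited work queue.
type PodInformer struct {
	kubeClient      *k8smgmt.K8sAPIClient // assumed type from the author's k8smgmt package
	informerFactory informers.SharedInformerFactory
	informer        cache.SharedIndexInformer
	lister          corelisters.PodLister
	listerSynced    cache.InformerSynced
	queue           workqueue.RateLimitingInterface
	syncHandler     func(key string) error
}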

func NewPodInformer() *PodInformer {
	k8sclient := k8smgmt.GetK8sAPIClient()
	if k8sclient == nil {
		log.Fatal("k8s client cannot be nil")
	}

	factory := informers.NewSharedInformerFactory(k8sclient.Client, 0)
	podif := &PodInformer{
		kubeClient:      k8sclient,
		informerFactory: factory,
		informer:        factory.Core().V1().Pods().Informer(),
		lister:          factory.Core().V1().Pods().Lister(),
		listerSynced:    factory.Core().V1().Pods().Informer().HasSynced,
		queue:           workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "podUpdateQueue"),
	}

	// use customized handler
	podif.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    podif.addPod,
		UpdateFunc: podif.updatePod,
		DeleteFunc: podif.deletePod,
	})

	podif.syncHandler = podif.syncPod

	// publish the lister so pods can be fetched from it elsewhere
	cacher.SetPodLister(podif.lister)
	return podif
}
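
The addPod and deletePod handlers registered above are not shown in the original. Minimal sketches that enqueue the pod key, reusing the same cacher.CaculateID helper and handling the delete tombstone case, might look like this:

func (podinf *PodInformer) addPod(obj interface{}) {
	pod := obj.(*v1.Pod)
	podinf.enqueuePod(cacher.CaculateID(pod.Namespace, pod.Name))
}

func (podinf *PodInformer) deletePod(obj interface{}) {
	pod, ok := obj.(*v1.Pod)
	if !ok {
		// on missed deletes the informer hands us a tombstone instead of the object
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			return
		}
		if pod, ok = tombstone.Obj.(*v1.Pod); !ok {
			return
		}
	}
	podinf.enqueuePod(cacher.CaculateID(pod.Namespace, pod.Name))
}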

// Run begins watching and syncing.
func (podinf *PodInformer) Run(workers int, stopCh <-chan struct{}) {
	defer runtime.HandleCrash()
	defer podinf.queue.ShutDown()

	log.Infof("Starting sync pod bandwidth...")
	defer log.Infof("Shutting down sync pod bandwidth")

	podinf.informerFactory.Start(stopCh)
	if !WaitForCacheSync("pod", stopCh, podinf.listerSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		// worker loops until the queue is shut down, so the period only
		// controls how quickly it is restarted if it ever returns early
		go wait.Until(podinf.worker, time.Second, stopCh)
	}

	<-stopCh
}

// WaitForCacheSync is a wrapper around cache.WaitForCacheSync that generates log messages
// indicating that the controller identified by controllerName is waiting for syncs, followed by
// either a successful or failed sync.
func WaitForCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...cache.InformerSynced) bool {
	klog.Infof("Waiting for caches to sync for %s controller", controllerName)

	if !cache.WaitForCacheSync(stopCh, cacheSyncs...) {
		runtime.HandleError(fmt.Errorf("unable to sync caches for %s controller", controllerName))
		return false
	}

	klog.Infof("Caches are synced for %s controller", controllerName)
	return true
}

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (podinf *PodInformer) worker() {
	for podinf.processNextWorkItem() {
	}
}

func (podinf *PodInformer) processNextWorkItem() bool {
	key, quit := podinf.queue.Get()
	if quit {
		return false
	}
	defer podinf.queue.Done(key)

	err := podinf.syncHandler(key.(string))
	if err == nil {
		podinf.queue.Forget(key)
		return true
	}

	runtime.HandleError(fmt.Errorf("Sync %q failed with %v", key, err))
	podinf.queue.AddRateLimited(key)

	return true
}

// syncPod syncs the Pod with the given key if it has had its expectations fulfilled.
// This function is not meant to be invoked concurrently with the same key.
func (podinf *PodInformer) syncPod(key string) error {
	startTime := time.Now()
	defer func() {
		log.Infof("Finished syncing pod %s bandwidth (%v)", key, time.Since(startTime))
	}()

	namespace, name, err := cacher.SplitPodKey(key)
	if err != nil {
		log.Errorf("fail to get pod key: %v", err)
		return err
	}

	pod, err := podinf.lister.Pods(namespace).Get(name)
	if errors.IsNotFound(err) {
		log.Infof("pod %v has been deleted", key)
		return nil
	}
	if err != nil {
		return err
	}

	// Always updates pod bandwidth as pods come up or die.
	bwinfo, err := cacher.GetPodBandwidth(pod)
	if err != nil {
		return err
	}
	if bwinfo != nil {
		// set new bandwidth...
	}
	return nil
}

func (podinf *PodInformer) updatePod(old interface{}, cur interface{}) {
	if !k8sutils.IsLeader() {
		return
	}
	enableBw := os.Getenv(VOYAGE_SERVER_ENABLE_PODLIMIT)
	if strings.ToLower(enableBw) != "true" {
		return
	}
	curPod := cur.(*v1.Pod)
	oldPod := old.(*v1.Pod)
	if curPod.ResourceVersion == oldPod.ResourceVersion {
		// Periodic resync will send update events for all known pods.
		// Two different versions of the same pod will always have different RVs.
		return
	}

	podid := cacher.CaculateID(curPod.Namespace, curPod.Name)
	bwinfo, err := cacher.GetUpdatedBandwidth(oldPod, curPod)
	if err != nil {
		log.WithField("pod-update-inform", podid).Errorf("fail to handle update pod: %v", err)
		return
	} else if bwinfo != nil {
		log.Infof("enqueue update pod bandwidth event %s: %v", podid, bwinfo)
		podinf.enqueuePod(podid)
	}
}

// enqueuePod adds a pod key to the work queue.
func (podinf *PodInformer) enqueuePod(key string) {
	podinf.queue.Add(key)
}
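
cacher.CaculateID and cacher.SplitPodKey come from the author's codebase; client-go ships standard helpers for building and splitting such "namespace/name" keys, which a generic version might use instead (a sketch, not the author's implementation):

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/runtime"

	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

// enqueue builds a "namespace/name" key with the standard client-go key
// function and adds it to the queue.
func enqueue(queue workqueue.RateLimitingInterface, pod *v1.Pod) {
	key, err := cache.MetaNamespaceKeyFunc(pod)
	if err != nil {
		runtime.HandleError(err)
		return
	}
	queue.Add(key) // e.g. "default/my-pod"
}

// splitKey is the inverse: it recovers namespace and name from the key.
func splitKey(key string) (namespace, name string, err error) {
	return cache.SplitMetaNamespaceKey(key)
}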
