获取 kubernetes 中某个资源,同步k8s中的数据到本地缓存,并watch各种资源变化,触发相应的eventHandler.
- 在访问 k8s apiserver 的客户端中,作为 client 侧的本地缓存对象使用
- 在一些自定义 controller 中使用,比如 operator 的开发
informers 是一个代码库,实现了一种类似通知的功能。Informer 是 client-go 中的核心工具包。
作为 client 的使用示例
package main
import (
corev1 ""
// main is the "informer used as a client" example. It resolves a kubeconfig
// path through the "kubeconfig" flag, builds a clientset, creates a shared
// informer factory with a Node informer, starts it, waits for the cache to
// sync, and prints the node list returned by the lister.
// NOTE(review): this listing looks truncated — closing braces, the bodies of
// the `if err != nil` checks, and the call that should receive the
// AddFunc/UpdateFunc/DeleteFunc fields are not present; restore them from the
// original example before compiling.
// NOTE(review): no flag.Parse() call is visible, so the flag would keep its
// default value — confirm.
func main() {
var kubeconfig *string
// Default the flag to <home>/.kube/config when a home directory is found.
if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
// Build the client configuration from the kubeconfig path.
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
// Initialize the client.
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
// stopper is handed to factory.Start and cache.WaitForCacheSync below and is
// closed when main returns.
stopper := make(chan struct{})
defer close(stopper)
// Initialize the informer.
factory := informers.NewSharedInformerFactory(clientset, 0)
nodeInformer := factory.Core().V1().Nodes()
informer := nodeInformer.Informer()
defer runtime.HandleCrash()
// Start the informer (list & watch).
go factory.Start(stopper)
// Sync resources from the apiserver; this step is required.
if !cache.WaitForCacheSync(stopper, informer.HasSynced) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
// Use custom handlers.
AddFunc: onAdd,
UpdateFunc: func(interface{}, interface{}) { fmt.Println("update not implemented") }, // workqueue usage is omitted here
DeleteFunc: func(interface{}) { fmt.Println("delete not implemented") },
// Create the lister.
nodeLister := nodeInformer.Lister()
// Fetch all items from the lister.
nodeList, err := nodeLister.List(labels.Everything())
if err != nil {
fmt.Println("nodelist:", nodeList)
func onAdd(obj interface{}) {
node := obj.(*corev1.Node)
fmt.Println("add a node:", node.Name)
// main is the controller-style example: it creates the pod informer and runs
// it with a single worker in a background goroutine.
// NOTE(review): this listing looks truncated — the closing brace is missing and
// nothing visible blocks after the goroutine is launched or ever closes
// stopChan; confirm against the original example.
func main() {
stopChan := make(chan struct{})
go NewPodInformer().Run(1, stopChan)
// NewPodInformer builds a PodInformer on top of a shared informer factory
// created for the client returned by k8smgmt.GetK8sAPIClient, and installs
// syncPod as its syncHandler. It calls log.Fatal when no client is available.
// NOTE(review): this listing looks truncated — closing braces and the call that
// should receive the AddFunc/UpdateFunc/DeleteFunc fields are not present.
func NewPodInformer() *PodInformer {
k8sclient := k8smgmt.GetK8sAPIClient()
if k8sclient == nil {
log.Fatal("k8s client can not be null")
factory := informers.NewSharedInformerFactory(k8sclient.Client, 0)
// The informer, lister and HasSynced function are all obtained from the
// factory's Core().V1().Pods() accessor.
podif := &PodInformer{
kubeClient: k8sclient,
informerFactory: factory,
informer: factory.Core().V1().Pods().Informer(),
lister: factory.Core().V1().Pods().Lister(),
listerSynced: factory.Core().V1().Pods().Informer().HasSynced,
// Work queue named "podUpdateQueue", using the default controller rate limiter.
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "podUpdateQueue"),
// use customized handler
AddFunc: podif.addPod,
UpdateFunc: podif.updatePod,
DeleteFunc: podif.deletePod,
// syncHandler is what processNextWorkItem invokes for each dequeued key.
podif.syncHandler = podif.syncPod
// create lister, you can get pod from the lister
return podif
// Run begins watching and syncing.
// It starts the informer factory, waits for the pod cache to sync, and then
// launches `workers` goroutines that each drive podinf.worker through
// wait.Until. The queue is shut down when Run returns.
// NOTE(review): this listing looks truncated — closing braces, the body of the
// failed-sync branch, and whatever keeps Run from returning right after the
// workers are started are not visible.
func (podinf *PodInformer) Run(workers int, stopCh <-chan struct{}) {
defer runtime.HandleCrash()
defer podinf.queue.ShutDown()
log.Infof("Starting sync pod bandwidth...")
defer log.Infof("Shutting down sync pod bandwidth")
go podinf.informerFactory.Start(stopCh)
if !WaitForCacheSync("pod", stopCh, podinf.listerSynced) {
for i := 0; i < workers; i++ {
// Why does it not matter if I change the period parameter?
// Presumably because worker keeps looping until processNextWorkItem returns
// false, so wait.Until seldom gets a chance to re-invoke it after the period
// — confirm against the wait.Until documentation.
go wait.Until(podinf.worker, time.Second, stopCh)
// WaitForCacheSync is a wrapper around cache.WaitForCacheSync that generates log messages
// indicating that the controller identified by controllerName is waiting for syncs, followed by
// either a successful or failed sync.
func WaitForCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...cache.InformerSynced) bool {
klog.Infof("Waiting for caches to sync for %s controller", controllerName)
if !cache.WaitForCacheSync(stopCh, cacheSyncs...) {
runtime.HandleError(fmt.Errorf("unable to sync caches for %s controller", controllerName))
return false
klog.Infof("Caches are synced for %s controller", controllerName)
return true
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (podinf *PodInformer) worker() {
for podinf.processNextWorkItem() {
func (podinf *PodInformer) processNextWorkItem() bool {
key, quit := podinf.queue.Get()
if quit {
return false
defer podinf.queue.Done(key)
err := podinf.syncHandler(key.(string))
if err == nil {
return true
runtime.HandleError(fmt.Errorf("Sync %q failed with %v", key, err))
return true
// syncPod will sync the Pod with the given key if it has had its expectations fulfilled,
// This function is not meant to be invoked concurrently with the same key.
func (podinf *PodInformer) syncPod(key string) error {
startTime := time.Now()
defer func() {
log.Infof("Finished syncing pod %s bandwidth (%v)", key, time.Since(startTime))
namespace, name, err := cacher.SplitPodKey(key)
if err != nil {
log.Errorf("fail to get pod key: %v", err)
return err
pod, err := podinf.lister.Pods(namespace).Get(name)
if errors.IsNotFound(err) {
log.Infof("pod %v has been deleted", key)
return nil
if err != nil {
return err
// Always updates pod bandwidth as pods come up or die.
bwinfo, err := cacher.GetPodBandwidth(pod)
if err != nil {
return err
if bwinfo != nil {
// set new bandwidth...
return nil
// updatePod is the UpdateFunc handler registered for pods. It compares the old
// and current pod objects and logs an enqueue message when
// cacher.GetUpdatedBandwidth returns a non-nil result.
// NOTE(review): this listing looks truncated — closing braces, the bodies of
// the guard conditions, and the actual enqueue call are not visible; restore
// them before compiling.
func (podinf *PodInformer) updatePod(old interface{}, cur interface{}) {
// Guard on k8sutils.IsLeader() returning false.
if k8sutils.IsLeader() == false {
// Guard on enabelBw (sic) not being "true", compared case-insensitively.
if strings.ToLower(enabelBw) != "true" {
// NOTE(review): both assertions are unchecked and panic if the informer
// delivers anything other than *v1.Pod.
curPod := cur.(*v1.Pod)
oldPod := old.(*v1.Pod)
if curPod.ResourceVersion == oldPod.ResourceVersion {
// Periodic resync will send update events for all known pods.
// Two different versions of the same pod will always have different RVs.
podid := cacher.CaculateID(curPod.Namespace, curPod.Name)
bwinfo, err := cacher.GetUpdatedBandwidth(oldPod, curPod)
if err != nil {
log.WithField("pod-update-inform", podid).Errorf("fail to handle update pod: %v", err)
} else if bwinfo != nil {
log.Infof("enqueue update pod bandwidth event %s: %v", podid, bwinfo)
// obj could be an *apps.ReplicaSet, or a DeletionFinalStateUnknown marker item.
func (podinf *PodInformer) enqueuePod(key string) {