三、Informer 使用示例
在实际的开发工作中,Informer 主要用在两处:
- 在访问 k8s apiserver 的客户端作为一个 client 缓存对象使用;
- 在一些自定义 controller 中使用,比如 operator 的开发;
1、下面是一个作为 client 的使用示例:
package main import ( "flag" "fmt" "log" "path/filepath" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/homedir" ) func main() { var kubeconfig *string if home := homedir.HomeDir(); home != "" { kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file") } else { kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file") } flag.Parse() config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig) if err != nil { panic(err) } // 初始化 client clientset, err := kubernetes.NewForConfig(config) if err != nil { log.Panic(err.Error()) } stopper := make(chan struct{}) defer close(stopper) // 初始化 informer factory := informers.NewSharedInformerFactory(clientset, 0) nodeInformer := factory.Core().V1().Nodes() informer := nodeInformer.Informer() defer runtime.HandleCrash() // 启动 informer,list & watch go factory.Start(stopper) // 从 apiserver 同步资源,即 list if !cache.WaitForCacheSync(stopper, informer.HasSynced) { runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync")) return } // 使用自定义 handler informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: onAdd, UpdateFunc: func(interface{}, interface{}) { fmt.Println("update not implemented") }, // 此处省略 workqueue 的使用 DeleteFunc: func(interface{}) { fmt.Println("delete not implemented") }, }) // 创建 lister nodeLister := nodeInformer.Lister() // 从 lister 中获取所有 items nodeList, err := nodeLister.List(labels.Everything()) if err != nil { fmt.Println(err) } fmt.Println("nodelist:", nodeList) <-stopper } func onAdd(obj interface{}) { node := obj.(*corev1.Node) fmt.Println("add a node:", node.Name) }Shared指的是多个 lister 共享同一个cache,而且资源的变化会同时通知到cache和 listers。这个解释和上面图所展示的内容的是一致的,cache我们在Indexer的介绍中已经分析过了,lister 指的就是OnAdd、OnUpdate、OnDelete 这些回调函数背后的对象
2、以下是作为 controller 使用的一个整体工作流程
(1) 创建一个控制器
- 为控制器创建 workqueue
- 创建 informer, 为 informer 添加 callback 函数,创建 lister
(2) 启动控制器
- 启动 informer
- 等待本地 cache sync 完成后, 启动 workers
(3) 当收到变更事件后,执行 callback
- 等待事件触发
- 从事件中获取变更的 Object
- 做一些必要的检查
- 生成 object key,一般是 namespace/name 的形式
- 将 key 放入 workqueue 中
(4) worker loop
- 等待从 workqueue 中获取到 item,一般为 object key
- 用 object key 通过 lister 从本地 cache 中获取到真正的 object 对象
- 做一些检查
- 执行真正的业务逻辑
- 处理下一个 item
下面是自定义 controller 使用的一个参考:
var ( masterURL string kubeconfig string ) func init() { flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.") flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") } func main() { flag.Parse() stopCh := signals.SetupSignalHandler() cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig) if err != nil { glog.Fatalf("Error building kubeconfig: %s", err.Error()) } kubeClient, err := kubernetes.NewForConfig(cfg) if err != nil { glog.Fatalf("Error building kubernetes clientset: %s", err.Error()) } // 所谓 Informer,其实就是一个带有本地缓存和索引机制的、可以注册 EventHandler 的 client // informer watch apiserver,每隔 30 秒 resync 一次(list) kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, time.Second*30) controller := controller.NewController(kubeClient, kubeInformerFactory.Core().V1().Nodes()) // 启动 informer go kubeInformerFactory.Start(stopCh) // start controller if err = controller.Run(2, stopCh); err != nil { glog.Fatalf("Error running controller: %s", err.Error()) } } // NewController returns a new network controller func NewController( kubeclientset kubernetes.Interface, networkclientset clientset.Interface, networkInformer informers.NetworkInformer) *Controller { // Create event broadcaster // Add sample-controller types to the default Kubernetes Scheme so Events can be // logged for sample-controller types. utilruntime.Must(networkscheme.AddToScheme(scheme.Scheme)) glog.V(4).Info("Creating event broadcaster") eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")}) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) controller := &Controller{ kubeclientset: kubeclientset, networkclientset: networkclientset, networksLister: networkInformer.Lister(), networksSynced: networkInformer.Informer().HasSynced, workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Networks"), recorder: recorder, } glog.Info("Setting up event handlers") // Set up an event handler for when Network resources change networkInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.enqueueNetwork, UpdateFunc: func(old, new interface{}) { oldNetwork := old.(*samplecrdv1.Network) newNetwork := new.(*samplecrdv1.Network) if oldNetwork.ResourceVersion == newNetwork.ResourceVersion { // Periodic resync will send update events for all known Networks. // Two different versions of the same Network will always have different RVs. return } controller.enqueueNetwork(new) }, DeleteFunc: controller.enqueueNetworkForDelete, }) return controller }
自定义controller参考:https://github.com/resouer/k8s-controller-custom-resource