// NewDeltaFIFOWithOptions returns a Queue which can be used to process changes to // items. See also the comment on DeltaFIFO. func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO { if opts.KeyFunction == nil { opts.KeyFunction = MetaNamespaceKeyFunc } f := &DeltaFIFO{ items: map[string]Deltas{}, queue: []string{}, keyFunc: opts.KeyFunction, knownObjects: opts.KnownObjects, emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced, } f.cond.L = &f.lock return f }
// Add inserts an item, and puts it in the queue. The item is only enqueued // if it doesn‘t already exist in the set. func (f *DeltaFIFO) Add(obj interface{}) error { f.lock.Lock() defer f.lock.Unlock() f.populated = true return f.queueActionLocked(Added, obj) } // Update is just like Add, but makes an Updated Delta. func (f *DeltaFIFO) Update(obj interface{}) error { f.lock.Lock() defer f.lock.Unlock() f.populated = true return f.queueActionLocked(Updated, obj) } // Delete is just like Add, but makes a Deleted Delta. If the given // object does not already exist, it will be ignored. (It may have // already been deleted by a Replace (re-list), for example.) In this // method `f.knownObjects`, if not nil, provides (via GetByKey) // _additional_ objects that are considered to already exist. func (f *DeltaFIFO) Delete(obj interface{}) error { id, err := f.KeyOf(obj) if err != nil { return KeyError{obj, err} } f.lock.Lock() defer f.lock.Unlock() f.populated = true if f.knownObjects == nil { if _, exists := f.items[id]; !exists { // Presumably, this was deleted when a relist happened. // Don‘t provide a second report of the same deletion. return nil } } else { // We only want to skip the "deletion" action if the object doesn‘t // exist in knownObjects and it doesn‘t have corresponding item in items. // Note that even if there is a "deletion" action in items, we can ignore it, // because it will be deduped automatically in "queueActionLocked" _, exists, err := f.knownObjects.GetByKey(id) _, itemsExist := f.items[id] if err == nil && !exists && !itemsExist { // Presumably, this was deleted when a relist happened. // Don‘t provide a second report of the same deletion. return nil } } // exist in items and/or KnownObjects return f.queueActionLocked(Deleted, obj) }
root@ubuntu:~/go_learn/articles/archive/dive-into-kubernetes-informer/2-reflector# cat main.go package main import ( "fmt" "github.com/spongeprojects/magicconch" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" "time" ) // newStore 用于创建一个 cache.Store 对象,作为当前资源状态的对象存储 func newStore() cache.Store { return cache.NewStore(cache.MetaNamespaceKeyFunc) } // newQueue 用于创建一个 cache.Queue 对象,这里实现为 FIFO 先进先出队列, // 注意在初始化时 store 作为 KnownObjects 参数传入其中, // 因为在重新同步 (resync) 操作中 Reflector 需要知道当前的资源状态, // 另外在计算变更 (Delta) 时,也需要对比当前的资源状态。 // 这个 KnownObjects 对队列,以及对 Reflector 都是只读的,用户需要自己维护好 store 的状态。 func newQueue(store cache.Store) cache.Queue { return cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{ KnownObjects: store, EmitDeltaTypeReplaced: true, }) } // newConfigMapsReflector 用于创建一个 cache.Reflector 对象, // 当 Reflector 开始运行 (Run) 后,队列中就会推入新收到的事件。 func newConfigMapsReflector(queue cache.Queue) *cache.Reflector { lw := newConfigMapsListerWatcher() // 前面有说明 // 第 2 个参数是 expectedType, 用此参数限制进入队列的事件, // 当然在 List 和 Watch 操作时返回的数据就只有一种类型,这个参数只起校验的作用; // 第 4 个参数是 resyncPeriod, // 这里传了 0,表示从不重新同步(除非连接超时或者中断), // 如果传了非 0 值,会定期进行全量同步,避免累积和服务器的不一致, // 同步过程中会产生 SYNC 类型的事件。 return cache.NewReflector(lw, &corev1.ConfigMap{}, queue, 0) } func main() { fmt.Println("----- 2-reflector -----") store := newStore() queue := newQueue(store) reflector := newConfigMapsReflector(queue) stopCh := make(chan struct{}) defer close(stopCh) // reflector 开始运行后,队列中就会推入新收到的事件 go reflector.Run(stopCh) // 注意处理事件过程中维护好 store 状态,包括 Add, Update, Delete 操作, // 否则会出现不同步问题,在 Informer 当中这些逻辑都已经被封装好了,但目前我们还需要关心一下。 processObj := func(obj interface{}) error { // 最先收到的事件会被最先处理 for _, d := range obj.(cache.Deltas) { switch d.Type { case cache.Sync, cache.Replaced, cache.Added, cache.Updated: if _, exists, err := store.Get(d.Object); err == nil && exists { if err := store.Update(d.Object); err != nil { return err } } else { if err := store.Add(d.Object); err != nil { return err } } case cache.Deleted: if err := store.Delete(d.Object); err != nil { return err } } configMap, ok := d.Object.(*corev1.ConfigMap) if !ok { return fmt.Errorf("not config: %T", d.Object) } fmt.Printf("%s: %s\n", d.Type, configMap.Name) } return nil } fmt.Println("Start syncing...") // 持续运行直到 stopCh 关闭 wait.Until(func() { for { _, err := queue.Pop(processObj) magicconch.Must(err) } }, time.Second, stopCh) }
wget http://localhost:8001/api/v1/tmp/configmaps/watch kubectl create configmap -n tmp demo kubectl create namespace tmp kubectl delete configmaps -n tmp demo
root@ubuntu:~# kubectl create configmap -n tmp demo configmap/demo created root@ubuntu:~# kubectl create configmap -n tmp demo2 configmap/demo2 created root@ubuntu:~# kubectl delete configmap -n tmp demo2 configmap "demo2" deleted root@ubuntu:~# kubectl delete configmap -n tmp demo configmap "demo" deleted root@ubuntu:~#
root@ubuntu:~/go_learn/articles/archive/dive-into-kubernetes-informer/2-reflector# ./main --kubeconfig=$HOME/.kube/config ----- 2-reflector ----- Start syncing... Added: demo Added: demo2 Deleted: demo2 Deleted: demo