DeltaFIFO reflector

 

 

// NewDeltaFIFOWithOptions returns a Queue which can be used to process changes to
// items. See also the comment on DeltaFIFO.
func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
    if opts.KeyFunction == nil {
        opts.KeyFunction = MetaNamespaceKeyFunc
    }

    f := &DeltaFIFO{
        items:        map[string]Deltas{},
        queue:        []string{},
        keyFunc:      opts.KeyFunction,
        knownObjects: opts.KnownObjects,

        emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
    }
    f.cond.L = &f.lock
    return f
}
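
When no KeyFunction is supplied, the queue falls back to MetaNamespaceKeyFunc, so every object is tracked under a "<namespace>/<name>" key in f.items. A minimal standalone sketch (not from the article) showing the key a ConfigMap is filed under:

package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/cache"
)

func main() {
    cm := &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Namespace: "tmp", Name: "demo"}}

    // MetaNamespaceKeyFunc is the default key function installed by NewDeltaFIFOWithOptions.
    key, err := cache.MetaNamespaceKeyFunc(cm)
    if err != nil {
        panic(err)
    }
    fmt.Println(key) // prints "tmp/demo"
}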

 

 

// Add inserts an item, and puts it in the queue. The item is only enqueued
// if it doesn't already exist in the set.
func (f *DeltaFIFO) Add(obj interface{}) error {
    f.lock.Lock()
    defer f.lock.Unlock()
    f.populated = true
    return f.queueActionLocked(Added, obj)
}

// Update is just like Add, but makes an Updated Delta.
func (f *DeltaFIFO) Update(obj interface{}) error {
    f.lock.Lock()
    defer f.lock.Unlock()
    f.populated = true
    return f.queueActionLocked(Updated, obj)
}

// Delete is just like Add, but makes a Deleted Delta. If the given
// object does not already exist, it will be ignored. (It may have
// already been deleted by a Replace (re-list), for example.)  In this
// method `f.knownObjects`, if not nil, provides (via GetByKey)
// _additional_ objects that are considered to already exist.
func (f *DeltaFIFO) Delete(obj interface{}) error {
    id, err := f.KeyOf(obj)
    if err != nil {
        return KeyError{obj, err}
    }
    f.lock.Lock()
    defer f.lock.Unlock()
    f.populated = true
    if f.knownObjects == nil {
        if _, exists := f.items[id]; !exists {
            // Presumably, this was deleted when a relist happened.
            // Don't provide a second report of the same deletion.
            return nil
        }
    } else {
        // We only want to skip the "deletion" action if the object doesn't
        // exist in knownObjects and it doesn't have corresponding item in items.
        // Note that even if there is a "deletion" action in items, we can ignore it,
        // because it will be deduped automatically in "queueActionLocked"
        _, exists, err := f.knownObjects.GetByKey(id)
        _, itemsExist := f.items[id]
        if err == nil && !exists && !itemsExist {
            // Presumably, this was deleted when a relist happened.
            // Don't provide a second report of the same deletion.
            return nil
        }
    }

    // exist in items and/or KnownObjects
    return f.queueActionLocked(Deleted, obj)
}
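
The existence checks above are easiest to see with a tiny standalone queue. Below is a minimal sketch (not from the article; no KnownObjects configured, and it uses the same single-argument Pop callback as main.go further down): deleting an object the queue has never seen produces no delta, while Add followed by Delete leaves both deltas queued under the same key.

package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/cache"
)

func main() {
    f := cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{}) // no KnownObjects

    // Never added before: Delete is silently ignored, no Deleted delta is queued.
    ghost := &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Namespace: "tmp", Name: "ghost"}}
    _ = f.Delete(ghost)

    // Added first, then deleted: both deltas accumulate under the key "tmp/demo".
    cm := &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Namespace: "tmp", Name: "demo"}}
    _ = f.Add(cm)
    _ = f.Delete(cm)

    obj, err := f.Pop(func(obj interface{}) error { return nil })
    if err != nil {
        panic(err)
    }
    for _, d := range obj.(cache.Deltas) {
        fmt.Println(d.Type) // prints "Added", then "Deleted"
    }
}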

 

 

root@ubuntu:~/go_learn/articles/archive/dive-into-kubernetes-informer/2-reflector# cat main.go 
package main

import (
        "fmt"
        "github.com/spongeprojects/magicconch"
        corev1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/util/wait"
        "k8s.io/client-go/tools/cache"
        "time"
)

// newStore creates a cache.Store that serves as the object store holding the current state of the resources.
func newStore() cache.Store {
        return cache.NewStore(cache.MetaNamespaceKeyFunc)
}

// newQueue creates a cache.Queue, implemented here as a FIFO (first-in, first-out) queue.
// Note that the store is passed in as the KnownObjects option:
// during a resync the Reflector needs to know the current state of the resources,
// and the current state is also needed when computing Deltas.
// KnownObjects is read-only for both the queue and the Reflector,
// so the user is responsible for keeping the store up to date.
func newQueue(store cache.Store) cache.Queue {
        return cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
                KnownObjects:          store,
                EmitDeltaTypeReplaced: true,
        })
}

// newConfigMapsReflector creates a cache.Reflector.
// Once the Reflector starts running (Run), newly received events are pushed into the queue.
func newConfigMapsReflector(queue cache.Queue) *cache.Reflector {
        lw := newConfigMapsListerWatcher() // covered earlier in the series; a sketch follows main.go below
        // The 2nd argument is expectedType, which restricts the events that may enter the queue;
        // since List and Watch only ever return one type here anyway, it merely acts as a sanity check.
        // The 4th argument is resyncPeriod.
        // Passing 0 means never resync (unless the connection times out or is interrupted);
        // a non-zero value triggers periodic full resyncs to avoid accumulating drift from the server,
        // and Sync-type events are produced during each resync.
        return cache.NewReflector(lw, &corev1.ConfigMap{}, queue, 0)
}

func main() {
        fmt.Println("----- 2-reflector -----")

        store := newStore()
        queue := newQueue(store)
        reflector := newConfigMapsReflector(queue)

        stopCh := make(chan struct{})
        defer close(stopCh)

        // Once the reflector is running, newly received events are pushed into the queue.
        go reflector.Run(stopCh)

        // Note: while processing events we have to keep the store in sync ourselves (Add, Update, Delete),
        // otherwise it drifts out of date; an Informer encapsulates all of this, but here we still have to care about it.
        processObj := func(obj interface{}) error {
                // Events are processed in the order they were received.
                for _, d := range obj.(cache.Deltas) {
                        switch d.Type {
                        case cache.Sync, cache.Replaced, cache.Added, cache.Updated:
                                if _, exists, err := store.Get(d.Object); err == nil && exists {
                                        if err := store.Update(d.Object); err != nil {
                                                return err
                                        }
                                } else {
                                        if err := store.Add(d.Object); err != nil {
                                                return err
                                        }
                                }
                        case cache.Deleted:
                                if err := store.Delete(d.Object); err != nil {
                                        return err
                                }
                        }
                        configMap, ok := d.Object.(*corev1.ConfigMap)
                        if !ok {
                                return fmt.Errorf("not a ConfigMap: %T", d.Object)
                        }
                        fmt.Printf("%s: %s\n", d.Type, configMap.Name)
                }
                return nil
        }

        fmt.Println("Start syncing...")

        // Keep running until stopCh is closed.
        wait.Until(func() {
                for {
                        _, err := queue.Pop(processObj)
                        magicconch.Must(err)
                }
        }, time.Second, stopCh)
}
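
newConfigMapsListerWatcher is covered earlier in the series. For completeness, one possible implementation is sketched below; this is an assumption rather than the article's exact code: it loads the default kubeconfig (the article's version presumably honors the --kubeconfig flag used in the run command below) and lists/watches ConfigMaps in the tmp namespace.

// Hypothetical sketch of newConfigMapsListerWatcher; the article's real
// implementation appears in an earlier part of the series.
package main

import (
        "k8s.io/apimachinery/pkg/fields"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/cache"
        "k8s.io/client-go/tools/clientcmd"
)

func newConfigMapsListerWatcher() cache.ListerWatcher {
        // Assumption: build the client from the default kubeconfig (~/.kube/config).
        config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
        if err != nil {
                panic(err)
        }
        clientset, err := kubernetes.NewForConfig(config)
        if err != nil {
                panic(err)
        }
        // List and watch ConfigMaps in the "tmp" namespace only.
        return cache.NewListWatchFromClient(
                clientset.CoreV1().RESTClient(), "configmaps", "tmp", fields.Everything(),
        )
}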

 

kubectl create namespace tmp
wget "http://localhost:8001/api/v1/namespaces/tmp/configmaps?watch=true"
kubectl create configmap -n tmp demo
kubectl delete configmaps -n tmp demo

 

 

root@ubuntu:~# kubectl create configmap -n tmp demo
configmap/demo created
root@ubuntu:~# kubectl create configmap -n tmp demo2
configmap/demo2 created
root@ubuntu:~# kubectl delete configmap -n tmp demo2
configmap "demo2" deleted
root@ubuntu:~# kubectl delete configmap -n tmp demo
configmap "demo" deleted
root@ubuntu:~# 

 

 

root@ubuntu:~/go_learn/articles/archive/dive-into-kubernetes-informer/2-reflector# ./main --kubeconfig=$HOME/.kube/config 
----- 2-reflector -----
Start syncing...
Added: demo
Added: demo2
Deleted: demo2
Deleted: demo

 
