DaemonSetController

cmd\kube-controller-manager\app\core.go
// startDaemonSetController constructs the DaemonSet controller and launches it
// in a background goroutine. On success it returns (nil, true, nil); if the
// constructor fails it returns (nil, true, err) with the error wrapped in a
// "error creating DaemonSets controller" message.
func startDaemonSetController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
	// The controller is wired to four shared informers (DaemonSets,
	// ControllerRevisions, Pods, Nodes), a client obtained for the
	// "daemon-set-controller" identity, and a flowcontrol back-off built
	// from 1 second and 15 minutes.
	dsc, err := daemon.NewDaemonSetsController(
		controllerContext.InformerFactory.Apps().V1().DaemonSets(),
		controllerContext.InformerFactory.Apps().V1().ControllerRevisions(),
		controllerContext.InformerFactory.Core().V1().Pods(),
		controllerContext.InformerFactory.Core().V1().Nodes(),
		controllerContext.ClientBuilder.ClientOrDie("daemon-set-controller"),
		flowcontrol.NewBackOff(1*time.Second, 15*time.Minute),
	)
	if err != nil {
		return nil, true, fmt.Errorf("error creating DaemonSets controller: %v", err)
	}
	// Start the controller with the configured number of concurrent sync
	// workers (ConcurrentDaemonSetSyncs). The trailing "--->" text is the
	// author's navigation note pointing at the file that defines Run; it is
	// not Go syntax.
	go dsc.Run(ctx, int(controllerContext.ComponentConfig.DaemonSetController.ConcurrentDaemonSetSyncs)) --->pkg\controller\daemon\daemon_controller.go
	return nil, true, nil
}

pkg\controller\daemon\daemon_controller.go

// Run starts `workers` goroutines. Each one hands dsc.runWorker to
// wait.UntilWithContext with a one-second period and the caller's ctx, so the
// worker is re-invoked on that period until ctx is done.
// As excerpted here, Run returns right after launching the workers.
func (dsc *DaemonSetsController) Run(ctx context.Context, workers int) {
	for i := 0; i < workers; i++ {
		go wait.UntilWithContext(ctx, dsc.runWorker, time.Second)
	}
}


// runWorker keeps calling processNextWorkItem until that call returns false.
func (dsc *DaemonSetsController) runWorker(ctx context.Context) {
	for dsc.processNextWorkItem(ctx) {
	}
}


// processNextWorkItem (abridged excerpt) passes one work-item key, asserted to
// be a string, to dsc.syncHandler. The "..." line stands for code the author
// left out, including where dsKey comes from and the boolean result.
func (dsc *DaemonSetsController) processNextWorkItem(ctx context.Context) bool {
	...
	// The trailing "--->" text is the author's navigation note saying that
	// syncHandler resolves to syncDaemonSet; it is not Go syntax.
	err := dsc.syncHandler(ctx, dsKey.(string)) --->syncDaemonSet
}


// syncDaemonSet reconciles the DaemonSet identified by key ("namespace/name"):
// it looks the object up, lists all nodes, builds the revision history, lets
// manage create/delete pods, optionally performs a rolling update, cleans up
// old history and finally writes the DaemonSet status.
//
// This is an abridged excerpt: "..." marks omitted code and most error checks
// on the intermediate calls are not shown.
func (dsc *DaemonSetsController) syncDaemonSet(ctx context.Context, key string) error {
	// Split the key into namespace and name and fetch the DaemonSet from the lister.
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	ds, err := dsc.dsLister.DaemonSets(namespace).Get(name)
	// List every node known to the node lister.
	nodeList, err := dsc.nodeLister.List(labels.Everything())
	...
	// Compute the key under which this DaemonSet's expectations are tracked;
	// it is what SatisfiedExpectations is queried with below.
	dsKey, err := controller.KeyFunc(ds)
	// A DaemonSet that is being deleted (DeletionTimestamp set) is skipped:
	// return without touching pods or status.
	if ds.DeletionTimestamp != nil {
		return nil
	}

	// Build the revision history. cur is the current revision and old is
	// later handed to cleanupHistory; hash is read from cur's
	// DefaultDaemonSetUniqueLabelKey label.
	cur, old, err := dsc.constructHistory(ctx, ds)
	hash := cur.Labels[apps.DefaultDaemonSetUniqueLabelKey]
	if !dsc.expectations.SatisfiedExpectations(dsKey) {
		// Only update status. Don't raise observedGeneration since controller didn't process object of that generation.
		return dsc.updateDaemonSetStatus(ctx, ds, nodeList, hash, false)
	}
	// Reconcile pods against nodes (create missing ones, delete surplus ones).
	// NOTE(review): in this excerpt the error returned by manage is not
	// checked before err is reassigned below — confirm against the full source.
	err = dsc.manage(ctx, ds, nodeList, hash)

	// Process rolling updates if we're ready.
	if dsc.expectations.SatisfiedExpectations(dsKey) {
		switch ds.Spec.UpdateStrategy.Type {
		case apps.OnDeleteDaemonSetStrategyType:
			// OnDelete: no action is taken in this switch.
		case apps.RollingUpdateDaemonSetStrategyType:
			err = dsc.rollingUpdate(ctx, ds, nodeList, hash)
		}
		if err != nil {
			return err
		}
	}

	// Remove history entries taken from `old`; a failure is wrapped and returned.
	err = dsc.cleanupHistory(ctx, ds, old)
	if err != nil {
		return fmt.Errorf("failed to clean up revisions of DaemonSet: %v", err)
	}

	// Final status update; the last argument is true here and false on the
	// unsatisfied-expectations path above.
	return dsc.updateDaemonSetStatus(ctx, ds, nodeList, hash, true)
}


**manage**
	// Build the mapping from node name to the daemon pods of this DaemonSet
	// that are associated with that node.
	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds)
	// Walk every node and collect two lists: nodes that still need a daemon
	// pod created, and pods that must be deleted.
	var nodesNeedingDaemonPods, podsToDelete []string
	for _, node := range nodeList {
		nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode := dsc.podsShouldBeOnNode(
			node, nodeToDaemonPods, ds, hash)
		nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodesNeedingDaemonPodsOnNode...)
		podsToDelete = append(podsToDelete, podsToDeleteOnNode...)
	}

	// Also queue for deletion whatever getUnscheduledPodsWithoutNode returns
	// (by its name, unscheduled pods whose node is not in nodeList — TODO confirm).
	podsToDelete = append(podsToDelete, getUnscheduledPodsWithoutNode(nodeList, nodeToDaemonPods)...)
	// Apply the plan: syncNodes performs the pod creations and deletions.
	if err = dsc.syncNodes(ctx, ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
		return err
	}


**podsShouldBeOnNode**
	// Decide for this node whether a daemon pod should be started and whether
	// an existing one may keep running, then look up the daemon pods already
	// recorded for the node.
	shouldRun, shouldContinueRunning := dsc.nodeShouldRunDaemonPod(node, ds)
	daemonPods, exists := nodeToDaemonPods[node.Name]

	switch {
	case shouldRun && !exists:
		// A pod should run here but none exists: mark the node as needing one.
		nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name)
	case shouldContinueRunning:
		// Body omitted in this excerpt. Per the author's original note,
		// failed pods are deleted in this branch.
		...
	case !shouldContinueRunning && exists:
		// Pods exist but must not keep running here: delete each one that is
		// not already being deleted (DeletionTimestamp is nil).
		for _, pod := range daemonPods {
			if pod.DeletionTimestamp != nil {
				continue
			}
			podsToDelete = append(podsToDelete, pod.Name)
		}
	}
	return nodesNeedingDaemonPods, podsToDelete


// nodeShouldRunDaemonPod reports two booleans for the given node: whether a
// daemon pod of ds should be started on it, and whether a daemon pod that is
// already there may continue running.
func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *apps.DaemonSet) (bool, bool) {
	// Build the pod ds would place on this node so it can be checked against the node.
	pod := NewPod(ds, node.Name)

	// If the template pins a NodeName and it is not this node, the pod
	// neither starts nor stays.
	if !(ds.Spec.Template.Spec.NodeName == "" || ds.Spec.Template.Spec.NodeName == node.Name) {
		return false, false
	}
	
	// Evaluate the predicates against the node and its taints. A node-name or
	// node-affinity mismatch means the pod neither starts nor stays.
	taints := node.Spec.Taints
	fitsNodeName, fitsNodeAffinity, fitsTaints := Predicates(pod, node, taints)
	if !fitsNodeName || !fitsNodeAffinity {
		return false, false
	}

	if !fitsTaints {
		// The taints are not all tolerated, so no new pod is started. An
		// existing pod may stay unless an untolerated taint with the
		// NoExecute effect is found.
		_, hasUntoleratedTaint := v1helper.FindMatchingUntoleratedTaint(taints, pod.Spec.Tolerations, func(t *v1.Taint) bool {
			return t.Effect == v1.TaintEffectNoExecute
		})
		return false, !hasUntoleratedTaint
	}

	return true, true
}


**syncNodes**
	...
	// Create pods in batches: one goroutine per node index in
	// [pos, pos+batchSize), with createWait used to wait for the whole batch.
	createWait.Add(batchSize)
		for i := pos; i < pos+batchSize; i++ {
			go func(ix int) {
				defer createWait.Done()

				// Work on a private copy of the template and rewrite its
				// affinity for the target node nodesNeedingDaemonPods[ix].
				podTemplate := template.DeepCopy()
				podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity(
					podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix])
				// Create the pod in the DaemonSet's namespace with ds as its controller owner.
				// NOTE(review): the excerpt does not show how err is handled.
				err := dsc.podControl.CreatePods(ctx, ds.Namespace, podTemplate,
					ds, metav1.NewControllerRef(ds, controllerKind))
			}(i)
		}
		createWait.Wait()
	...
	// Delete pods: one goroutine per entry in podsToDelete[0:deleteDiff],
	// then wait for all of them to finish.
	deleteWait := sync.WaitGroup{}
	deleteWait.Add(deleteDiff)
	for i := 0; i < deleteDiff; i++ {
		go func(ix int) {
			defer deleteWait.Done()
			if err := dsc.podControl.DeletePod(ctx, ds.Namespace, podsToDelete[ix], ds); err != nil {
				// The delete failed, so record the deletion as observed for
				// dsKey (presumably so the controller does not keep waiting
				// for a delete event that will not arrive — TODO confirm).
				dsc.expectations.DeletionObserved(dsKey)
			}
		}(i)
	}
	deleteWait.Wait()
上一篇:Java发送Http请求


下一篇:Prometheus查看pod状态使用命令