cmd\kube-controller-manager\app\core.go
func startDaemonSetController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
dsc, err := daemon.NewDaemonSetsController(
controllerContext.InformerFactory.Apps().V1().DaemonSets(),
controllerContext.InformerFactory.Apps().V1().ControllerRevisions(),
controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.InformerFactory.Core().V1().Nodes(),
controllerContext.ClientBuilder.ClientOrDie("daemon-set-controller"),
flowcontrol.NewBackOff(1*time.Second, 15*time.Minute),
)
if err != nil {
return nil, true, fmt.Errorf("error creating DaemonSets controller: %v", err)
}
go dsc.Run(ctx, int(controllerContext.ComponentConfig.DaemonSetController.ConcurrentDaemonSetSyncs)) --->pkg\controller\daemon\daemon_controller.go
return nil, true, nil
}
pkg\controller\daemon\daemon_controller.go
// Run launches `workers` goroutines. Each one hands dsc.runWorker to
// wait.UntilWithContext with a one-second period. As shown in this excerpt,
// Run returns as soon as the goroutines have been started.
func (dsc *DaemonSetsController) Run(ctx context.Context, workers int) {
	for remaining := workers; remaining > 0; remaining-- {
		go wait.UntilWithContext(ctx, dsc.runWorker, time.Second)
	}
}
// runWorker keeps calling processNextWorkItem and returns the first time that
// call reports false.
func (dsc *DaemonSetsController) runWorker(ctx context.Context) {
	for {
		if more := dsc.processNextWorkItem(ctx); !more {
			return
		}
	}
}
// processNextWorkItem handles a single work item and reports, via its bool
// result, whether runWorker should keep looping.
//
// This is an abridged excerpt: "..." stands for elided code (including where
// dsKey is obtained and how err and the return value are handled), and the
// trailing "--->" marker is a reading note pointing at syncDaemonSet.
func (dsc *DaemonSetsController) processNextWorkItem(ctx context.Context) bool {
...
// dsKey is asserted to be a string and handed to the sync handler.
err := dsc.syncHandler(ctx, dsKey.(string)) --->syncDaemonSet
}
// syncDaemonSet reconciles the DaemonSet identified by key, a cache key that
// splits into a namespace and a name.
//
// Flow as shown here: look up the DaemonSet and all nodes, bail out if the
// DaemonSet is being deleted, build its history, then either only refresh the
// status (expectations not yet satisfied) or run manage, the update strategy,
// history cleanup and a final status update.
//
// This is an abridged excerpt: "..." marks elided code.
// NOTE(review): several err values below are reassigned without being checked
// in this excerpt — confirm against the full source.
func (dsc *DaemonSetsController) syncDaemonSet(ctx context.Context, key string) error {
// Split the key into namespace and name, then fetch the DaemonSet from the lister.
namespace, name, err := cache.SplitMetaNamespaceKey(key)
ds, err := dsc.dsLister.DaemonSets(namespace).Get(name)
// List all nodes known to the node lister.
nodeList, err := dsc.nodeLister.List(labels.Everything())
...
// Compute the key for this DaemonSet that is passed to
// dsc.expectations.SatisfiedExpectations below.
dsKey, err := controller.KeyFunc(ds)
// The DaemonSet is being deleted (DeletionTimestamp is set): stop here without error.
if ds.DeletionTimestamp != nil {
return nil
}
// Build the DaemonSet's history; presumably cur is the current revision and
// old the earlier ones — confirm. The hash is read from cur's labels.
cur, old, err := dsc.constructHistory(ctx, ds)
hash := cur.Labels[apps.DefaultDaemonSetUniqueLabelKey]
if !dsc.expectations.SatisfiedExpectations(dsKey) {
// Only update status. Don't raise observedGeneration since controller didn't process object of that generation.
return dsc.updateDaemonSetStatus(ctx, ds, nodeList, hash, false)
}
// Reconcile the daemon pods against the node list.
// NOTE(review): err from manage is not checked before it may be overwritten below.
err = dsc.manage(ctx, ds, nodeList, hash)
// Process rolling updates if we're ready.
if dsc.expectations.SatisfiedExpectations(dsKey) {
switch ds.Spec.UpdateStrategy.Type {
case apps.OnDeleteDaemonSetStrategyType:
case apps.RollingUpdateDaemonSetStrategyType:
err = dsc.rollingUpdate(ctx, ds, nodeList, hash)
}
if err != nil {
return err
}
}
// Clean up the old history entries returned by constructHistory.
err = dsc.cleanupHistory(ctx, ds, old)
if err != nil {
return fmt.Errorf("failed to clean up revisions of DaemonSet: %v", err)
}
// Final status update; the last argument is true here and false on the early
// path above (presumably it controls raising observedGeneration — confirm).
return dsc.updateDaemonSetStatus(ctx, ds, nodeList, hash, true)
}
**manage**
// Excerpt from the body of manage (signature and closing brace not shown).
// Build nodeToDaemonPods for this DaemonSet; podsShouldBeOnNode indexes it by
// node name to get the daemon pods on that node.
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds)
// Walk every node and collect which nodes need a daemon pod created and which
// existing pods should be deleted.
var nodesNeedingDaemonPods, podsToDelete []string
for _, node := range nodeList {
nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode := dsc.podsShouldBeOnNode(
node, nodeToDaemonPods, ds, hash)
nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodesNeedingDaemonPodsOnNode...)
podsToDelete = append(podsToDelete, podsToDeleteOnNode...)
}
// Also delete whatever getUnscheduledPodsWithoutNode reports (presumably
// daemon pods bound to nodes that are not in nodeList — confirm).
podsToDelete = append(podsToDelete, getUnscheduledPodsWithoutNode(nodeList, nodeToDaemonPods)...)
// Apply the plan: hand the pods to delete and the nodes needing pods to syncNodes.
if err = dsc.syncNodes(ctx, ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
return err
}
**podsShouldBeOnNode**
// Excerpt from the body of podsShouldBeOnNode (signature not shown). For one
// node it decides whether a daemon pod must be created there and which of the
// node's existing daemon pods must be deleted.
shouldRun, shouldContinueRunning := dsc.nodeShouldRunDaemonPod(node, ds)
// exists reports whether nodeToDaemonPods has an entry for this node.
daemonPods, exists := nodeToDaemonPods[node.Name]
switch {
case shouldRun && !exists:
// A pod should run here and none exists yet: record the node for creation.
nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name)
case shouldContinueRunning:
// Body elided in this excerpt; per the original note, pods that failed are
// deleted here — confirm against the full source.
...
case !shouldContinueRunning && exists:
// Pods exist on a node where they should not keep running: delete them,
// skipping any pod that is already being deleted.
for _, pod := range daemonPods {
if pod.DeletionTimestamp != nil {
continue
}
podsToDelete = append(podsToDelete, pod.Name)
}
}
return nodesNeedingDaemonPods, podsToDelete
// nodeShouldRunDaemonPod decides, for the given node and DaemonSet, whether a
// daemon pod should run on the node and whether one should continue running
// there. The two results are returned in that order (the caller names them
// shouldRun and shouldContinueRunning).
//
//   - (false, false) when the template pins a different node name, or when the
//     node-name or node-affinity predicate fails.
//   - (false, x) when only the taint predicate fails; x is true as long as no
//     untolerated NoExecute taint is found on the node.
//   - (true, true) otherwise.
func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *apps.DaemonSet) (bool, bool) {
	pod := NewPod(ds, node.Name)

	// A template that names a specific node only applies to that node.
	if pinned := ds.Spec.Template.Spec.NodeName; pinned != "" && pinned != node.Name {
		return false, false
	}

	nodeTaints := node.Spec.Taints
	nameOK, affinityOK, taintsOK := Predicates(pod, node, nodeTaints)
	switch {
	case !nameOK || !affinityOK:
		return false, false
	case taintsOK:
		return true, true
	}

	// Taints do not fit, so no pod should be started here. One may continue
	// running only if every NoExecute taint is tolerated.
	isNoExecute := func(t *v1.Taint) bool {
		return t.Effect == v1.TaintEffectNoExecute
	}
	_, untolerated := v1helper.FindMatchingUntoleratedTaint(nodeTaints, pod.Spec.Tolerations, isNoExecute)
	return false, !untolerated
}
**syncNodes**
// Excerpt from the body of syncNodes (signature not shown); "..." marks elided code.
...
// Create pods one batch at a time: batchSize goroutines are started for the
// indices [pos, pos+batchSize) and createWait blocks until all of them finish.
createWait.Add(batchSize)
for i := pos; i < pos+batchSize; i++ {
go func(ix int) {
defer createWait.Done()
// Work on a copy of the template and replace its affinity using the target
// node's name, so each pod is tied to nodesNeedingDaemonPods[ix].
podTemplate := template.DeepCopy()
podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity(
podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix])
// The new pod gets a controller reference to ds.
// NOTE(review): handling of err is not shown in this excerpt.
err := dsc.podControl.CreatePods(ctx, ds.Namespace, podTemplate,
ds, metav1.NewControllerRef(ds, controllerKind))
}(i)
}
createWait.Wait()
...
// Delete pods: one goroutine per entry for the first deleteDiff entries of
// podsToDelete; deleteWait blocks until all of them finish.
deleteWait := sync.WaitGroup{}
deleteWait.Add(deleteDiff)
for i := 0; i < deleteDiff; i++ {
go func(ix int) {
defer deleteWait.Done()
if err := dsc.podControl.DeletePod(ctx, ds.Namespace, podsToDelete[ix], ds); err != nil {
// The delete failed, so DeletionObserved is called for dsKey (presumably to
// lower the pending-deletion expectation, since no delete event will
// arrive for this pod — confirm).
dsc.expectations.DeletionObserved(dsKey)
}
}(i)
}
deleteWait.Wait()