kubernetes源码分析controller-manager启动

kube controller manager 代码分析

1.目录: cmd/kube-controller-manager

kubernetes源码分析controller-manager启动

2.main函数定义在cmd/kube-controller-manager/controller-manager.go

// main is the entry point of kube-controller-manager. It seeds math/rand,
// builds the cobra command, initializes logging and executes the command,
// exiting with status 1 when execution returns an error.
func main() {
	rand.Seed(time.Now().UnixNano())

	command := app.NewControllerManagerCommand()

	// TODO: once we switch everything over to Cobra commands, we can go back to calling
	// utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
	// normalize func and add the go flag set by hand.
	// utilflag.InitFlags()
	logs.InitLogs()
	defer logs.FlushLogs()

	if err := command.Execute(); err != nil {
		// os.Exit terminates the process immediately without running deferred
		// calls, so the deferred FlushLogs above would be skipped on this
		// path. Flush explicitly before exiting.
		logs.FlushLogs()
		os.Exit(1)
	}
}

步骤: 设置随机数种子 (rand.Seed) --> 初始化 command --> 初始化日志 (InitLogs) --> 执行 command (Execute)

  1. NewControllerManagerCommand 定义在app目录下

    cmd/kube-controller-manager/app/controllermanager.go

    源码定义:

    // NewControllerManagerCommand builds the *cobra.Command for
    // kube-controller-manager. It creates the default options, defines the
    // command (pre-run hook, Run callback and argument validation), registers
    // all named flag sets on the command, and installs custom usage/help
    // printers that render the flags grouped by section.
    func NewControllerManagerCommand() *cobra.Command {
    	// Options creation failing is fatal: the process exits via klog.Fatalf.
    	s, err := options.NewKubeControllerManagerOptions()
    	if err != nil {
    		klog.Fatalf("unable to initialize command options: %v", err)
    	}
    
    	cmd := &cobra.Command{
    		Use: "kube-controller-manager",
    		Long: `The Kubernetes controller manager is a daemon that embeds
    the core control loops shipped with Kubernetes. In applications of robotics and
    automation, a control loop is a non-terminating loop that regulates the state of
    the system. In Kubernetes, a controller is a control loop that watches the shared
    state of the cluster through the apiserver and makes changes attempting to move the
    current state towards the desired state. Examples of controllers that ship with
    Kubernetes today are the replication controller, endpoints controller, namespace
    controller, and serviceaccounts controller.`,
    		PersistentPreRunE: func(*cobra.Command, []string) error {
    			// silence client-go warnings.
    			// kube-controller-manager generically watches APIs (including deprecated ones),
    			// and CI ensures it works properly against matching kube-apiserver versions.
    			restclient.SetDefaultWarningHandler(restclient.NoWarnings{})
    			return nil
    		},
    		// Run turns the options into a config, completes it and hands it to
    		// Run with a stop channel that never closes. Any error is written
    		// to stderr and the process exits with status 1.
    		Run: func(cmd *cobra.Command, args []string) {
    			verflag.PrintAndExitIfRequested()
    			cliflag.PrintFlags(cmd.Flags())
    
    			c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())
    			if err != nil {
    				fmt.Fprintf(os.Stderr, "%v\n", err)
    				os.Exit(1)
    			}
    
    			if err := Run(c.Complete(), wait.NeverStop); err != nil {
    				fmt.Fprintf(os.Stderr, "%v\n", err)
    				os.Exit(1)
    			}
    		},
    		// Args rejects the invocation as soon as any positional argument is
    		// a non-empty string; empty-string arguments are tolerated.
    		Args: func(cmd *cobra.Command, args []string) error {
    			for _, arg := range args {
    				if len(arg) > 0 {
    					return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
    				}
    			}
    			return nil
    		},
    	}
    
    	// Collect the named flag sets from the options, add the version and
    	// global flags to the "global" set, then merge every set into the
    	// command's own flag set.
    	fs := cmd.Flags()
    	namedFlagSets := s.Flags(KnownControllers(), ControllersDisabledByDefault.List())
    	verflag.AddFlags(namedFlagSets.FlagSet("global"))
    	globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name())
    	registerLegacyGlobalFlags(namedFlagSets)
    	for _, f := range namedFlagSets.FlagSets {
    		fs.AddFlagSet(f)
    	}
    	usageFmt := "Usage:\n  %s\n"
    	// The other two results of TerminalSize (including its error) are
    	// discarded; cols is passed to PrintSections below.
    	cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
    	// Usage output goes to stderr (or the command's configured error writer).
    	cmd.SetUsageFunc(func(cmd *cobra.Command) error {
    		fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine())
    		cliflag.PrintSections(cmd.OutOrStderr(), namedFlagSets, cols)
    		return nil
    	})
    	// Help output goes to stdout and is prefixed with the long description.
    	cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
    		fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine())
    		cliflag.PrintSections(cmd.OutOrStdout(), namedFlagSets, cols)
    	})
    
    	return cmd
    }
    

    1).NewControllerManagerCommand 返回 cobra.Command对象

    2).

    cmd := &cobra.Command{
    		Use: "kube-controller-manager",
    		...
    		}
    

    定义command信息

    3). 定义options

    s, err := options.NewKubeControllerManagerOptions()
    

    4.NewKubeControllerManagerOptions定义在cmd/kube-controller-manager/app/options/options.go

    
    // KubeControllerManagerOptions is the main context object for the kube-controller manager.
    // It aggregates one options struct per controller together with the
    // serving, authentication/authorization, metrics and logging options.
    type KubeControllerManagerOptions struct {
    	// Option groups whose types come from the cmoptions and cpoptions packages.
    	Generic           *cmoptions.GenericControllerManagerConfigurationOptions
    	KubeCloudShared   *cpoptions.KubeCloudSharedOptions
    	ServiceController *cpoptions.ServiceControllerOptions
    
    	// One options struct per controller (plus DeprecatedFlags).
    	AttachDetachController           *AttachDetachControllerOptions
    	CSRSigningController             *CSRSigningControllerOptions
    	DaemonSetController              *DaemonSetControllerOptions
    	DeploymentController             *DeploymentControllerOptions
    	StatefulSetController            *StatefulSetControllerOptions
    	DeprecatedFlags                  *DeprecatedControllerOptions
    	EndpointController               *EndpointControllerOptions
    	EndpointSliceController          *EndpointSliceControllerOptions
    	EndpointSliceMirroringController *EndpointSliceMirroringControllerOptions
    	GarbageCollectorController       *GarbageCollectorControllerOptions
    	HPAController                    *HPAControllerOptions
    	JobController                    *JobControllerOptions
    	CronJobController                *CronJobControllerOptions
    	NamespaceController              *NamespaceControllerOptions
    	NodeIPAMController               *NodeIPAMControllerOptions
    	NodeLifecycleController          *NodeLifecycleControllerOptions
    	PersistentVolumeBinderController *PersistentVolumeBinderControllerOptions
    	PodGCController                  *PodGCControllerOptions
    	ReplicaSetController             *ReplicaSetControllerOptions
    	ReplicationController            *ReplicationControllerOptions
    	ResourceQuotaController          *ResourceQuotaControllerOptions
    	SAController                     *SAControllerOptions
    	TTLAfterFinishedController       *TTLAfterFinishedControllerOptions
    
    	// Serving, authn/authz, metrics and logging options.
    	SecureServing *apiserveroptions.SecureServingOptionsWithLoopback
    	// TODO: remove insecure serving mode
    	InsecureServing *apiserveroptions.DeprecatedInsecureServingOptionsWithLoopback
    	Authentication  *apiserveroptions.DelegatingAuthenticationOptions
    	Authorization   *apiserveroptions.DelegatingAuthorizationOptions
    	Metrics         *metrics.Options
    	Logs            *logs.Options
    
    	// NOTE(review): presumably the apiserver address and the kubeconfig
    	// path - confirm against the flag definitions, which are not shown here.
    	Master                      string
    	Kubeconfig                  string
    	ShowHiddenMetricsForVersion string
    }
    

    可以查看到manager管理的controller, 例如namespace, rc, deployment等

    1. 回到run

      // Run runs the KubeControllerManagerOptions.  This should never exit.
      //
      // It logs the version, registers the component config with configz, starts
      // the secure and/or insecure HTTP servers, and then starts the controllers
      // through the run closure - either directly, or only after winning leader
      // election when leader election is enabled. An error is returned only for
      // failures that happen before the controllers start (serving setup,
      // hostname lookup); later failures terminate the process via klog.Fatalf.
      func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
      	// To help debugging, immediately log version
      	klog.Infof("Version: %+v", version.Get())
      
      	// A configz registration failure is logged but does not stop startup.
      	if cfgz, err := configz.New(ConfigzName); err == nil {
      		cfgz.Set(c.ComponentConfig)
      	} else {
      		klog.Errorf("unable to register configz: %v", err)
      	}
      
      	// Setup any healthz checks we will want to use.
      	var checks []healthz.HealthChecker
      	var electionChecker *leaderelection.HealthzAdaptor
      	if c.ComponentConfig.Generic.LeaderElection.LeaderElect {
      		electionChecker = leaderelection.NewLeaderHealthzAdaptor(time.Second * 20)
      		checks = append(checks, electionChecker)
      	}
      
      	// Start the controller manager HTTP server
      	// unsecuredMux is the handler for these controller *after* authn/authz filters have been applied
      	var unsecuredMux *mux.PathRecorderMux
      	if c.SecureServing != nil {
      		unsecuredMux = genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...)
      		handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication)
      		// TODO: handle stoppedCh returned by c.SecureServing.Serve
      		if _, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
      			return err
      		}
      	}
      	// NOTE: when both serving modes are configured, unsecuredMux is
      	// reassigned here, so the mux later passed to StartControllers is the
      	// one created for insecure serving.
      	if c.InsecureServing != nil {
      		unsecuredMux = genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...)
      		insecureSuperuserAuthn := server.AuthenticationInfo{Authenticator: &server.InsecureSuperuser{}}
      		handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, nil, &insecureSuperuserAuthn)
      		if err := c.InsecureServing.Serve(handler, 0, stopCh); err != nil {
      			return err
      		}
      	}
      
      	// run picks the client builder, creates the controller context, starts
      	// all controllers and the informer factories, and then blocks forever.
      	// Context creation or controller start failures are fatal.
      	run := func(ctx context.Context) {
      		rootClientBuilder := clientbuilder.SimpleControllerClientBuilder{
      			ClientConfig: c.Kubeconfig,
      		}
      		// With UseServiceAccountCredentials set, controllers get a
      		// service-account based client builder (dynamic or legacy);
      		// otherwise they share the root client builder.
      		var clientBuilder clientbuilder.ControllerClientBuilder
      		if c.ComponentConfig.KubeCloudShared.UseServiceAccountCredentials {
      			if len(c.ComponentConfig.SAController.ServiceAccountKeyFile) == 0 {
      				// It's possible another controller process is creating the tokens for us.
      				// If one isn't, we'll timeout and exit when our client builder is unable to create the tokens.
      				klog.Warningf("--use-service-account-credentials was specified without providing a --service-account-private-key-file")
      			}
      
      			if shouldTurnOnDynamicClient(c.Client) {
      				klog.V(1).Infof("using dynamic client builder")
      				//Dynamic builder will use TokenRequest feature and refresh service account token periodically
      				clientBuilder = controller.NewDynamicClientBuilder(
      					restclient.AnonymousClientConfig(c.Kubeconfig),
      					c.Client.CoreV1(),
      					"kube-system")
      			} else {
      				klog.V(1).Infof("using legacy client builder")
      				clientBuilder = clientbuilder.SAControllerClientBuilder{
      					ClientConfig:         restclient.AnonymousClientConfig(c.Kubeconfig),
      					CoreClient:           c.Client.CoreV1(),
      					AuthenticationClient: c.Client.AuthenticationV1(),
      					Namespace:            "kube-system",
      				}
      			}
      		} else {
      			clientBuilder = rootClientBuilder
      		}
      		controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
      		if err != nil {
      			klog.Fatalf("error building controller context: %v", err)
      		}
      		// The service account token controller is started through its own
      		// init function, which is built with the root client builder.
      		saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController
      
      		if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
      			klog.Fatalf("error starting controllers: %v", err)
      		}
      
      		// Informers are started only after StartControllers has returned;
      		// closing InformersStarted signals that both factories were started.
      		controllerContext.InformerFactory.Start(controllerContext.Stop)
      		controllerContext.ObjectOrMetadataInformerFactory.Start(controllerContext.Stop)
      		close(controllerContext.InformersStarted)
      
      		// Block this goroutine forever.
      		select {}
      	}
      
      	// Without leader election, run directly. run never returns, so the
      	// panic only documents that this point is unreachable.
      	if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
      		run(context.TODO())
      		panic("unreachable")
      	}
      
      	id, err := os.Hostname()
      	if err != nil {
      		return err
      	}
      
      	// add a uniquifier so that two processes on the same host don't accidentally both become active
      	id = id + "_" + string(uuid.NewUUID())
      
      	// Build the resource lock used for leader election from the configured
      	// lock type, namespace and name.
      	rl, err := resourcelock.New(c.ComponentConfig.Generic.LeaderElection.ResourceLock,
      		c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
      		c.ComponentConfig.Generic.LeaderElection.ResourceName,
      		c.LeaderElectionClient.CoreV1(),
      		c.LeaderElectionClient.CoordinationV1(),
      		resourcelock.ResourceLockConfig{
      			Identity:      id,
      			EventRecorder: c.EventRecorder,
      		})
      	if err != nil {
      		klog.Fatalf("error creating lock: %v", err)
      	}
      
      	// run is invoked once this process becomes the leader; losing
      	// leadership is fatal (klog.Fatalf).
      	leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
      		Lock:          rl,
      		LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
      		RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
      		RetryPeriod:   c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
      		Callbacks: leaderelection.LeaderCallbacks{
      			OnStartedLeading: run,
      			OnStoppedLeading: func() {
      				klog.Fatalf("leaderelection lost")
      			},
      		},
      		WatchDog: electionChecker,
      		Name:     "kube-controller-manager",
      	})
      	panic("unreachable")
      }
      

      var unsecuredMux *mux.PathRecorderMux 定义HTTP请求的路由复用器 (mux 即 multiplexer, 并不是互斥锁), 它是经过认证/鉴权过滤之后真正处理请求的handler

    2. 启动信息:

      		controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
      		if err != nil {
      			klog.Fatalf("error building controller context: %v", err)
      		}
      		saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController
      
      		if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
      			klog.Fatalf("error starting controllers: %v", err)
      		}
      
      		controllerContext.InformerFactory.Start(controllerContext.Stop)
      		controllerContext.ObjectOrMetadataInformerFactory.Start(controllerContext.Stop)
      

    NewControllerInitializers 中定义了所有的 controller 以及 start controller 对应的方法。deployment controller 对应的启动方法是 startDeploymentController

    查看下CreateControllerContext 定义:

 // CreateControllerContext builds the ControllerContext handed to the
 // controller init functions. It creates the shared and metadata informer
 // factories, waits for the apiserver to become healthy, sets up a refreshable
 // REST mapper, discovers the available resources and creates the cloud
 // provider. It returns an error if the apiserver wait fails, or if resource
 // discovery or cloud provider creation fails.
 func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clientBuilder clientbuilder.ControllerClientBuilder, stop <-chan struct{}) (ControllerContext, error) {
 	// Both informer factories are built from the root client builder and
 	// use the resync period derived from s.
 	versionedClient := rootClientBuilder.ClientOrDie("shared-informers")
 	sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)())
 
 	metadataClient := metadata.NewForConfigOrDie(rootClientBuilder.ConfigOrDie("metadata-informers"))
 	metadataInformers := metadatainformer.NewSharedInformerFactory(metadataClient, ResyncPeriod(s)())
 
 	// If apiserver is not running we should wait for some time and fail only then. This is particularly
 	// important when we start apiserver and controller manager at the same time.
 	if err := genericcontrollermanager.WaitForAPIServer(versionedClient, 10*time.Second); err != nil {
 		return ControllerContext{}, fmt.Errorf("failed to wait for apiserver being healthy: %v", err)
 	}
 
 	// Use a discovery client capable of being refreshed.
 	discoveryClient := rootClientBuilder.ClientOrDie("controller-discovery")
 	cachedClient := cacheddiscovery.NewMemCacheClient(discoveryClient.Discovery())
 	restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedClient)
 	// Reset the REST mapper every 30 seconds until stop is closed.
 	go wait.Until(func() {
 		restMapper.Reset()
 	}, 30*time.Second, stop)
 
 	availableResources, err := GetAvailableResources(rootClientBuilder)
 	if err != nil {
 		return ControllerContext{}, err
 	}
 
 	// createCloudProvider also yields the loop mode, which callers pass to
 	// NewControllerInitializers.
 	cloud, loopMode, err := createCloudProvider(s.ComponentConfig.KubeCloudShared.CloudProvider.Name, s.ComponentConfig.KubeCloudShared.ExternalCloudVolumePlugin,
 		s.ComponentConfig.KubeCloudShared.CloudProvider.CloudConfigFile, s.ComponentConfig.KubeCloudShared.AllowUntaggedCloud, sharedInformers)
 	if err != nil {
 		return ControllerContext{}, err
 	}
 
 	// Note that controllers receive clientBuilder, while the informers and
 	// discovery above were built from rootClientBuilder.
 	ctx := ControllerContext{
 		ClientBuilder:                   clientBuilder,
 		InformerFactory:                 sharedInformers,
 		ObjectOrMetadataInformerFactory: informerfactory.NewInformerFactory(sharedInformers, metadataInformers),
 		ComponentConfig:                 s.ComponentConfig,
 		RESTMapper:                      restMapper,
 		AvailableResources:              availableResources,
 		Cloud:                           cloud,
 		LoopMode:                        loopMode,
 		Stop:                            stop,
 		InformersStarted:                make(chan struct{}),
 		ResyncPeriod:                    ResyncPeriod(s),
  }
 	return ctx, nil
}
  1. NewControllerInitializers 对 deploymentController 进行初始化
// NewControllerInitializers returns the table that maps every controller name
// to the InitFunc used to start it. The cloud-specific controllers (service,
// route, cloud-node-lifecycle) are included only when loopMode is
// IncludeCloudLoops, and the storage-version-gc controller only when both the
// APIServerIdentity and StorageVersionAPI feature gates are enabled.
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
	initializers := map[string]InitFunc{
		"endpoint":                  startEndpointController,
		"endpointslice":             startEndpointSliceController,
		"endpointslicemirroring":    startEndpointSliceMirroringController,
		"replicationcontroller":     startReplicationController,
		"podgc":                     startPodGCController,
		"resourcequota":             startResourceQuotaController,
		"namespace":                 startNamespaceController,
		"serviceaccount":            startServiceAccountController,
		"garbagecollector":          startGarbageCollectorController,
		"daemonset":                 startDaemonSetController,
		"job":                       startJobController,
		"deployment":                startDeploymentController,
		"replicaset":                startReplicaSetController,
		"horizontalpodautoscaling":  startHPAController,
		"disruption":                startDisruptionController,
		"statefulset":               startStatefulSetController,
		"cronjob":                   startCronJobController,
		"csrsigning":                startCSRSigningController,
		"csrapproving":              startCSRApprovingController,
		"csrcleaner":                startCSRCleanerController,
		"ttl":                       startTTLController,
		"bootstrapsigner":           startBootstrapSignerController,
		"tokencleaner":              startTokenCleanerController,
		"nodeipam":                  startNodeIpamController,
		"nodelifecycle":             startNodeLifecycleController,
		"persistentvolume-binder":   startPersistentVolumeBinderController,
		"attachdetach":              startAttachDetachController,
		"persistentvolume-expander": startVolumeExpandController,
		"clusterrole-aggregation":   startClusterRoleAggregrationController,
		"pvc-protection":            startPVCProtectionController,
		"pv-protection":             startPVProtectionController,
		"ttl-after-finished":        startTTLAfterFinishedController,
		"root-ca-cert-publisher":    startRootCACertPublisher,
		"ephemeral-volume":          startEphemeralVolumeController,
	}

	// Controllers that only make sense when cloud loops are included.
	// TODO: volume controller into the IncludeCloudLoops only set.
	if loopMode == IncludeCloudLoops {
		initializers["service"] = startServiceController
		initializers["route"] = startRouteController
		initializers["cloud-node-lifecycle"] = startCloudNodeLifecycleController
	}

	// The storage version GC controller requires both feature gates.
	identityEnabled := utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity)
	if identityEnabled && utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) {
		initializers["storage-version-gc"] = startStorageVersionGCController
	}

	return initializers
}

在调用后会执行run方法

  1. 这里选择startDeploymentController进行启动分析, 其他controller类似
controllers["deployment"] = startDeploymentController

在cmd/kube-controller-manager/app/apps.go定义:

import (
	"fmt"
	"net/http"
	"time"

	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/util/flowcontrol"
	"k8s.io/kubernetes/pkg/controller/daemon"
	"k8s.io/kubernetes/pkg/controller/deployment"
	"k8s.io/kubernetes/pkg/controller/replicaset"
	"k8s.io/kubernetes/pkg/controller/statefulset"
)


// startDeploymentController creates the Deployment controller and runs it in
// its own goroutine. The bool result is false (with a nil error) when the
// apps/v1 deployments resource is not available, and true otherwise; the
// returned http.Handler is always nil.
func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
	deploymentsGVR := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}
	if available := ctx.AvailableResources[deploymentsGVR]; !available {
		return nil, false, nil
	}

	deploymentInformer := ctx.InformerFactory.Apps().V1().Deployments()
	replicaSetInformer := ctx.InformerFactory.Apps().V1().ReplicaSets()
	podInformer := ctx.InformerFactory.Core().V1().Pods()
	kubeClient := ctx.ClientBuilder.ClientOrDie("deployment-controller")

	dc, err := deployment.NewDeploymentController(deploymentInformer, replicaSetInformer, podInformer, kubeClient)
	if err != nil {
		return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
	}

	// The number of worker goroutines comes from the component config.
	workers := int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs)
	go dc.Run(workers, ctx.Stop)
	return nil, true, nil
}

通过 go dc.Run(...) 在新的goroutine中运行deployment controller

ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs 定义的默认值在pkg/controller/deployment/config/v1alpha1/defaults.go,

k8s.io/kubernetes/pkg/controller/deployment 实际通过软链接指向pkg/controller/deployment

  1. NewDeploymentController 在文件pkg/controller/deployment/deployment_controller.go
// NewDeploymentController creates a DeploymentController wired to the given
// Deployment, ReplicaSet and Pod informers. It sets up event recording,
// registers rate limiter metrics when the client exposes a rate limiter,
// creates the work queue, installs the informer event handlers, and stores
// the listers and their HasSynced functions. An error is returned only when
// registering the rate limiter metric fails.
func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
	// Events are both logged and sent to the apiserver.
	// NOTE(review): client is dereferenced here before the `client != nil`
	// check below - confirm whether a nil client is actually supported.
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartStructuredLogging(0)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})

	if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
		if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("deployment_controller", client.CoreV1().RESTClient().GetRateLimiter()); err != nil {
			return nil, err
		}
	}
	dc := &DeploymentController{
		client:        client,
		eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
	}
	// rsControl shares the controller's client and event recorder.
	dc.rsControl = controller.RealRSControl{
		KubeClient: client,
		Recorder:   dc.eventRecorder,
	}

	// Deployments and ReplicaSets react to add/update/delete; Pods only to delete.
	dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dc.addDeployment,
		UpdateFunc: dc.updateDeployment,
		// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
		DeleteFunc: dc.deleteDeployment,
	})
	rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dc.addReplicaSet,
		UpdateFunc: dc.updateReplicaSet,
		DeleteFunc: dc.deleteReplicaSet,
	})
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		DeleteFunc: dc.deletePod,
	})

	// syncHandler and enqueueDeployment are function-valued fields, set here
	// to the controller's own methods.
	dc.syncHandler = dc.syncDeployment
	dc.enqueueDeployment = dc.enqueue

	// Listers read from the informer caches; the *Synced functions are what
	// Run passes to WaitForNamedCacheSync before starting workers.
	dc.dLister = dInformer.Lister()
	dc.rsLister = rsInformer.Lister()
	dc.podLister = podInformer.Lister()
	dc.dListerSynced = dInformer.Informer().HasSynced
	dc.rsListerSynced = rsInformer.Informer().HasSynced
	dc.podListerSynced = podInformer.Informer().HasSynced
	return dc, nil
}

创建eventBroadcaster并设置属性
设置速率控制RateLimiter
创建DeploymentController对象, 设置client, event, queue等
AddEventHandler 设置事件回调
设置handler处理函数为syncDeployment
配置lister及对应的HasSynced函数等

  1. run函数
```
// Run starts the deployment controller and blocks until stopCh is closed.
// It waits for the deployment, replica set and pod caches to sync (returning
// early if that wait fails), then launches `workers` goroutines that each
// run dc.worker. The work queue is shut down when Run returns.
func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer dc.queue.ShutDown()

	klog.Infof("Starting deployment controller")
	defer klog.Infof("Shutting down deployment controller")

	synced := cache.WaitForNamedCacheSync("deployment", stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced)
	if !synced {
		return
	}

	// Each worker is restarted one second after it returns, until stopCh closes.
	for remaining := workers; remaining > 0; remaining-- {
		go wait.Until(dc.worker, time.Second, stopCh)
	}

	<-stopCh
}
```
  1. worker定义:

    // worker drains the work queue: it keeps processing items one at a time
    // until processNextWorkItem reports that there is nothing more to do.
    // It enforces that the syncHandler is never invoked concurrently with the same key.
    func (dc *DeploymentController) worker() {
    	for {
    		if more := dc.processNextWorkItem(); !more {
    			return
    		}
    	}
    }
    
    // processNextWorkItem takes one key off the queue, runs the sync handler
    // on it and passes the resulting error (nil or not) to handleErr. It
    // returns false only when the queue reports that it is shutting down.
    func (dc *DeploymentController) processNextWorkItem() bool {
    	item, shutdown := dc.queue.Get()
    	if shutdown {
    		return false
    	}
    	// Always mark the item as done, whatever the sync result was.
    	defer dc.queue.Done(item)

    	dc.handleErr(dc.syncHandler(item.(string)), item)
    	return true
    }
    
  2. err := dc.syncHandler(key.(string)) 定义:

```
err := dc.syncHandler(key.(string))
```

实际上由之前的 NewDeploymentController定义指向 `dc.syncHandler = dc.syncDeployment`
  1. syncDeployment 定义如下:
// syncDeployment will sync the deployment with the given key.
// This function is not meant to be invoked concurrently with the same key.
//
// The checks run in a fixed order and the first one that matches decides the
// action: deletion (status-only sync), paused (sync), rollback, scaling event
// (sync), and finally a rollout according to the deployment strategy. A
// deployment that no longer exists in the lister is treated as success. An
// unknown strategy type yields an error.
func (dc *DeploymentController) syncDeployment(key string) error {
	startTime := time.Now()
	klog.V(4).Infof("Started syncing deployment %q (%v)", key, startTime)
	defer func() {
		klog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Since(startTime))
	}()

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	deployment, err := dc.dLister.Deployments(namespace).Get(name)
	if errors.IsNotFound(err) {
		klog.V(2).Infof("Deployment %v has been deleted", key)
		return nil
	}
	if err != nil {
		return err
	}

	// Deep-copy otherwise we are mutating our cache.
	// TODO: Deep-copy only when needed.
	d := deployment.DeepCopy()

	// Check the label selector: an empty selector would select all pods, so
	// emit a warning event, record the observed generation and stop.
	
	everything := metav1.LabelSelector{}
	if reflect.DeepEqual(d.Spec.Selector, &everything) {
		dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
		if d.Status.ObservedGeneration < d.Generation {
			d.Status.ObservedGeneration = d.Generation
			// The result of UpdateStatus (including its error) is discarded here.
			dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
		}
		return nil
	}

	// List ReplicaSets owned by this Deployment, while reconciling ControllerRef
	// through adoption/orphaning.
	
	// Get the list of ReplicaSets that belong to this deployment.
	
	rsList, err := dc.getReplicaSetsForDeployment(d)
	if err != nil {
		return err
	}
	// List all Pods owned by this Deployment, grouped by their ReplicaSet.
	// Current uses of the podMap are:
	//
	// * check if a Pod is labeled correctly with the pod-template-hash label.
	// * check that no old Pods are running in the middle of Recreate Deployments.
	
	// Get the pod map.
	podMap, err := dc.getPodMapForDeployment(d, rsList)
	if err != nil {
		return err
	}

	// If the deployment is being deleted, only update its status.
	if d.DeletionTimestamp != nil {
		return dc.syncStatusOnly(d, rsList)
	}

	// Update deployment conditions with an Unknown condition when pausing/resuming
	// a deployment. In this way, we can be sure that we won't timeout when a user
	// resumes a Deployment with a set progressDeadlineSeconds.
	
	// Check whether the deployment is paused.
	if err = dc.checkPausedConditions(d); err != nil {
		return err
	}

	if d.Spec.Paused {
		return dc.sync(d, rsList)
	}

	// rollback is not re-entrant in case the underlying replica sets are updated with a new
	// revision so we should ensure that we won't proceed to update replica sets until we
	// make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
	
	// Check whether a rollback has been requested.
	if getRollbackTo(d) != nil {
		return dc.rollback(d, rsList)
	}
	
	// Check whether this is a scaling event.
	scalingEvent, err := dc.isScalingEvent(d, rsList)
	if err != nil {
		return err
	}
	if scalingEvent {
		return dc.sync(d, rsList)
	}

	// Roll out the update according to the deployment strategy.
	switch d.Spec.Strategy.Type {
	case apps.RecreateDeploymentStrategyType:
		return dc.rolloutRecreate(d, rsList, podMap)
	case apps.RollingUpdateDeploymentStrategyType:
		return dc.rolloutRolling(d, rsList)
	}
	return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}

deployment事件优先级为: delete > pause > rollback > scale > rollout

添加对应event的处理函数, 例如dc.rollback

上一篇:Chart.js


下一篇:zephyr (1)