Kube Scheduler 源码分析

Kube Scheduler 源码分析

Kube Scheduler 是负责k8s 集群最终Pod 应该部署到哪台机器的决策者,本章来走读一下Scheduler 基本流程源码。

func NewSchedulerCommand() *cobra.Command {
    opts, err := options.NewOptions()
    if err != nil {
        klog.Fatalf("unable to initialize command options: %v", err)
    }

    cmd := &cobra.Command{
        Use: "kube-scheduler",
        Long: `The Kubernetes scheduler is a policy-rich, topology-aware,
workload-specific function that significantly impacts availability, performance,
and capacity. The scheduler needs to take into account individual and collective
resource requirements, quality of service requirements, hardware/software/policy
constraints, affinity and anti-affinity specifications, data locality, inter-workload
interference, deadlines, and so on. Workload-specific requirements will be exposed
through the API as necessary.`,
        Run: func(cmd *cobra.Command, args []string) {
            if err := runCommand(cmd, args, opts); err != nil {
                fmt.Fprintf(os.Stderr, "%v\n", err)
                os.Exit(1)
            }
        },
    }

Scheduler 也是一个command line ,启动参数和另外几个组件无异。

Scheduler Run 函数

// Run executes the scheduler based on the given configuration. It only return on error or when stopCh is closed.
func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
    // Create the scheduler.
    sched, err := scheduler.New(cc.Client,
        cc.InformerFactory.Core().V1().Nodes(),
        cc.PodInformer,
        cc.InformerFactory.Core().V1().PersistentVolumes(),
        cc.InformerFactory.Core().V1().PersistentVolumeClaims(),
        cc.InformerFactory.Core().V1().ReplicationControllers(),
        cc.InformerFactory.Apps().V1().ReplicaSets(),
        cc.InformerFactory.Apps().V1().StatefulSets(),
        cc.InformerFactory.Core().V1().Services(),
        cc.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
        cc.InformerFactory.Storage().V1().StorageClasses(),
        cc.Recorder,
        cc.ComponentConfig.AlgorithmSource,
        stopCh,
        scheduler.WithName(cc.ComponentConfig.SchedulerName),
        scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight),
        scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
        scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
        scheduler.WithBindTimeoutSeconds(*cc.ComponentConfig.BindTimeoutSeconds))
    if err != nil {
        return err
    }

    // Start all informers.
    go cc.PodInformer.Informer().Run(stopCh)
    cc.InformerFactory.Start(stopCh)

    // Wait for all caches to sync before scheduling.
    cc.InformerFactory.WaitForCacheSync(stopCh)
    controller.WaitForCacheSync("scheduler", stopCh, cc.PodInformer.Informer().HasSynced)

    // Prepare a reusable runCommand function.
    run := func(ctx context.Context) {
        sched.Run()
        <-ctx.Done()
    }

    ctx, cancel := context.WithCancel(context.TODO()) // TODO once Run() accepts a context, it should be used here
    defer cancel()

    go func() {
        select {
        case <-stopCh:
            cancel()
        case <-ctx.Done():
        }
    }()

    // If leader election is enabled, runCommand via LeaderElector until done and exit.
    if cc.LeaderElection != nil {
        cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
            OnStartedLeading: run,
            OnStoppedLeading: func() {
                utilruntime.HandleError(fmt.Errorf("lost master"))
            },
        }
        leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
        if err != nil {
            return fmt.Errorf("couldn't create leader elector: %v", err)
        }

        leaderElector.Run(ctx)

        return fmt.Errorf("lost lease")
    }

    // Leader election is disabled, so runCommand inline until done.
    run(ctx)
    return fmt.Errorf("finished without leader elect")
}

可以看到Run 函数主要做如下几个流程

  • 根据需要观察的Informer 创建Scheduler 实例
  • 将所有的Informer Listener 运行起来
  • 使用Client-go 进行选主
  • 运行真正的调度 run 函数

scheduler.New 函数

// New returns a Scheduler
func New(client clientset.Interface,
    nodeInformer coreinformers.NodeInformer,
    podInformer coreinformers.PodInformer,
    pvInformer coreinformers.PersistentVolumeInformer,
    pvcInformer coreinformers.PersistentVolumeClaimInformer,
    replicationControllerInformer coreinformers.ReplicationControllerInformer,
    replicaSetInformer appsinformers.ReplicaSetInformer,
    statefulSetInformer appsinformers.StatefulSetInformer,
    serviceInformer coreinformers.ServiceInformer,
    pdbInformer policyinformers.PodDisruptionBudgetInformer,
    storageClassInformer storageinformers.StorageClassInformer,
    recorder record.EventRecorder,
    schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource,
    stopCh <-chan struct{},
    opts ...func(o *schedulerOptions)) (*Scheduler, error) {

    options := defaultSchedulerOptions
    for _, opt := range opts {
        opt(&options)
    }

    // Set up the configurator which can create schedulers from configs.
    configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
        SchedulerName:                  options.schedulerName,
        Client:                         client,
        NodeInformer:                   nodeInformer,
        PodInformer:                    podInformer,
        PvInformer:                     pvInformer,
        PvcInformer:                    pvcInformer,
        ReplicationControllerInformer:  replicationControllerInformer,
        ReplicaSetInformer:             replicaSetInformer,
        StatefulSetInformer:            statefulSetInformer,
        ServiceInformer:                serviceInformer,
        PdbInformer:                    pdbInformer,
        StorageClassInformer:           storageClassInformer,
        HardPodAffinitySymmetricWeight: options.hardPodAffinitySymmetricWeight,
        DisablePreemption:              options.disablePreemption,
        PercentageOfNodesToScore:       options.percentageOfNodesToScore,
        BindTimeoutSeconds:             options.bindTimeoutSeconds,
    })
    var config *factory.Config
    source := schedulerAlgorithmSource
    switch {
    case source.Provider != nil:
        // Create the config from a named algorithm provider.
        sc, err := configurator.CreateFromProvider(*source.Provider)
        if err != nil {
            return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
        }
        config = sc
    case source.Policy != nil:
        // Create the config from a user specified policy source.
        policy := &schedulerapi.Policy{}
        switch {
        case source.Policy.File != nil:
            if err := initPolicyFromFile(source.Policy.File.Path, policy); err != nil {
                return nil, err
            }
        case source.Policy.ConfigMap != nil:
            if err := initPolicyFromConfigMap(client, source.Policy.ConfigMap, policy); err != nil {
                return nil, err
            }
        }
        sc, err := configurator.CreateFromConfig(*policy)
        if err != nil {
            return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
        }
        config = sc
    default:
        return nil, fmt.Errorf("unsupported algorithm source: %v", source)
    }
    // Additional tweaks to the config produced by the configurator.
    config.Recorder = recorder
    config.DisablePreemption = options.disablePreemption
    config.StopEverything = stopCh
    // Create the scheduler.
    sched := NewFromConfig(config)
    return sched, nil
}
  • 将所有的Informer 构建为一个Config Struct
  • 根据Provider 或者 ConfigMap 来创建对应的调度器以及初始化策略(这里可以选择使用config file 或者configMap 模式来指定各种调度策略顺序和权重以及是否开启)
  • 将返回的config 构建成对应的schedu 实例返回

CreateFromConfig

这里我们以Policy 为例看一下具体如何根据config file 创建对应的Scheduler Struct。

// Creates a scheduler from the configuration file
func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*Config, error) {
    klog.V(2).Infof("Creating scheduler from configuration: %v", policy)

    // validate the policy configuration
    if err := validation.ValidatePolicy(policy); err != nil {
        return nil, err
    }

    predicateKeys := sets.NewString()
    if policy.Predicates == nil {
        klog.V(2).Infof("Using predicates from algorithm provider '%v'", DefaultProvider)
        provider, err := GetAlgorithmProvider(DefaultProvider)
        if err != nil {
            return nil, err
        }
        predicateKeys = provider.FitPredicateKeys
    } else {
        for _, predicate := range policy.Predicates {
            klog.V(2).Infof("Registering predicate: %s", predicate.Name)
            predicateKeys.Insert(RegisterCustomFitPredicate(predicate))
        }
    }

    priorityKeys := sets.NewString()
    if policy.Priorities == nil {
        klog.V(2).Infof("Using priorities from algorithm provider '%v'", DefaultProvider)
        provider, err := GetAlgorithmProvider(DefaultProvider)
        if err != nil {
            return nil, err
        }
        priorityKeys = provider.PriorityFunctionKeys
    } else {
        for _, priority := range policy.Priorities {
            klog.V(2).Infof("Registering priority: %s", priority.Name)
            priorityKeys.Insert(RegisterCustomPriorityFunction(priority))
        }
    }

    var extenders []algorithm.SchedulerExtender
    if len(policy.ExtenderConfigs) != 0 {
        ignoredExtendedResources := sets.NewString()
        for ii := range policy.ExtenderConfigs {
            klog.V(2).Infof("Creating extender with config %+v", policy.ExtenderConfigs[ii])
            extender, err := core.NewHTTPExtender(&policy.ExtenderConfigs[ii])
            if err != nil {
                return nil, err
            }
            extenders = append(extenders, extender)
            for _, r := range policy.ExtenderConfigs[ii].ManagedResources {
                if r.IgnoredByScheduler {
                    ignoredExtendedResources.Insert(string(r.Name))
                }
            }
        }
        predicates.RegisterPredicateMetadataProducerWithExtendedResourceOptions(ignoredExtendedResources)
    }
  • 如果没有设置Predicates则初始化default Predicates
  • 如果没有设置Priorities 则初始化default Priorities
  • 如果设置ExtenderConfigs 则初始化ExtenderConfigs

下面列一下都有哪些Predicates

// MatchInterPodAffinityPred defines the name of predicate MatchInterPodAffinity.
    MatchInterPodAffinityPred = "MatchInterPodAffinity"
    // CheckVolumeBindingPred defines the name of predicate CheckVolumeBinding.
    CheckVolumeBindingPred = "CheckVolumeBinding"
    // CheckNodeConditionPred defines the name of predicate CheckNodeCondition.
    CheckNodeConditionPred = "CheckNodeCondition"
    // GeneralPred defines the name of predicate GeneralPredicates.
    GeneralPred = "GeneralPredicates"
    // HostNamePred defines the name of predicate HostName.
    HostNamePred = "HostName"
    // PodFitsHostPortsPred defines the name of predicate PodFitsHostPorts.
    PodFitsHostPortsPred = "PodFitsHostPorts"
    // MatchNodeSelectorPred defines the name of predicate MatchNodeSelector.
    MatchNodeSelectorPred = "MatchNodeSelector"
    // PodFitsResourcesPred defines the name of predicate PodFitsResources.
    PodFitsResourcesPred = "PodFitsResources"
    // NoDiskConflictPred defines the name of predicate NoDiskConflict.
    NoDiskConflictPred = "NoDiskConflict"
    // PodToleratesNodeTaintsPred defines the name of predicate PodToleratesNodeTaints.
    PodToleratesNodeTaintsPred = "PodToleratesNodeTaints"
    // CheckNodeUnschedulablePred defines the name of predicate CheckNodeUnschedulablePredicate.
    CheckNodeUnschedulablePred = "CheckNodeUnschedulable"
    // PodToleratesNodeNoExecuteTaintsPred defines the name of predicate PodToleratesNodeNoExecuteTaints.
    PodToleratesNodeNoExecuteTaintsPred = "PodToleratesNodeNoExecuteTaints"
    // CheckNodeLabelPresencePred defines the name of predicate CheckNodeLabelPresence.
    CheckNodeLabelPresencePred = "CheckNodeLabelPresence"
    // CheckServiceAffinityPred defines the name of predicate checkServiceAffinity.
    CheckServiceAffinityPred = "CheckServiceAffinity"
    // MaxEBSVolumeCountPred defines the name of predicate MaxEBSVolumeCount.
    MaxEBSVolumeCountPred = "MaxEBSVolumeCount"
    // MaxGCEPDVolumeCountPred defines the name of predicate MaxGCEPDVolumeCount.
    MaxGCEPDVolumeCountPred = "MaxGCEPDVolumeCount"
    // MaxAzureDiskVolumeCountPred defines the name of predicate MaxAzureDiskVolumeCount.
    MaxAzureDiskVolumeCountPred = "MaxAzureDiskVolumeCount"
    // MaxCinderVolumeCountPred defines the name of predicate MaxCinderDiskVolumeCount.
    MaxCinderVolumeCountPred = "MaxCinderVolumeCount"
    // MaxCSIVolumeCountPred defines the predicate that decides how many CSI volumes should be attached
    MaxCSIVolumeCountPred = "MaxCSIVolumeCountPred"
    // NoVolumeZoneConflictPred defines the name of predicate NoVolumeZoneConflict.
    NoVolumeZoneConflictPred = "NoVolumeZoneConflict"
    // CheckNodeMemoryPressurePred defines the name of predicate CheckNodeMemoryPressure.
    CheckNodeMemoryPressurePred = "CheckNodeMemoryPressure"
    // CheckNodeDiskPressurePred defines the name of predicate CheckNodeDiskPressure.
    CheckNodeDiskPressurePred = "CheckNodeDiskPressure"
    // CheckNodePIDPressurePred defines the name of predicate CheckNodePIDPressure.
    CheckNodePIDPressurePred = "CheckNodePIDPressure"

    // DefaultMaxGCEPDVolumes defines the maximum number of PD Volumes for GCE
    // GCE instances can have up to 16 PD volumes attached.
    DefaultMaxGCEPDVolumes = 16
    // DefaultMaxAzureDiskVolumes defines the maximum number of PD Volumes for Azure
    // Larger Azure VMs can actually have much more disks attached.
    // TODO We should determine the max based on VM size
    DefaultMaxAzureDiskVolumes = 16

    // KubeMaxPDVols defines the maximum number of PD Volumes per kubelet
    KubeMaxPDVols = "KUBE_MAX_PD_VOLS"

    // EBSVolumeFilterType defines the filter name for EBSVolumeFilter.
    EBSVolumeFilterType = "EBS"
    // GCEPDVolumeFilterType defines the filter name for GCEPDVolumeFilter.
    GCEPDVolumeFilterType = "GCE"
    // AzureDiskVolumeFilterType defines the filter name for AzureDiskVolumeFilter.
    AzureDiskVolumeFilterType = "AzureDisk"
    // CinderVolumeFilterType defines the filter name for CinderVolumeFilter.
    CinderVolumeFilterType = "Cinder"

所有的priorities如下

EqualPriority = "EqualPriority"
    // MostRequestedPriority defines the name of prioritizer function that gives used nodes higher priority.
    MostRequestedPriority = "MostRequestedPriority"
    // RequestedToCapacityRatioPriority defines the name of RequestedToCapacityRatioPriority.
    RequestedToCapacityRatioPriority = "RequestedToCapacityRatioPriority"
    // SelectorSpreadPriority defines the name of prioritizer function that spreads pods by minimizing
    // the number of pods (belonging to the same service or replication controller) on the same node.
    SelectorSpreadPriority = "SelectorSpreadPriority"
    // ServiceSpreadingPriority is largely replaced by "SelectorSpreadPriority".
    ServiceSpreadingPriority = "ServiceSpreadingPriority"
    // InterPodAffinityPriority defines the name of prioritizer function that decides which pods should or
    // should not be placed in the same topological domain as some other pods.
    InterPodAffinityPriority = "InterPodAffinityPriority"
    // LeastRequestedPriority defines the name of prioritizer function that prioritize nodes by least
    // requested utilization.
    LeastRequestedPriority = "LeastRequestedPriority"
    // BalancedResourceAllocation defines the name of prioritizer function that prioritizes nodes
    // to help achieve balanced resource usage.
    BalancedResourceAllocation = "BalancedResourceAllocation"
    // NodePreferAvoidPodsPriority defines the name of prioritizer function that priorities nodes according to
    // the node annotation "scheduler.alpha.kubernetes.io/preferAvoidPods".
    NodePreferAvoidPodsPriority = "NodePreferAvoidPodsPriority"
    // NodeAffinityPriority defines the name of prioritizer function that prioritizes nodes which have labels
    // matching NodeAffinity.
    NodeAffinityPriority = "NodeAffinityPriority"
    // TaintTolerationPriority defines the name of prioritizer function that prioritizes nodes that marked
    // with taint which pod can tolerate.
    TaintTolerationPriority = "TaintTolerationPriority"
    // ImageLocalityPriority defines the name of prioritizer function that prioritizes nodes that have images
    // requested by the pod present.
    ImageLocalityPriority = "ImageLocalityPriority"
    // ResourceLimitsPriority defines the nodes of prioritizer function ResourceLimitsPriority.
    ResourceLimitsPriority = "ResourceLimitsPriority"

scheduleOne

// scheduleOne does the entire scheduling workflow for a single pod.  It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne() {
    plugins := sched.config.PluginSet
    // Remove all plugin context data at the beginning of a scheduling cycle.
    if plugins.Data().Ctx != nil {
        plugins.Data().Ctx.Reset()
    }

    pod := sched.config.NextPod()
    // pod could be nil when schedulerQueue is closed
    if pod == nil {
        return
    }
    if pod.DeletionTimestamp != nil {
        sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
        klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
        return
    }

    klog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)

    // Synchronously attempt to find a fit for the pod.
    start := time.Now()
    scheduleResult, err := sched.schedule(pod)
    err := sched.bind(assumedPod, &v1.Binding{
            ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
            Target: v1.ObjectReference{
                Kind: "Node",
                Name: scheduleResult.SuggestedHost,
            },
        })

这里就是最终调度Pod的最终函数,可以看到,它先从队列里面得到所有的未调度的 Pod,这里判断是否未调度主要根据Pod 的NodeName 是否已经被分配为标准,后面我们再介绍这个队列是怎么运作的。

取得Pod Info 后,依次运行刚才注册的预选与优选调度策略,最终选择出来合适的Node,然后调用Client go 的Bind 方法,将Pod 绑定到Node 上完成调度动作。

调度Pod Queue 构成

// NewConfigFactory initializes the default implementation of a Configurator. To encourage eventual privatization of the struct type, we only
// return the interface.
func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
    stopEverything := args.StopCh
    if stopEverything == nil {
        stopEverything = wait.NeverStop
    }
    schedulerCache := schedulerinternalcache.New(30*time.Second, stopEverything)

    // storageClassInformer is only enabled through VolumeScheduling feature gate
    var storageClassLister storagelisters.StorageClassLister
    if args.StorageClassInformer != nil {
        storageClassLister = args.StorageClassInformer.Lister()
    }
    c := &configFactory{
        client:                         args.Client,
        podLister:                      schedulerCache,
        podQueue:                       internalqueue.NewSchedulingQueue(stopEverything),
        nodeLister:                     args.NodeInformer.Lister(),
        pVLister:                       args.PvInformer.Lister(),
        pVCLister:                      args.PvcInformer.Lister(),
        serviceLister:                  args.ServiceInformer.Lister(),
        controllerLister:               args.ReplicationControllerInformer.Lister(),
        replicaSetLister:               args.ReplicaSetInformer.Lister(),
        statefulSetLister:              args.StatefulSetInformer.Lister(),
        pdbLister:                      args.PdbInformer.Lister(),
        storageClassLister:             storageClassLister,
        schedulerCache:                 schedulerCache,
        StopEverything:                 stopEverything,
        schedulerName:                  args.SchedulerName,
        hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight,
        disablePreemption:              args.DisablePreemption,
        percentageOfNodesToScore:       args.PercentageOfNodesToScore,
    }

    c.scheduledPodsHasSynced = args.PodInformer.Informer().HasSynced
    // scheduled pod cache
    args.PodInformer.Informer().AddEventHandler(
        cache.FilteringResourceEventHandler{
            FilterFunc: func(obj interface{}) bool {
                switch t := obj.(type) {
                case *v1.Pod:
                    return assignedPod(t)
                case cache.DeletedFinalStateUnknown:
                    if pod, ok := t.Obj.(*v1.Pod); ok {
                        return assignedPod(pod)
                    }
                    runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod for filtering scheduledPod in %T", obj, c))
                    return false
                default:
                    runtime.HandleError(fmt.Errorf("unable to handle object for filtering scheduledPod in %T: %T", c, obj))
                    return false
                }
            },
            Handler: cache.ResourceEventHandlerFuncs{
                AddFunc:    c.addPodToCache,
                UpdateFunc: c.updatePodInCache,
                DeleteFunc: c.deletePodFromCache,
            },
        },
    )
    // unscheduled pod queue
    args.PodInformer.Informer().AddEventHandler(
        cache.FilteringResourceEventHandler{
            FilterFunc: func(obj interface{}) bool {
                switch t := obj.(type) {
                case *v1.Pod:
                    return !assignedPod(t) && responsibleForPod(t, args.SchedulerName)
                case cache.DeletedFinalStateUnknown:
                    if pod, ok := t.Obj.(*v1.Pod); ok {
                        return !assignedPod(pod) && responsibleForPod(pod, args.SchedulerName)
                    }
                    runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod for filtering unscheduledPod in %T", obj, c))
                    return false
                default:
                    runtime.HandleError(fmt.Errorf("unable to handle object for filtering unscheduledPod in %T: %T", c, obj))
                    return false
                }
            },
            Handler: cache.ResourceEventHandlerFuncs{
                AddFunc:    c.addPodToSchedulingQueue,
                UpdateFunc: c.updatePodInSchedulingQueue,
                DeleteFunc: c.deletePodFromSchedulingQueue,
            },
        },
    )

这里完成了针对Pod 的Watch,同时将未调度的Pod 放入未调度Queue 方便调度器进行调度。可以看到,这里同样适用的Client go Informer 机制来同步Pod 的信息。

PodFitsHostPorts

由于PreDicates非常多,我们就以检查 Pod 端口冲突为例看一下PreDicates 函数

// PodFitsHostPorts checks if a node has free ports for the requested pod ports.
func PodFitsHostPorts(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
    var wantPorts []*v1.ContainerPort
    if predicateMeta, ok := meta.(*predicateMetadata); ok {
        wantPorts = predicateMeta.podPorts
    } else {
        // We couldn't parse metadata - fallback to computing it.
        wantPorts = schedutil.GetContainerPorts(pod)
    }
    if len(wantPorts) == 0 {
        return true, nil, nil
    }

    existingPorts := nodeInfo.UsedPorts()

    // try to see whether existingPorts and  wantPorts will conflict or not
    if portsConflict(existingPorts, wantPorts) {
        return false, []PredicateFailureReason{ErrPodNotFitsHostPorts}, nil
    }

    return true, nil, nil
}
  • 根据传入的Pod info 查看Pod 需要暴露的端口
  • 根据Node info 判断当前Node 可用端口
  • 判断是否被占用并返回结果

ImageLocalityPriority

func ImageLocalityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (schedulerapi.HostPriority, error) {
    node := nodeInfo.Node()
    if node == nil {
        return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
    }

    var score int
    if priorityMeta, ok := meta.(*priorityMetadata); ok {
        score = calculatePriority(sumImageScores(nodeInfo, pod.Spec.Containers, priorityMeta.totalNumNodes))
    } else {
        // if we are not able to parse priority meta data, skip this priority
        score = 0
    }

    return schedulerapi.HostPriority{
        Host:  node.Name,
        Score: score,
    }, nil
}

Priority我们以镜像为例,可以看到,先根据Pod info 来查看需要哪些镜像,然后根据Node info 查看当前镜像是否存在,存在的越多打分越高。

上一篇:中国煤炭行业全面推进大数据的发展运用


下一篇:SQL Server Mobile 和 .NET 数据访问接口之间的数据类型映射