云原生时代,k8s技术成为基础设施标配。这几年经历的公司中,不管是小到十几人的创业小团队,还是几千人的上市公司,基本都可以看到k8s在奔驰。特别是在云厂商的助力下比,省去了k8s运维之后的云原生架构给团队带来的便利性简直是谁用谁爽,如阿里云的ACK、腾讯的TKE、华为云的CCE在使用后都非常不错。但是作为开发人员,天生就会抱着对黑盒子厌恶的特性,因此这里开辟k8s源码解读,帮助大家更清楚它的本质。
k8s 调度器实现原理
k8s 中一个任务的创建流程
k8s 的 scheduler 和 controller manager,kubelet 这些是一样的,都是针对 apiserver 进行控制循环的操作。
- 当我们通过 kubectl 命令创建一个 job 的时候,kube-contoller 检测到资源的创建,并根据参数创建一个 pod 的实例发送给 apiserver。
- kube-scheduler 调度器检测到一个新的未调度 pod,他会从已有的 node 节点选择出一个 node节点绑定这个pod,并向 apiserver 发送一个绑定指令。
- 部署在对应节点上的 kubelet 通过 watchAndList 从 apiserver 检测到这个绑定的指令后,会发送到节点上的 container api,让其节点上运行这个pod。
以上是一个简单job的创建流程为例。这里面的kube-scheduler
调度器就是我们今天带大家了解的k8s基础组件之一 —— k8s的调度器。
kube-scheduler调度器的内部流转流程
通过上面我们知道,kube-scheduler 主要是负责将 pod 绑定到适合的 node 上面,那么 kube-scheduler 是怎么选择适合的 node 节点的呢?
这里提供了一副 kube-scheduler 调度的全景图:
整个事件流程如下:
- Scheduler 通过注册 client-go 的 informer 的 handler 方法监听 api-server 的 pod 和 node 变更事件,从而实现将 pod 的信息更新 scheduler 的 activeQ, podbackoffQ, unschedulableQ 三个队列中。
- 带调度的 pod 会进入到 activeQ 的调度队列中,activeQ 是一个维护着 pod 优先级的堆结构,调度器在调度循环中每次从堆中取出优先级最高的 pod 进行调度。
- 取出的待调度 pod 会经过调度器的一系列调度算法找到合适的 node 节点进行绑定。如果调度算法判定没有适合的节点,会将 pod 更新为不可调度状态,并扔进 unschedulable 的队列中。
- 调度器在执行绑定操作的时候是一个异步过程,调度器会先在缓存中创建一个和原来 pod 一样的 assume pod 对象用模拟完成节点的绑定,如将 assume pod 的 nodename 设置成绑定节点名称,同时通过异步执行绑定指令操作。
- 在 pod 和 node 绑定前,scheduler需要确保 volume 已经完成绑定操作,确认完所有绑定前准备工作,scheduler 会向 api-server 发送一个 bind 对象,对应节点的 kubelet 将待绑定的pod在节点运行起来。
kube-scheduler 源码解析
在源码解读这小节我会把 kube-scheduler分成三部分,第一部分是 scheduleOne,也就是调度器的主线逻辑,第二部分是 Algorithm,也就是调度阶段的核心流程。
本章节源码基于 kuberenesv1.19版本,commit id: 070ff5e3a98bc3ecd596ed62bc456079bcff0290
先对整个 kube-scheduler 的源码解析图和 scheduler 对象有个初步的认识,方便我们后续查阅:
type Scheduler struct {
// cache缓存,用来优化调度器性能的
SchedulerCache internalcache.Cache
// 调度算法
Algorithm ScheduleAlgorithm
// framework扩展
Extenders []framework.Extender
// 获取下一个需要调度的pod
NextPod func() *framework.QueuedPodInfo
Error func(*framework.QueuedPodInfo, error)
// 通过flag停止调度器.
StopEverything <-chan struct{}
// 调度队列
SchedulingQueue internalqueue.SchedulingQueue
// 调度范围,主要用于判断哪些pod能被这个调度器调度
Profiles profile.Map
// client-go,用于和api-server通信
client clientset.Interface
}
上面是调度器的组件,下面我们再看看调度框架包涵哪些:
// frameworkImpl is the component responsible for initializing and running scheduler
// plugins.
type frameworkImpl struct {
registry Registry
snapshotSharedLister framework.SharedLister
waitingPods *waitingPodsMap
pluginNameToWeightMap map[string]int
// Plugins插入扩展点
queueSortPlugins []framework.QueueSortPlugin
preFilterPlugins []framework.PreFilterPlugin
filterPlugins []framework.FilterPlugin
postFilterPlugins []framework.PostFilterPlugin
preScorePlugins []framework.PreScorePlugin
scorePlugins []framework.ScorePlugin
reservePlugins []framework.ReservePlugin
preBindPlugins []framework.PreBindPlugin
bindPlugins []framework.BindPlugin
postBindPlugins []framework.PostBindPlugin
permitPlugins []framework.PermitPlugin
clientSet clientset.Interface
eventRecorder events.EventRecorder
informerFactory informers.SharedInformerFactory
metricsRecorder *metricsRecorder
profileName string
// 用于抢占
preemptHandle framework.PreemptHandle
runAllFilters bool
}
ScheduleOne
基于上面 kube-scheduler 的源码解析图,我们知道 scheduleOne 的流程如下:
- sche.NextPod(),从 scheduleQueue 获取需要调度的 pod
- 通过 pod 的 SchedulerName 判断是否属于这个调度器处理,kube-scheduler 的名字是 default-scheduler,因此 pod 没有专门指定调度器的都会被k8s默认调度器处理。
- 确定属于自己处理后进入调度节点,通过 sched.Algorithm.Schedule 找到当前 pod 最适合的节点,如果没找到适合的节点,调度器会根据 pod 的优先级进行抢占操作。
- 在通过调度算法找到适合的待调度节点之后就是具体调度了,这里 schedule 设计了一个 assume pod 的对象,这个 assume pod 将原来的 pod 对象深度拷贝放入 scheduler cache 中,并设置 nodeName 表示这个节点已被调度,后续的检查和调度基于 assume pod 对象,这样就可以对 pod 进行异步绑定操作而不会有读写锁的问题了。
- 接着 assume pod 会对卷进行 AssumePodVolumes,这一步主要由 RunReservePluginsReserve 方法实现。如果预设操作失败,会进行回滚操作。
- 到 Permit 阶段,这个阶段是在真正调度前对 pod 绑定操作进行最后的批准、拒绝或者执行延时调度。
- 在 Permit 之后,资源的准备评估结束,正式进入第二阶段 pod 的真正绑定周期,整个绑定过程是异步的,放在 go func() 里面。
- 进入异步绑定阶段后,会先通过一个 WaitOnPermit 方法来检查是否延迟调度的,如果有会进行等待。
- 之后会进入 prebind,prebind 主要做 pvc 和 pv 的绑定。
- 完成 prebind 之后就正式进入 bind 操作,scheduler 会向 api-server 发送一个 bind 请求。完成绑定后会执行 postbind,现在这个 plugin 还是一个空的插入点,k8s暂时还没有默认插件。
下面是 ScheduleOne 的源码及注释:
func (sched *Scheduler) scheduleOne(ctx context.Context) {
// 从 scheduleQueue 获取需要调度的 pod
podInfo := sched.NextPod()
...
// 通过 pod 的 SchedulerName 判断是否属于这个调度器处理
prof, err := sched.profileForPod(pod)
// 不属于则跳过
if sched.skipPodSchedule(prof, pod) {
return
}
...
// 通过一系列调度算法找到当前 pod 最适合的节点
scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod)
if err != nil {
...
// 如果没有适合的节点,pod会进入抢占流程尝试进行抢占
result, status := prof.RunPostFilterPlugins(ctx, state, pod, fitError.FilteredNodesStatuses)
if status.IsSuccess() && result != nil {
nominatedNode = result.NominatedNodeName
}
return
}
...
// 复制原来的pod信息,设置一个 assume pod 对原pod进行拷贝设置一份缓存,
// 并设置 assume pod 的 nodeName 来标识 pod 完成调度,
// 后续scheduler的其他Plugin做检查和绑定操作全部基于 assume pod
assumedPodInfo := podInfo.DeepCopy()
assumedPod := assumedPodInfo.Pod
err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
...
// reserve plugins 主要做附属资源的预留,比如在 cache 中
// 完成将预设卷和 pod 进行绑定
if sts := prof.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
// 如果失败会进行回滚操作
prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
return
}
...
// permit 是对 pod 绑定操作进行最后的批准、拒绝或者执行延时调度
runPermitStatus := prof.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
...
// 这里开始正式进入 pod 绑定阶段
go func() {
// 通过和 permit 结合完成延时调度
waitOnPermitStatus := prof.WaitOnPermit(bindingCycleCtx, assumedPod)
...
// prebind 主要准备节点绑定前准备工作,比如PVC和PV绑定,现在默认 prebind 插件只有一个 VolumeBinding
preBindStatus := prof.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
...
// 向 api-server 发起节点绑定请求
err := sched.bind(bindingCycleCtx, prof, assumedPod, scheduleResult.SuggestedHost, state)
if err != nil {
// 如果失败会进行回滚操作
prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
} else {
// 如果成功后会执行PostBindPlugins, k8s暂时没有默认插件
prof.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
}
}()
}
这里面从 RunReservePluginsReserve,RunPermitPlugins,RunPreBindPlugins,RunPreBindPlugins 到 RunPostBindPlugins 都支持用户编写自己的插件扩展 scheduler 调度器。
对照 scheduler framework 官方图解:
详细可以参考kube-scheduler#624提案
我们看看 k8s 的提供的一些默认插件:
func getDefaultConfig() *schedulerapi.Plugins {
return &schedulerapi.Plugins{
// 调度队列排序
QueueSort: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: queuesort.Name},
},
},
// 预过滤
PreFilter: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: noderesources.FitName},
{Name: nodeports.Name},
{Name: podtopologyspread.Name},
{Name: interpodaffinity.Name},
{Name: volumebinding.Name},
},
},
// 过滤
Filter: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
// 检查节点是否不可调度
{Name: nodeunschedulable.Name},
// 检查节点的空闲资源(例如,CPU和内存)是否满足 Pod 的要求
{Name: noderesources.FitName},
// 检查 Pod 是否通过主机名指定了 Node
{Name: nodename.Name},
// 检查 Pod 请求的端口(网络协议类型)在节点上是否可用
{Name: nodeports.Name},
// 检查 Pod 的亲和性,是否存在硬亲和
{Name: nodeaffinity.Name},
{Name: volumerestrictions.Name},
// 污浊节点检查
{Name: tainttoleration.Name},
{Name: nodevolumelimits.EBSName},
{Name: nodevolumelimits.GCEPDName},
{Name: nodevolumelimits.CSIName},
{Name: nodevolumelimits.AzureDiskName},
// 基于 Pod 的卷的绑定请求,评估 Pod 是否适合节点
{Name: volumebinding.Name},
// 基于 Pod 的卷的zone属性来筛选是否适合节点
{Name: volumezone.Name},
// pod 的拓扑扩展约束判断节点是否适合
{Name: podtopologyspread.Name},
// pod 之间的亲和性
{Name: interpodaffinity.Name},
},
},
// 过滤后
PostFilter: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: defaultpreemption.Name},
},
},
// 预打分
PreScore: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: interpodaffinity.Name},
{Name: podtopologyspread.Name},
{Name: tainttoleration.Name},
},
},
// 优先级打分
Score: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
// 资源平衡
{Name: noderesources.BalancedAllocationName, Weight: 1},
// 偏向已在本地缓存 Pod 所需容器镜像的节点
{Name: imagelocality.Name, Weight: 1},
// 实现了 Pod 间亲和性与反亲和性的优先级
{Name: interpodaffinity.Name, Weight: 1},
// 偏向最少请求资源的节点
{Name: noderesources.LeastAllocatedName, Weight: 1},
// 根据节点亲和中 PreferredDuringSchedulingIgnoredDuringExecution 字段对节点进行优先级排序
{Name: nodeaffinity.Name, Weight: 1},
// 根据节点的注解 scheduler.alpha.kubernetes.io/preferAvoidPods 对节点进行优先级排序
{Name: nodepreferavoidpods.Name, Weight: 10000},
// 根据 Pod 拓扑扩展约束的优先级排序
{Name: podtopologyspread.Name, Weight: 2},
// 根据节点上无法忍受的污点数量,给所有节点进行优先级排序
{Name: tainttoleration.Name, Weight: 1},
},
},
// pod预准备
Reserve: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: volumebinding.Name},
},
},
// 预绑定
PreBind: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: volumebinding.Name},
},
},
// 绑定
Bind: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: defaultbinder.Name},
},
},
}
}
Algorithm
Algorithm 是 scheduler 的调度的核心,包涵了一个过滤器和一个打分器,核心逻辑就是把所有适合的节点筛选出来,在再里面找出最优的节点,下面看下 Algorithm 的代码,kube-default 的 algorithm 对象是由一个叫 genericScheduler 的实例实现:
- 先会通过 podPassesBasicChecks 对 pod 做基本检查,比如检查 pod 使用的 pvc 是否存在在命名空间下
- 然后会将 scheduler cache 和 node info 做一次镜像,方便后续对相关数据的使用,这里为啥对 cache 也还要再做一层镜像缓存在后面 scheduler 优化那一小节会讲到.
- 通过 findNodesThatFitPod 方法找出所有适合的节点,再通过 prioritizeNodes 对所有节点进行打分
- 根据打分结果找出最高得分节点返回
func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
// 做基本的检查,主要是检查 pod 的 pvc 在命名空间下是否存在
if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
return result, err
}
...
// 将 scheduler cache 和 node info 做一次镜像,方面后续使用可以做到无锁
if err := g.snapshot(); err != nil {
return result, err
}
...
// 找到适合 pod 部署的所有节点
feasibleNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, prof, state, pod)
...
// 将适合的 pod 按照规则进行打分
priorityList, err := g.prioritizeNodes(ctx, prof, state, pod, feasibleNodes)
...
// 找出得分最高的节点
host, err := g.selectHost(priorityList)
return ScheduleResult{
SuggestedHost: host,
EvaluatedNodes: len(feasibleNodes) + len(filteredNodesStatuses),
FeasibleNodes: len(feasibleNodes),
}, err
}
找出适合节点
下面来看看findNodesThatFitPod
方法:
- findNodesThatFitPod 比较简单,包涵三个方法,一个是 prefilter 插入点,一个是 filter 插入点,还有一个是 extender filter。
- prefilter 主要是做一些过滤前的预处理,比如 node port信息, volumebinding 信息等。
- filter 对节点做过滤,找出适合的框架,这里会检查节点的亲和性,资源是否充足,是否存在挂载卷等。
- extender 这个是旧版调度器架构的扩展方式,这里就不累赘,有兴趣的可以自行学习。
func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.NodeToStatusMap, error) {
...
// prefilter 插入点
s := prof.RunPreFilterPlugins(ctx, state, pod)
// filter 插入点
feasibleNodes, err := g.findNodesThatPassFilters(ctx, prof, state, pod, filteredNodesStatuses)
if err != nil {
return nil, nil, err
}
// extender,兼容旧的调度框架的 extender 扩展方式
feasibleNodes, err = g.findNodesThatPassExtenders(pod, feasibleNodes, filteredNodesStatuses)
if err != nil {
return nil, nil, err
}
return feasibleNodes, filteredNodesStatuses, nil
}
// node过滤方法
func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
// filter会通过 numFeasibleNodesToFind 确定要过滤得到的节点数量大小,主要防止大集群并有大量符合的节点
// 导致性能问题,当集群过大的时候 numFeasibleNodesToFind 会根据集群规模确定一个比例
numNodesToFind := g.numFeasibleNodesToFind(int32(len(allNodes)))
// Create feasible list with enough space to avoid growing it
// and allow assigning.
feasibleNodes := make([]*v1.Node, numNodesToFind)
// 确保是否存在过滤插件
if !prof.HasFilterPlugins() {
...
}
checkNode := func(i int) {
nodeInfo := allNodes[(g.nextStartNodeIndex+i)%len(allNodes)]
// PodPassesFiltersOnNode 方法会调用 filter plugin 对 node 进行处理获得适合的节点
fits, status, err := PodPassesFiltersOnNode(ctx, prof.PreemptHandle(), state, pod, nodeInfo)
...
}
// 对所有节点并行执行过滤
parallelize.Until(ctx, len(allNodes), checkNode)
...
return feasibleNodes, nil
}
// filter plugins 处理
func PodPassesFiltersOnNode(
ctx context.Context,
ph framework.PreemptHandle,
state *framework.CycleState,
pod *v1.Pod,
info *framework.NodeInfo,
) (bool, *framework.Status, error) {
...
statusMap := ph.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
}
计算适合节点的得分
在通过findNodesThatFitPod
方法获得适合分配的节点后,需要通过prioritizeNodes
方法来打分找到最适合的节点:
- findNodesThatFitPod主要包括 PreScore 和 ScorePlugins两个插入点
- RunScorePlugins 方法会先对节点进行打分,然后再对打分插件的打分进行修正,最后乘以各插件的权重系数就得到各插件打分的最终分数
- 最后再将各种插件打分的结果汇总得到节点的总分表
func (g *genericScheduler) prioritizeNodes(
ctx context.Context,
prof *profile.Profile,
state *framework.CycleState,
pod *v1.Pod,
nodes []*v1.Node,
) (framework.NodeScoreList, error) {
// Run PreScore plugins.
preScoreStatus := prof.RunPreScorePlugins(ctx, state, pod, nodes)
...
// Run the Score plugins.
scoresMap, scoreStatus := prof.RunScorePlugins(ctx, state, pod, nodes)
...
// 将各种插件打分的结果汇总
result := make(framework.NodeScoreList, 0, len(nodes))
for i := range nodes {
result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0})
for j := range scoresMap {
result[i].Score += scoresMap[j][i].Score
}
}
...
return result, nil
}
func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) (ps framework.PluginToNodeScores, status *framework.Status) {
...
// 基于 node 并行执行打分插件,给每个节点进行打分
pluginToNodeScores := make(framework.PluginToNodeScores, len(f.scorePlugins))
for _, pl := range f.scorePlugins {
pluginToNodeScores[pl.Name()] = make(framework.NodeScoreList, len(nodes))
}
parallelize.Until(ctx, len(nodes), func(index int) {
for _, pl := range f.scorePlugins {
nodeName := nodes[index].Name
s, status := f.runScorePlugin(ctx, pl, state, pod, nodeName)
pluginToNodeScores[pl.Name()][index] = framework.NodeScore{
Name: nodeName,
Score: int64(s),
}
}
})
...
// 运行 Normalize 对打分进行修正
parallelize.Until(ctx, len(f.scorePlugins), func(index int) {
pl := f.scorePlugins[index]
nodeScoreList := pluginToNodeScores[pl.Name()]
// NormalizeScore
status := f.runScoreExtension(ctx, pl, state, pod, nodeScoreList)
....
})
// 每个插件的打分结果乘以插件的权重值
parallelize.Until(ctx, len(f.scorePlugins), func(index int) {
pl := f.scorePlugins[index]
weight := f.pluginNameToWeightMap[pl.Name()]
nodeScoreList := pluginToNodeScores[pl.Name()]
for i, nodeScore := range nodeScoreList {
...
nodeScoreList[i].Score = nodeScore.Score * int64(weight)
}
})
...
return pluginToNodeScores, nil
}
func (f *frameworkImpl) runScoreExtension(ctx context.Context, pl framework.ScorePlugin, state *framework.CycleState, pod *v1.Pod, nodeScoreList framework.NodeScoreList) *framework.Status {
...
status := pl.ScoreExtensions().NormalizeScore(ctx, state, pod, nodeScoreList)
return status
}
ScorePlugins打分插件包括:
// 优先级打分
Score: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
// 资源平衡
{Name: noderesources.BalancedAllocationName, Weight: 1},
// 偏向已在本地缓存 Pod 所需容器镜像的节点
{Name: imagelocality.Name, Weight: 1},
// 实现了 Pod 间亲和性与反亲和性的优先级
{Name: interpodaffinity.Name, Weight: 1},
// 偏向最少请求资源的节点
{Name: noderesources.LeastAllocatedName, Weight: 1},
// 根据节点亲和中 PreferredDuringSchedulingIgnoredDuringExecution 字段对节点进行优先级排序
{Name: nodeaffinity.Name, Weight: 1},
// 根据节点的注解 scheduler.alpha.kubernetes.io/preferAvoidPods 对节点进行优先级排序
{Name: nodepreferavoidpods.Name, Weight: 10000},
// 根据节点上无法忍受的污点数量,给所有节点进行优先级排序
{Name: podtopologyspread.Name, Weight: 2},
// 根据 Pod 拓扑扩展约束的优先级排序
{Name: tainttoleration.Name, Weight: 1},
},
},
小结
k8s调度器从1.15开始由 extension 模式改成了 framework 的架构,kube-scheduler整个代码架构提供了更灵活性定制化能力,可以在原架构上满足了更灵活定制化的需求,而不需要重新 fork 一份源码来修改。