基于K8s调度器实现自定义调度

背景:为了实现基于K8s的数据库服务的调度功能   难点:  1,原生K8s的资源只有cpu和mem,但是MySQL调度需要考虑磁盘资源,             2,原生调度策略不符合线上环境,比如线上容器和物理机存在混跑,服务存在定制策略等   方案: 1,基于K8s 调度器的源码进行修改,定制化调度器,所有服务器调度时指定新调度器实现自定义策略             2,将需要的元数据,比如MySQL端口容量,服务器磁盘容量等信息通过脚本同步到K8s中的annotations中   流程:             1,下载调度器源码至本地服务器
 可从官方下载,此处原为内部git地址,省去
            2,本地修改完成后,编译,
CGO_ENABLED=0 GOOS=linux GOARCH=amd64  make all WHAT=cmd/kube-scheduler/
            3,编译完成&&没有报错,打包images 并 push到仓库
docker build -f Dockerfile -t registry.xxxx.xxxx.com.cn/xxxx/scheduler:1.0 .
docker login registry.xxx.xxx.com.cn -u xxxx
docker push registry.xxxx.xxxx.com.cn/xxxx/scheduler
            4,在南北K8s集群上找到运行着的 scheduler,并delete,scheduler以deployment方式部署,delete完成后确认下新scheduler是否running即可
kubectl get pod -n kube-system | grep kube-scheduler-db
kubectl delete pod kube-scheduler-db-xxxxxx-xxxxxxx -n kube-system
            5,至此 scheduler 更新流程流程,之后可以观察修改生效情况,重大更新建议先测试再正式上线   修改的细节(举例)             scheduler 的目录结构如下      基于K8s调度器实现自定义调度             我们需要修改的部分集中在\kubernetes-develop\pkg\scheduler\algorithm目录下,   该目录下有两个文件夹,这两个目录下每一个文件代表着一种策略 predicates 此目录下为初选策略,初选即为过滤,通过此策略将不满足硬性标准的服务器淘汰 priorities 此目录下为优选策略,优选即为打分,通过此策略将所有通过初选的服务器打分,选取分数最高服务器进行调度   predicates 中以xxxx_disk_predicate.go 为例,此策略过滤不满足MySQL磁盘空间要求的服务器,实际代码以线上为准
package predicates
 
import (
       "fmt"
       "k8s.io/api/core/v1"
       "k8s.io/kubernetes/pkg/scheduler/algorithm"
       schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
)
 
func NevisDiskPredicate(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
 
       var remainDiskPercentage int64 = 10 //disk最少剩余10%
       var requestedDisk int64 //node总的已request的值
       var HoldDiskMin int64 = 250 //剩余容量保底值
       node := nodeInfo.Node() //这里获得的是正在进行判断的node实例
 
       if node == nil {  // node无效就直接退出
              return false, nil, fmt.Errorf("node not found")
       }
       
       //获取服务类型 关于磁盘调度目前只对MySQL和pika生效,所以需要判断服务器的资源池
       scheduleService := pod.Annotations[AnnotationSchedulerService]
 
       requestDiskCurrentPod := NevisPodDISKRequest(pod) // 当前POD的disk request
       for _, p := range nodeInfo.Pods() { // 获取node上每个pod的diskrequest之和 requestedDISK
              requestDiskPerPod := xxxxPodDISKRequest(p)
              requestedDisk += requestDiskPerPod
       }
 
 
       nodeFreeDisk := xxxxNodeDISKFree(node) //获取node的free disk  
       nodeTotalDisk := xxxxNodeDISKTotal(node) //获取node的total disk
        //NevisNodeDISKFree 和 NevisNodeDISKTotal 函数在utils.go里,具体的是获取node的annotations数据,
        //这里需要注意线上的annotations 值的各种状态都需要考虑,缺省值需要设定,否则会影响之后的判断
 
       // 过滤策略
       // 1,已经request 加 即将request的 要小于 totaldisk的90%,
       // 2,freedisk 减 即将request的 要大于 totaldisk的10%
       // 3,考虑到有些服务器磁盘容量很小(<4T),增加最低保留空间 为 250G
        // 策略需要注意线上是容器和物理机混跑状态,需要考虑的全面一些,
 
       HoldDisk := int64(nodeTotalDisk * remainDiskPercentage/100)
       if HoldDisk < HoldDiskMin {
              HoldDisk = HoldDiskMin
       }
 
       if ((scheduleService == SchedulerServiceMySQL) || (scheduleService == SchedulerServicePika)){
        //进行资源池判断,不满足条件的服务器直接过滤
        // 关于服务类型的判断应该放在函数的入口,以避免无效的运算,这里是举例就不修改了
              if  (int64(requestedDisk) + int64(requestDiskCurrentPod) > int64(nodeTotalDisk - HoldDisk)|| ((nodeFreeDisk - int64(requestDiskCurrentPod)) < HoldDisk)) {
                     return false, []algorithm.PredicateFailureReason{
                            &PredicateFailureError{
                                   PredicateName: "xxxxDiskPredicate",
                                   PredicateDesc: fmt.Sprintf("node doesn't has enough DISK request %d, requested %d", requestDiskCurrentPod, requestedDISK),
                            },
                     }, nil
              }
              return true, nil, nil
       }
       return true, nil, nil
}
 
func init() {
       predicates := Ordering()
       predicates = append(predicates, NevisDiskPred)
       SetPredicatesOrdering(predicates)
}

 

priorities 中以more_mem.go 为例,此策略将给有更多空闲内存的服务器打高分,实际代码以线上为准
package priorities
 
import (
       "fmt"
       "strconv"
       "k8s.io/klog"
       "k8s.io/api/core/v1"
       "k8s.io/apimachinery/pkg/labels"
       v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
       schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
       schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
)
 
func CalculateNodeMoreMemMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
 
 
       node := nodeInfo.Node() 
       if node == nil {
              return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
       }
 
       var allMemory int64 //服务器的总内存
       var usedMemory int64 //服务器已使用内存
       allMemory = NevisNodeMemAll(node)
 
        //此处是通过node的10255端口获取node的实例mem使用量,讲道理逻辑里应该尽量不适用这种调用,因为可能拖慢调度的速度,因为这是所有过初选的node都需要执行的
                // 但是由于数据库本身不会存在非常频繁的调度,而且实现非常简单,所以如此实现了
       resp, err := (&http.Client{Timeout: 5 * time.Second}).Get("http://" + node.Status.Addresses[0].Address + ":10255/stats")
              if err != nil {
                     klog.Warningf("get node status failed: %s", err)
              } else {
                     var ci cv1.ContainerInfo
                     if err := json.NewDecoder(resp.Body).Decode(&ci); err != nil {
                            resp.Body.Close()
                            klog.Warningf("decode node status failed: %s", err)
                     } else {
                            usedMemory = int64(ci.Stats[0].Memory.RSS)
                     }
              }
 
        // 打分策略,如果内存没有使用,即为10分,全部用完为0分,按比例分配
       count := 10 - (usedMemory / allMemory * int64(10))
       // 这里在打log。相关的log可通过 kubectl logs kube-scheduler-db-xxxx-xxxx -n kube-system 查看
       klog.Warningf("message i want", count) 
        //返回打分集合,注意下格式即可
       return schedulerapi.HostPriority{
              Host:  node.Name,
              Score: int(count),
       }, nil
}
 
// 实际去计算打分结果的时候 是采用map - reduce的方式,了解hadoop原理的应该知道,这里不多说,感兴趣可自行了解
var CalculateNodeMoreMemReduce = NormalizeReduce(schedulerapi.MaxPriority, false)

程序中还包含一些工具类,放在utils.go下,此处省略

在写完所有的调度策略后,别急还没完,我们还需要去工厂函数里注册下我们新加的策略 位置在\pkg\scheduler\algorithmprovider\defaults\defaults.go   在 func defaultPredicates() 里增加关于初选策略的函数,如
factory.RegisterFitPredicate(predicates.xxxDiskPred, predicates.xxxDiskPredicate),
  在 func defaultPriorities() 里增加关于优选策略的函数,如
factory.RegisterPriorityFunction2("MoreCpu", priorities.CalculateNodeMoreMemMap, priorities.CalculateNodeMoreMemReduce, 1000000),
在优选策略中需要注意的是传入的最后一个值为权重,此处为1000000,用于设置不同优选策略的权重,一个node的实际分数为所有 优选策略分数 * 权重 后之和 这里设置为如此大的数是为了让此策略成为 最重要的决定值,其余的策略目前我们并不关心,在之后的开发中可以是当修改, 其他自带的优选策略共有6种,除了NodePreferAvoidPodsPriority权重为10000外,其余均为1,这几种优选策略也比较好理解,自行看下代码即可,不说了     最后总结下已经做出的修改 初选            函数名:xxxxDiskPredicate           作用服务: MySQL && pika          目标:过滤所有不满足磁盘空间要求的服务器,         策略:all_request_disk + request_disk <  total_disk * 90%                   free_disk - request_disk > total_disk * 10%             函数名:xxxxMemoryMysqlPredicate         作用服务:MySQL         目标:过滤满足内存要求的服务器         策略: free_mem - request_mem > total_mem * 10%                      函数名:xxxxCPUPredicate         作用服务:所有         目标:过滤不满足 cpu 要求的服务器         策略:free_cpu > request_cpu                  函数名:xxxxMemoryPredicate         作用服务:redis         目标:过滤 不满足 mem 要求的服务器         策略:需要满足redis的降级和非降级两种模式,具体看代码吧,写出来废纸     优选          函数名:CalculateNodeMoreMemMap         作用服务:MySQL         目标:提高 空闲mem多服务器的分数         策略(free-mem / total-mem)* 10 * 权重                        
上一篇:Kubernetes--Pod优先级调度


下一篇:爬虫日记(86):Scrapy的Scheduler类(一)