前言
在研发之旅的不断深入中,愈发感到在接到需求后,画出实现的逻辑图是十分必要的。本篇将试图通过对容器平台工作负载的扩缩容/启动/停止/重启需求来验证前面一句。
需求分析
本需求可分为两大类:
1、扩缩容/启动/停止 本质是对Pod的副本数的变更操作
2、重启 本质是对 ReplicaSet的变更操作,不涉及更改Pod副本数
对于第一类还可以细分为两类:
1、扩缩容 需要计算该工作负载的请求计算型资源(CPU&MEMORY)的总和,集群的剩余计算型资源(CPU&MEMORY),得出可扩容的最大数,然后再进行扩容。应该需要清晰的时,即使无法得出可扩容的最大数,依然不影响进行扩容操作
2、停止/启动 也就是将副本数缩小为0和恢复至期望副本数
逻辑图
根据上述需求分析,可得出一下逻辑图:
上图其实不具备过多可道之处,1、生成k8s client 2、查询集群节点是否允许调度 3、查工作负载的请求资源 4、查节点的剩余资源(依赖metrics-server) 5、分别计算出cpu可调度数和memory可调度数,取最小数 6、扩缩容
上图需要主要重启时应做到优雅,无中断
核心代码
调度:
// 获取可调度节点
dbNodes, rows := w.nodeDao.FindList(map[string]interface{}{"cluster_name": w.clusterManager.ClusterName, "schedulable": 1})
if rows == 0 {
return nil, errors.New("err: There are currently no schedulable nodes in the cluster")
}
// 获取工作负载的请求资源
WorkloadRep , err := w.cli.FindOne(namespace,name)
if err != nil {
return nil,err
}
var useMemValue,useCPUValue,useCPU,useMemory ,totalCPU,totalMem float64
var schedulerNumber int
for _, container := range WorkloadRep.Containers {
useMemValue += tool.CpuAndMem(container.Resources.Requests.Memory().String(), string(container.Resources.Requests.Memory().Format), string(container.Resources.Requests.Memory().Value()))
useCPUValue += tool.CpuAndMem(container.Resources.Requests.Cpu().String(), string(container.Resources.Requests.Cpu().Format), string(container.Resources.Requests.Cpu().Value()))
}
// 获取集群可调度节点剩余资源
for _, dbNode := range dbNodes{
totalCPU += dbNode.TotalCPU
totalMem += dbNode.TotalMemory
metricsNode, err := w.nodeKubernetes.MetricsNode(w.clusterManager.Metrics,dbNode.NodeName)
if err == nil {
useCPU = tool.CPUAndMemStringToFloat64(metricsNode.Usage.Cpu().String())
useMemory = tool.CPUAndMemStringToFloat64(metricsNode.Usage.Memory().String())
} else {
continue
}
}
if useCPU == 0 || useMemory == 0 {
return nil, errors.New("err: metrics-server error")
}
// 计算可调度数
schedulerNumberCpu := (totalCPU * 1000- useCPU ) / useCPUValue
schedulerNumberMem := (totalMem * 1024 - useMemory ) / useMemValue
if schedulerNumberCpu > schedulerNumberMem {
schedulerNumber = int(schedulerNumberMem)
} else {
schedulerNumber = int(schedulerNumberCpu)
}
return schedulerNumber, err
重启
// 生成k8s client
clusterManager, err := buildKubernetesClient(c)
if err != nil {
ErrorResponse(c, ecode.KubernetesBuildClientFail, "err :buildKubernetesClient failed")
return
}
namespace := c.Params.ByName("ns")
kind := c.Params.ByName("kind")
kind = tool.FirstUpper(kind)
name := c.Params.ByName("name")
// 更改annotations 重启时间
patchdata := map[string]interface{}{
"spec": map[string]interface{}{
"template": map[string]interface{}{
"metadata": map[string]interface{}{
"annotations": map[string]interface{}{
"restartTime": time.Now().Format(time.Stamp),
},
},
},
},
}
body , _ := json.Marshal(patchdata)
if data, err := service.NewWorkloadService(clusterManager, kind).Patch(namespace, name,body); err != nil {
ErrorResponse(c, ecode.PatchLabelFail, err.Error())
} else {
SuccessResponse(c, data)
}