Swarm简介
Swarm是Docker的一个集群管理工具,最开始仅仅是用作集群管理以及简单的调度,就像下面的图所示的,为Docker Client提供与Docker Engine一样的Docker API,客户端操作Swarm就好像在操作一台机器,实际上后面连了好多Docker Engine,容器都跑在后面的Docker Engine上。Swarm负责将客户端的请求分配到后端的Docker Engine,比如客户端要求启动一个容器,Swarm找一个目前资源使用最少的Docker Engine。
因此早期的Swarm比较底层,有点像虚拟机管理器,抽象程度低。而Kubernetes(Google开源的容器管理工具)抽象了很多概念,比如Service,Pod等,更上层,稍微一封装就可以做成一个PaaS了。为了对抗Kubernetes,Docker也对Swarm做了升级改造,先是SwarmKit,抽象了Service,Task等,然后又把Swarmkit集成到了Docker Engine中,可以使用docker命令把一个结点设为swarm mode,然后这个结点就成了swarm集群的一个结点。因此最新的Swarm不再是一个工具,而是一个集群,我们把集群称作Swarm,然后这个集群里面有manager和worker两种角色,manager中有一个leader,通过Raft算法实现数据一致性。总之很多方面都抄了Kubernetes。
可以在Swarm集群中创建Service,而一个Service有多个实例,比如我创建一个tomcat的Service,名字是tomcat_service,使用的镜像是tomcat,然后创建3个实例,也就是启动3个容器,用下面的命令:
docker service create --name tomcat_service --replicas 3 tomcat
在manager上执行这个命令,manager会把3个容器按调度策略分配到不同的worker上(manager也可以当做worker)。
swarm的调度策略是:在满足constraint的worker中找一个task(也就是容器数)最少的结点,这种策略叫做spread策略,就是尽可能的把task平均分布到不同结点。constraint是指一些必须满足的条件,比如某个task要求分配2G内存。
spread策略是没问题的,但是swarm在调度的时候没有把一项很重要的内容加进去,那就是实例的容灾。
我搭了一个3个结点的Swarm集群,manager1,worker1和worker2,创建了一个hello服务,有4个replica,一个world服务,有2个replica,如下:
看上去3个结点每个结点两个replica,挺好的,但是有一个严重的问题,world服务的两个replica被调度到同一台主机上,那么这台主机挂了,整个服务就挂了。其实replica的概念就是要在多个地方存放,以防止单主机出现问题导致服务不可用。比如HDFS的3个replica一般要放到不同机器上,甚至还要考虑不同机柜,不同机房。 Kubernetes在调度的时候也考虑了多个replica放到多台主机上的策略。 docker的开发人员目前只忙着出产品,没功夫去精雕细琢细节到地方,不过多个replica放到不同主机之后肯定是会加进去的。
本文介绍如何修改Docker 1.12的代码把replica容灾的策略加到Swarm调度策略中。
Swarm调度算法介绍
老的Swarm调度算法可以用下面的图表示:
1.一个调度请求过来,里面包含很多constraint,比如需要分配4G内存,或者要求必须调度上含有production标签的结点上,或者要求某个结点没有被占用。
2.所有的结点作为一个List传到一个filter链里,这个filter会过滤掉不符合条件的结点,比如内存不够,输出一个符合条件的结点List
3.按照策略进行排序,排名最高的就是要调度的结点
策略有三个:
spread: 默认策略,尽量均匀分布,找容器数少的结点调度
binpack: 和spread相反,尽量把一个结点占满再用其他结点
random: 随机
老的Swarm没有replica的概念,每个实例都是独立的个体,所以不需要在调度的时候考虑多副本部署到不同主机。 新的Swarm调度算法和老Swarm差不多,不过不再提供策略选择,只提供了spread策略。
新的Swarm把结点信息放到一个堆里(堆排序的堆),以当前结点上的容器数为建堆的标准建一个最小堆,这样查找起来就特别快了。
代码改造
改造策略
最优解:对于一个task,找到的结点应该不含与这个task属于同一个service的task,同时这个结点在符合这个条件的结点中task数最少。
次优解:所有满足硬性constraint的结点都启动了与这个task属于同一个service的task,只能在这其中找一个task数最少的了。
代码修改
修改两个源文件就可以
修改代码docker/vendor/src/github.com/docker/swarmkit/manager/scheduler/indexed_node_heap.go
1.添加一个函数,定义一个constraint称为multihostConstraint,意思是同一个service的不同副本要落到不同主机上,与其它强制性的constraint不一样,这个是尽量满足的constraint
1
2
3
4
5
6
7
8
9
10
|
//检查某个结点是否已经存在属于同一个service的task func meetMultihosConstraint(nodeInfo *NodeInfo, serviceID string) bool { for _, task := range nodeInfo.Tasks {
sID = task.ServiceID
if sID == serviceID {
return false
}
}
return true
} |
2.修改搜索nodeHeap的函数searchHeapToFindMin,加一个参数serviceID
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
func (nh *nodeHeap) searchHeapToFindMin( meetsConstraints func(*NodeInfo) bool,
serviceID string)
(*api.Node, int ) {
var bestNode *api.Node //满足multihostConstraint同时task最少的结点
var secondBestNode *api.Node //没有满足multihostConstraint的,只能选一个task最少的结点
minTasks := int (^uint( 0 ) >> 1 ) // max int
secondMinTasks := minTasks
if nh == nil || len(nh.heap) == 0 {
return bestNode, minTasks
}
// push root to stack for search
stack := [] int { 0 }
for len(stack) != 0 {
// pop an element
idx := stack[len(stack)- 1 ]
stack = stack[ 0 : len(stack)- 1 ]
heapEntry := &nh.heap[idx]
if len(heapEntry.Tasks) >= minTasks {
continue
}
if meetsConstraints(heapEntry) {
//满足强制性constraint,再检查是否满足multihostConstraint
if meetMultihosConstraint(heapEntry, serviceID) == true {
bestNode = heapEntry.Node
minTasks = len(heapEntry.Tasks)
} else {
if (len(heapEntry.Tasks) < secondMinTasks) {
secondBestNode = heapEntry.Node
secondMinTasks = len(heapEntry.Tasks)
}
}
} else {
// otherwise, push 2 children to stack for further search
if 2 *idx+ 1 < len(nh.heap) {
stack = append(stack, 2 *idx+ 1 )
}
if 2 *idx+ 2 < len(nh.heap) {
stack = append(stack, 2 *idx+ 2 )
}
}
}
if bestNode == nil {
bestNode = secondBestNode
minTasks = secondMinTasks
}
return bestNode, minTasks
} |
修改代码docker/vendor/src/github.com/docker/swarmkit/manager/scheduler/scheduler.go里的scheduleTask函数
1
2
3
4
5
6
7
|
// scheduleTask schedules a single task. func (s *Scheduler) scheduleTask(ctx context.Context, t *api.Task) *api.Task { s.pipeline.SetTask(t)
//这个函数直接改成searchHeapToFindMin
//s.scanAllNodes是是否扫描全部结点的标志,直接改成false
//n, _ := s.nodeHeap.findMin(s.pipeline.Process, s.scanAllNodes)
n,_ := s.nodeHeap.searchHeapToFindMin(s.pipeline.Process, false , t.ServiceID)
|