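A quick introduction first: I used to work on Floodlight controller development. With ODL and ONOS both developing so rapidly, I felt I would fall behind if I didn't learn something about them. So I found some spare time to read part of the ONOS source code. I learned a lot, and rather than keep it to myself I'm sharing it here as a starting point for discussion.
Within ONOS, the clustering code is the only part I read carefully; I only skimmed the other modules. Some of ONOS's well-crafted code could be reused directly in other projects. Take shortest paths: Floodlight's implementation is embedded in a specific module and does not support multipath, whereas ONOS provides three shortest-path algorithms, supports multipath natively, and is cleanly modularized. I have borrowed parts of ONOS's design for my company's projects. ONOS also shows off how much more expressive Java 8 is than Java 7: in some modules with similar functionality, Floodlight's implementation is far more verbose than ONOS's.
In short, ONOS's overall code quality is far higher than Floodlight's.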
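I plan to turn this into a series. A rough outline:
● Cluster election
● An overview of the Raft implementation in ONOS
● The gossip protocol implementation in ONOS
● Basic cluster primitives: ONOS provides distributed versions of ConcurrentHashMap, AtomicCounter, Set, and more
● Designs and code that can be reused in other projects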
This article analyzes the code path of cluster election in ONOS.
Overview of cluster protocols
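For cluster election, ONOS uses the Raft protocol. I don't know why it doesn't use Paxos, but there is a growing trend of choosing Raft over Paxos, mainly because Raft is simpler.
I call this a trend based on how many Raft implementations exist today compared with Paxos implementations. I have also noticed that Harvard, Stanford, and CMU have replaced Paxos with Raft in their distributed systems courses (the reasons are given here). Raft also has an official website full of resources, whereas Paxos has only papers. Overall, Raft is gradually becoming mainstream.
As for Paxos-based implementations, the ones I know of are ZooKeeper and Ceph, and ZooKeeper is not an exact Paxos implementation but a modified, Paxos-like protocol called ZAB. Tencent recently open-sourced its own Paxos implementation, PhxPaxos. Given how few mature Paxos-family implementations exist, it is no surprise that so many distributed projects depend on ZooKeeper.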
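For Raft, I went through the implementations listed on the official Raft website and found only five complete Java implementations: copycat, jraft, jgroups-raft, RaftKVDatabase/JSimpleDB, and C5 replicator (C++ and Go have five each as well).
● jraft: lacks documentation
● JSimpleDB: is not solely a Raft implementation
● C5 replicator: implements the Raft protocol
● jgroups-raft: implements the Raft protocol
Note that all of these projects have very few GitHub stars and are probably not mature enough for production use. Unlike the others, copycat also tests its Raft implementation with Jepsen. All of this suggests that building a complete, usable Raft implementation is still quite challenging.
ONOS chose copycat as its Raft implementation, and given the analysis above, that choice is sound.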
ONOS cluster election
Note: this analysis is based on the ONOS 1.6 branch.
ONOS exposes cluster election through the following interface:
```java
public interface LeaderElector extends DistributedPrimitive {
    Leadership run(String topic, NodeId nodeId);
    void withdraw(String topic);
    boolean anoint(String topic, NodeId nodeId);
    boolean promote(String topic, NodeId nodeId);
    void evict(NodeId nodeId);
    Leadership getLeadership(String topic);
    Map<String, Leadership> getLeaderships();
    void addChangeListener(Consumer<Change<Leadership>> consumer);
    void removeChangeListener(Consumer<Change<Leadership>> consumer);
}
```
The key operations are run, withdraw, anoint, promote, and evict. The Javadoc explains their usage very clearly, so I'll simply quote it here:
Distributed mutual exclusion primitive.
AsyncLeaderElector facilitates mutually exclusive access to a shared resource by various cluster members. Each resource is identified by a unique topic name and members register their desire to access the resource by calling the AsyncLeaderElector's run method. Access is granted on a FIFO basis. An instance can unregister itself from the leadership election by calling AsyncLeaderElector's withdraw method. If an instance currently holding the resource dies then the next instance waiting to be leader (in FIFO order) will be automatically granted access to the resource.
One can register listeners to be notified when a leadership change occurs. The Listeners are notified via a Leadership Change change subject.
Additionally, AsyncLeaderElector provides methods to query the current state of leadership for topics.
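To make the FIFO semantics above concrete, here is a minimal single-process sketch. It is not ONOS code and has no replication, persistence, or failure detection. The class FifoLeaderElectionSketch, its nested Elector, String node IDs, and the (topic, newLeader) listener are all hypothetical. ONOS's withdraw(topic) acts on the local node; because everything here runs in one process, this withdraw takes an explicit nodeId instead, and evict stands in for "the leader died".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;

// Hypothetical, single-process model of the FIFO semantics in the
// AsyncLeaderElector Javadoc. Not ONOS code and not distributed.
class FifoLeaderElectionSketch {

    static final class Elector {
        // topic -> candidates in registration (FIFO) order; index 0 is the leader
        private final Map<String, List<String>> candidates = new HashMap<>();
        // called with (topic, newLeaderOrNull) whenever a topic's leader changes
        private final List<BiConsumer<String, String>> listeners = new ArrayList<>();

        void addChangeListener(BiConsumer<String, String> listener) {
            listeners.add(listener);
        }

        /** Registers nodeId as a candidate for topic; returns the current leader. */
        String run(String topic, String nodeId) {
            List<String> queue = candidates.computeIfAbsent(topic, t -> new ArrayList<>());
            if (!queue.contains(nodeId)) {
                String before = leader(topic);
                queue.add(nodeId);
                notifyIfChanged(topic, before);
            }
            return leader(topic);
        }

        /** Removes nodeId from topic; the next candidate in FIFO order moves up. */
        void withdraw(String topic, String nodeId) {
            List<String> queue = candidates.get(topic);
            if (queue == null) {
                return;
            }
            String before = leader(topic);
            queue.remove(nodeId);
            notifyIfChanged(topic, before);
        }

        /** Models a dead node: drop it from every topic it registered for. */
        void evict(String nodeId) {
            for (String topic : new ArrayList<>(candidates.keySet())) {
                withdraw(topic, nodeId);
            }
        }

        /** Current leader of topic, or null if there are no candidates. */
        String leader(String topic) {
            List<String> queue = candidates.get(topic);
            return (queue == null || queue.isEmpty()) ? null : queue.get(0);
        }

        private void notifyIfChanged(String topic, String before) {
            String after = leader(topic);
            if (!Objects.equals(before, after)) {
                listeners.forEach(l -> l.accept(topic, after));
            }
        }
    }

    public static void main(String[] args) {
        Elector e = new Elector();
        e.addChangeListener((topic, leader) -> System.out.println(topic + " -> " + leader));
        e.run("device:1", "node-a");   // node-a becomes leader, listener fires
        e.run("device:1", "node-b");   // node-b queues behind node-a, no change
        e.evict("node-a");             // node-b is granted leadership, listener fires
        System.out.println(e.leader("device:1"));
    }
}
```

Keeping each topic's candidates in an ordered list makes the leader simply the head of that list, so failover reduces to removing the dead node and letting the next candidate move up.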
The elector is created in NewDistributedLeadershipStore.java by the following code:
```java
// Fields of NewDistributedLeadershipStore (excerpt)
LeaderElector leaderElector;
StorageService storageService;

// Creating the elector
leaderElector = storageService.leaderElectorBuilder()
        .withName("onos-leadership-elections")
        .build()
        .asLeaderElector();
```
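This creates leaderElector, and everything that follows unpacks this one statement. StorageManager implements the StorageService interface, so the call is conceptually:
```java
// Conceptual: storageService is a StorageManager instance
StorageManager.leaderElectorBuilder()
        .withName("onos-leadership-elections")
        .build()
        .asLeaderElector();
```
StorageManager.leaderElectorBuilder() returns new DefaultLeaderElectorBuilder(federatedPrimitiveCreator):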
```java
public class StorageManager implements StorageService, StorageAdminService {

    protected PartitionService partitionService;
    private DistributedPrimitiveCreator federatedPrimitiveCreator;

    @Activate
    public void activate() {
        Map<PartitionId, DistributedPrimitiveCreator> partitionMap = Maps.newHashMap();
        partitionService.getAllPartitionIds().stream()
                .filter(id -> !id.equals(PartitionId.from(0)))
                .forEach(id -> partitionMap.put(id, partitionService.getDistributedPrimitiveCreator(id)));
        federatedPrimitiveCreator = new FederatedDistributedPrimitiveCreator(partitionMap);
        transactions = this.<TransactionId, Transaction.State>consistentMapBuilder()
                .withName("onos-transactions")
                .withSerializer(Serializer.using(KryoNamespaces.API,
                        Transaction.class,
                        Transaction.State.class))
                .buildAsyncMap();
        transactionCoordinator = new TransactionCoordinator(transactions);
        log.info("Started");
    }

    @Override
    public LeaderElectorBuilder leaderElectorBuilder() {
        checkPermission(STORAGE_WRITE);
        return new DefaultLeaderElectorBuilder(federatedPrimitiveCreator);
    }

    // ....
}
```
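The partition filtering in activate() can be sketched with plain JDK types. This is a hypothetical illustration, not ONOS code: integer IDs stand in for PartitionId, a generic value stands in for DistributedPrimitiveCreator, and the result is collected straight into a TreeMap (ONOS builds a HashMap here and only copies it into a TreeMap later, inside FederatedDistributedPrimitiveCreator). The excerpt does not explain why partition 0 is skipped, so the sketch simply reproduces the filter.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical sketch of the partitionMap construction in StorageManager.activate().
class PartitionMapSketch {

    /** Keeps every partition except 0 and pairs each id with its per-partition creator. */
    static <C> TreeMap<Integer, C> buildPartitionMap(List<Integer> allPartitionIds,
                                                     Function<Integer, C> creatorFor) {
        return allPartitionIds.stream()
                .filter(id -> id != 0)   // mirrors .filter(id -> !id.equals(PartitionId.from(0)))
                .collect(Collectors.toMap(id -> id, creatorFor, (a, b) -> a, TreeMap::new));
    }

    public static void main(String[] args) {
        Map<Integer, String> m = buildPartitionMap(List.of(0, 1, 2, 3), id -> "creator-p" + id);
        System.out.println(m);   // {1=creator-p1, 2=creator-p2, 3=creator-p3}
    }
}
```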
DefaultLeaderElectorBuilder extends LeaderElectorBuilder, so the original statement amounts to:
```java
LeaderElectorBuilder leaderElectorBuilder = new DefaultLeaderElectorBuilder(federatedPrimitiveCreator);
LeaderElector leaderElector = leaderElectorBuilder
        .withName("onos-leadership-elections")
        .build()
        .asLeaderElector();
```
leaderElectorBuilder.withName("onos-leadership-elections").build() returns an object of type AsyncLeaderElector; call it asyncLeaderElector. What remains is the call asyncLeaderElector.asLeaderElector().
AsyncLeaderElector's asLeaderElector() calls its overload asLeaderElector(long timeoutMillis) as asyncLeaderElector.asLeaderElector(Long.MAX_VALUE), so the operation timeout is Long.MAX_VALUE milliseconds. That overload in turn calls the constructor DefaultLeaderElector(AsyncLeaderElector asyncElector, long operationTimeoutMillis), i.e. new DefaultLeaderElector(this, timeoutMillis), where this is the asyncLeaderElector instance itself.
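Note: DefaultLeaderElector implements the synchronous LeaderElector interface on top of an AsyncLeaderElector. Each method forwards the call to the async elector and then blocks on the returned CompletableFuture until it completes or operationTimeoutMillis elapses.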
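The blocking-facade pattern that DefaultLeaderElector follows can be sketched as below. The AsyncElector and BlockingElector types, String node IDs, and the use of IllegalStateException for failures are assumptions made for this sketch, not ONOS's API; the one detail taken from the text is the per-operation timeout passed to the constructor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of a synchronous facade over an asynchronous elector.
class SyncOverAsyncSketch {

    /** Stand-in for AsyncLeaderElector: returns the leader of topic asynchronously. */
    interface AsyncElector {
        CompletableFuture<String> run(String topic, String nodeId);
    }

    /** Stand-in for DefaultLeaderElector: every call blocks on the async result. */
    static final class BlockingElector {
        private final AsyncElector delegate;
        private final long operationTimeoutMillis;

        BlockingElector(AsyncElector delegate, long operationTimeoutMillis) {
            this.delegate = delegate;
            this.operationTimeoutMillis = operationTimeoutMillis;
        }

        String run(String topic, String nodeId) {
            return complete(delegate.run(topic, nodeId));
        }

        // Waits for the future, bounded by the per-operation timeout.
        private <T> T complete(CompletableFuture<T> future) {
            try {
                return future.get(operationTimeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted", e);
            } catch (TimeoutException e) {
                throw new IllegalStateException("timed out after " + operationTimeoutMillis + " ms", e);
            } catch (ExecutionException e) {
                throw new IllegalStateException(e.getCause());
            }
        }
    }

    public static void main(String[] args) {
        AsyncElector async = (topic, nodeId) -> CompletableFuture.supplyAsync(() -> nodeId);
        BlockingElector sync = new BlockingElector(async, 1_000);
        System.out.println(sync.run("device:1", "node-a"));
    }
}
```

With the timeout set to Long.MAX_VALUE, as asLeaderElector() does, future.get effectively waits indefinitely.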
Because DefaultLeaderElector implements every LeaderElector method by delegating to the AsyncLeaderElector passed to its constructor, the question comes back to which object leaderElectorBuilder.withName("onos-leadership-elections").build() actually instantiates; in other words, what new DefaultLeaderElectorBuilder(federatedPrimitiveCreator).withName("onos-leadership-elections").build() really does. DefaultLeaderElectorBuilder looks like this:
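```java
public class DefaultLeaderElectorBuilder extends LeaderElectorBuilder {

    private final DistributedPrimitiveCreator primitiveCreator;

    public DefaultLeaderElectorBuilder(DistributedPrimitiveCreator primitiveCreator) {
        this.primitiveCreator = primitiveCreator;
    }

    @Override
    public AsyncLeaderElector build() {
        // The builder only supplies the name; the creator decides the implementation
        return primitiveCreator.newAsyncLeaderElector(name());
    }
}
```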
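DefaultLeaderElectorBuilder only records the name and hands construction to whatever DistributedPrimitiveCreator it was given. A stripped-down, hypothetical version of that delegation is below; Elector, PrimitiveCreator, and ElectorBuilder are illustrative names, not ONOS types.

```java
import java.util.Objects;

// Hypothetical sketch: a builder that defers construction to an injected creator.
class BuilderDelegationSketch {

    interface Elector {
        String name();
    }

    /** Stand-in for DistributedPrimitiveCreator. */
    interface PrimitiveCreator {
        Elector newElector(String name);
    }

    /** Stand-in for DefaultLeaderElectorBuilder: collects the name, then delegates. */
    static final class ElectorBuilder {
        private final PrimitiveCreator creator;
        private String name;

        ElectorBuilder(PrimitiveCreator creator) {
            this.creator = Objects.requireNonNull(creator);
        }

        ElectorBuilder withName(String name) {
            this.name = name;
            return this;
        }

        Elector build() {
            return creator.newElector(Objects.requireNonNull(name, "name"));
        }
    }

    public static void main(String[] args) {
        // A different creator would make build() return a different implementation.
        PrimitiveCreator federated = n -> () -> "federated:" + n;
        Elector e = new ElectorBuilder(federated).withName("onos-leadership-elections").build();
        System.out.println(e.name());
    }
}
```

Because the builder never chooses an implementation itself, StorageManager controls what every leaderElectorBuilder().build() returns simply by passing in federatedPrimitiveCreator.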
So the result is determined by new FederatedDistributedPrimitiveCreator(partitionMap).newAsyncLeaderElector("onos-leadership-elections"):
```java
public class FederatedDistributedPrimitiveCreator implements DistributedPrimitiveCreator {

    private final TreeMap<PartitionId, DistributedPrimitiveCreator> members;
    private final List<PartitionId> sortedMemberPartitionIds;

    public FederatedDistributedPrimitiveCreator(Map<PartitionId, DistributedPrimitiveCreator> members) {
        this.members = Maps.newTreeMap();
        this.members.putAll(checkNotNull(members));
        this.sortedMemberPartitionIds = Lists.newArrayList(members.keySet());
    }

    @Override
    public AsyncLeaderElector newAsyncLeaderElector(String name) {
        checkNotNull(name);
        Map<PartitionId, AsyncLeaderElector> leaderElectors =
                Maps.transformValues(members,
                        partition -> partition.newAsyncLeaderElector(name));
        Hasher<String> hasher = topic -> {
            int hashCode = Hashing.sha256().hashString(topic, Charsets.UTF_8).asInt();
            return sortedMemberPartitionIds.get(Math.abs(hashCode) % members.size());
        };
        return new PartitionedAsyncLeaderElector(name, leaderElectors, hasher);
    }
}
```
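The hasher in newAsyncLeaderElector is what pins a topic to one partition: every node hashes the same topic string to the same index in the same partition list, so all nodes agree on which partition's elector owns that topic. Below is a JDK-only sketch of the idea, with two assumptions. First, Guava's HashCode.asInt() is taken to read the first four digest bytes in little-endian order. Second, Math.floorMod replaces Math.abs(hashCode) % size; this avoids a negative index when the hash is Integer.MIN_VALUE, but it also means the sketch will not reproduce ONOS's exact topic-to-partition assignment for negative hashes. The integer partition IDs are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;

// Hypothetical sketch of hashing a topic name onto a fixed list of partitions.
class TopicHasherSketch {

    /** First 4 bytes of SHA-256(topic), little-endian (assumed to match Guava's asInt()). */
    static int sha256AsInt(String topic) {
        try {
            byte[] d = MessageDigest.getInstance("SHA-256")
                    .digest(topic.getBytes(StandardCharsets.UTF_8));
            return (d[0] & 0xFF) | (d[1] & 0xFF) << 8 | (d[2] & 0xFF) << 16 | (d[3] & 0xFF) << 24;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 must be available on every JVM", e);
        }
    }

    /** Deterministically maps a topic to one of the partition ids. */
    static int partitionFor(String topic, List<Integer> sortedPartitionIds) {
        int index = Math.floorMod(sha256AsInt(topic), sortedPartitionIds.size());
        return sortedPartitionIds.get(index);
    }

    public static void main(String[] args) {
        List<Integer> partitions = List.of(1, 2, 3);
        for (String topic : List.of("device:of:0000000000000001", "device:of:0000000000000002")) {
            System.out.println(topic + " -> partition " + partitionFor(topic, partitions));
        }
    }
}
```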
PartitionedAsyncLeaderElector is implemented as follows:
```java
public class PartitionedAsyncLeaderElector implements AsyncLeaderElector {

    private final String name;
    private final TreeMap<PartitionId, AsyncLeaderElector> partitions = Maps.newTreeMap();
    private final Hasher<String> topicHasher;

    public PartitionedAsyncLeaderElector(String name,
                                         Map<PartitionId, AsyncLeaderElector> partitions,
                                         Hasher<String> topicHasher) {
        this.name = name;
        this.partitions.putAll(checkNotNull(partitions));
        this.topicHasher = checkNotNull(topicHasher);
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
        return getLeaderElector(topic).run(topic, nodeId);
    }

    @Override
    public CompletableFuture<Void> withdraw(String topic) {
        return getLeaderElector(topic).withdraw(topic);
    }

    @Override
    public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
        return getLeaderElector(topic).anoint(topic, nodeId);
    }

    @Override
    public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
        return getLeaderElector(topic).promote(topic, nodeId);
    }

    @Override
    public CompletableFuture<Void> evict(NodeId nodeId) {
        return CompletableFuture.allOf(getLeaderElectors().stream()
                .map(le -> le.evict(nodeId))
                .toArray(CompletableFuture[]::new));
    }

    @Override
    public CompletableFuture<Leadership> getLeadership(String topic) {
        return getLeaderElector(topic).getLeadership(topic);
    }

    @Override
    public CompletableFuture<Map<String, Leadership>> getLeaderships() {
        Map<String, Leadership> leaderships = Maps.newConcurrentMap();
        return CompletableFuture.allOf(getLeaderElectors().stream()
                .map(le -> le.getLeaderships().thenAccept(m -> leaderships.putAll(m)))
                .toArray(CompletableFuture[]::new))
                .thenApply(v -> leaderships);
    }

    @Override
    public CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> listener) {
        return CompletableFuture.allOf(getLeaderElectors().stream()
                .map(map -> map.addChangeListener(listener))
                .toArray(CompletableFuture[]::new));
    }

    @Override
    public CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> listener) {
        return CompletableFuture.allOf(getLeaderElectors().stream()
                .map(map -> map.removeChangeListener(listener))
                .toArray(CompletableFuture[]::new));
    }

    /**
     * Returns the leaderElector (partition) to which the specified topic maps.
     * @param topic topic name
     * @return AsyncLeaderElector to which topic maps
     */
    private AsyncLeaderElector getLeaderElector(String topic) {
        return partitions.get(topicHasher.hash(topic));
    }
}
```
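The node-wide methods such as getLeaderships() fan out to every partition and combine the results with CompletableFuture.allOf. The pattern can be sketched with JDK types alone; mergeAll and the String-to-String leadership map are simplifications assumed here, not ONOS signatures.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the fan-out/merge used by getLeaderships().
class FanOutSketch {

    /** Merges per-partition topic->leader maps once every partition has answered. */
    static CompletableFuture<Map<String, String>> mergeAll(
            List<CompletableFuture<Map<String, String>>> perPartition) {
        // Concurrent map: thenAccept callbacks may run on different threads.
        Map<String, String> merged = new ConcurrentHashMap<>();
        return CompletableFuture.allOf(perPartition.stream()
                        .map(f -> f.thenAccept(merged::putAll))
                        .toArray(CompletableFuture[]::new))
                .thenApply(v -> merged);
    }

    public static void main(String[] args) {
        CompletableFuture<Map<String, String>> p1 =
                CompletableFuture.supplyAsync(() -> Map.of("device:1", "node-a"));
        CompletableFuture<Map<String, String>> p2 =
                CompletableFuture.supplyAsync(() -> Map.of("device:2", "node-b"));
        System.out.println(mergeAll(List.of(p1, p2)).join());
    }
}
```

Because allOf completes exceptionally as soon as any input future fails, a single unreachable partition makes the whole merged query fail rather than return partial results.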
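So a LeaderElector actually calls PartitionedAsyncLeaderElector, which implements the AsyncLeaderElector interface, and at this point the elector appears to be complete. But when you set out to study how ONOS actually conducts an election and look at withdraw, anoint, and promote, you are bound to be exasperated: each of them just forwards the call to another AsyncLeaderElector.
So let's keep digging into how the election operations are really implemented and where the details are hidden. The topic-scoped methods of AsyncLeaderElector (run, withdraw, anoint, promote, getLeadership) all go through getLeaderElector(String topic), which uses topicHasher to pick a single partition's elector; the node-wide methods (evict, getLeaderships, addChangeListener, removeChangeListener) fan out to every partition via getLeaderElectors().
So what exactly does partitions contain? From the StorageManager analysis above, partitions is built from partitionMap: FederatedDistributedPrimitiveCreator turns each DistributedPrimitiveCreator in partitionMap into an AsyncLeaderElector via newAsyncLeaderElector(name). partitionMap itself is supplied by PartitionService partitionService, and PartitionManager implements the PartitionService interface.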