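This article covers the CP architecture of Nacos 1.4. Nacos uses the Raft protocol to keep the cluster strongly consistent: a write succeeds only when more than half of the nodes succeed. In the source code, a CountDownLatch (a synchronizer, not a lock) waits until that majority has succeeded.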
1. The Raft protocol
Interactive demo: http://thesecretlivesofdata.com/raft/
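Partition tolerance applies to multi-node systems. A partition is a network partition: because of network failures, some nodes cannot communicate or synchronize data with each other. Tolerance means the system keeps serving requests while the nodes are partitioned, rather than becoming entirely unavailable because of the partition.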
In a cluster, CP treats a write as successful only when more than half of the nodes have written it (a majority-vote mechanism).
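Here is a minimal Java sketch of the majority rule. It assumes the usual quorum formula `n / 2 + 1` with integer division. The class `Quorum` and its methods are hypothetical helpers, not Nacos code.

```java
// Hypothetical helper illustrating a majority quorum (not Nacos code).
final class Quorum {
    private Quorum() {}

    // Smallest number of nodes that is strictly more than half of clusterSize.
    static int majorityCount(int clusterSize) {
        if (clusterSize <= 0) {
            throw new IllegalArgumentException("cluster size must be positive");
        }
        return clusterSize / 2 + 1;
    }

    // A write counts as successful only when at least a majority acknowledged it.
    static boolean isCommitted(int acks, int clusterSize) {
        return acks >= majorityCount(clusterSize);
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 5; n++) {
            System.out.println(n + " nodes -> majority " + majorityCount(n));
        }
    }
}
```

For 3, 4 and 5 nodes this gives 2, 3 and 3. Adding a fourth node raises the quorum without letting the cluster survive more failures.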
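In Nacos, ephemeral defaults to true. Such an instance is ephemeral and uses the AP architecture, which only writes it to memory. Non-ephemeral (persistent) instances take the CP path and are written to disk files as well as to memory.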
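The sketch below makes the difference concrete. It assumes a single store that keeps every instance in memory and, for non-ephemeral instances only, also appends a line to a list. That list stands in for the disk file. The class and field names are hypothetical, not Nacos APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model: ephemeral instances live only in memory (AP path),
// persistent ones are kept in memory and also written to "disk" (CP path).
final class InstanceStoreSketch {
    final Map<String, String> memory = new ConcurrentHashMap<>();
    // Stand-in for the on-disk file; a real store would write to a file.
    final List<String> diskLog = new ArrayList<>();

    void register(String key, String instance, boolean ephemeral) {
        memory.put(key, instance);              // both kinds are kept in memory
        if (!ephemeral) {
            diskLog.add(key + "=" + instance);  // only persistent instances are persisted
        }
    }

    public static void main(String[] args) {
        InstanceStoreSketch store = new InstanceStoreSketch();
        store.register("svc-a", "up", true);   // ephemeral (the Nacos default)
        store.register("svc-b", "up", false);  // persistent
        System.out.println("memory=" + store.memory.size() + ", disk=" + store.diskLog.size());
    }
}
```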
Raft has three node states:
1)follower
2)candidate
3)leader
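follower -> candidate -> leader: when its timeout expires, a follower becomes a candidate and asks every other node to vote for it. If more than half of the nodes vote for it, it considers itself elected and becomes the leader.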
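The transitions above can be sketched as a tiny state machine. This is a hypothetical model, not Nacos's RaftPeer. It also includes one standard Raft rule the list does not mention: a node that sees a higher term steps back to follower.

```java
// Hypothetical sketch of the three Raft roles (not Nacos's RaftPeer).
enum Role { FOLLOWER, CANDIDATE, LEADER }

final class RaftNodeSketch {
    Role role = Role.FOLLOWER; // every node starts as a follower
    long term = 0;
    private final int clusterSize;

    RaftNodeSketch(int clusterSize) {
        this.clusterSize = clusterSize;
    }

    // Timeout fired: follower -> candidate, start a new term, vote for itself.
    int startElection() {
        role = Role.CANDIDATE;
        term++;
        return 1; // the candidate's own vote
    }

    // candidate -> leader once more than half of the cluster voted for this node.
    void onVotesCounted(int votesGranted) {
        if (role == Role.CANDIDATE && votesGranted >= clusterSize / 2 + 1) {
            role = Role.LEADER;
        }
    }

    // Standard Raft rule: seeing a higher term turns the node back into a follower.
    void onHigherTerm(long newTerm) {
        if (newTerm > term) {
            term = newTerm;
            role = Role.FOLLOWER;
        }
    }

    public static void main(String[] args) {
        RaftNodeSketch node = new RaftNodeSketch(3);
        int votes = node.startElection() + 1; // own vote + one peer's vote
        node.onVotesCounted(votes);
        System.out.println(node.role + " in term " + node.term); // LEADER in term 1
    }
}
```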
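Leader election in ZooKeeper vs. Nacos:
Nacos: in Raft, every node sleeps for a random time. Whichever node wakes up first votes for itself and asks the others to vote for it.
ZooKeeper: in ZAB, every node first votes for itself, and the nodes then compare their votes with each other.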
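All write operations must go through the leader.
Raft writes in two phases, and so does ZAB:
1. The write goes to the leader first, and the leader broadcasts the new data to the followers.
2. Once more than half of the nodes acknowledge it, the leader commits the entry to its log and then propagates the commit to the followers (log commit).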
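Here is a minimal sketch of this two-phase write. It assumes an in-memory log and a predicate that simulates which followers acknowledge. `TwoPhaseWriteSketch` is hypothetical. It leaves out networking, retries and the actual propagation of the commit to the followers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical two-phase write: append on the leader, replicate,
// commit only after a majority (leader included) has acknowledged the entry.
final class TwoPhaseWriteSketch {
    final List<String> log = new ArrayList<>();
    int commitIndex = -1; // index of the last committed entry, -1 = none

    // followerAcks decides, per follower number, whether that follower stored the entry.
    boolean write(String entry, int followerCount, Predicate<Integer> followerAcks) {
        log.add(entry);                       // phase 1: the leader appends locally
        int index = log.size() - 1;
        int acks = 1;                         // the leader's own copy counts
        for (int f = 0; f < followerCount; f++) {
            if (followerAcks.test(f)) {       // "broadcast" to each follower
                acks++;
            }
        }
        int clusterSize = followerCount + 1;
        if (acks >= clusterSize / 2 + 1) {    // phase 2: majority reached, commit
            commitIndex = index;              // followers would learn this later
            return true;
        }
        return false;                         // entry stays uncommitted
    }

    public static void main(String[] args) {
        TwoPhaseWriteSketch leader = new TwoPhaseWriteSketch();
        // 5-node cluster: leader + 4 followers, 2 followers acknowledge -> 3 of 5
        System.out.println(leader.write("x=1", 4, f -> f < 2)); // true
    }
}
```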
The election timeout is random, between 150 and 300 ms (the range used in the Raft demo above).
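The node that wakes up first votes for itself and then asks the other nodes to vote for it. If those nodes have not started voting yet, they grant the vote and reset their own sleep timers.
The newly elected leader sends heartbeats to the other nodes to keep them in sync; the heartbeat packets carry data. Every heartbeat resets the receiver's sleep timer.
Note: if two nodes wake up at the same time, the votes are tied and nobody wins. The round starts over and the nodes go back to sleep.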
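The election walk-through above can be simulated in a few lines. This hypothetical sketch follows the article's simplification:

- a node that wakes up alone collects every vote;
- two nodes waking in the same millisecond count as a tied vote, and the round restarts.

In full Raft, a candidate can also win with a plain majority when several candidates compete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical simulation of the election round described above (not Nacos code).
final class ElectionSimulation {
    static final class Outcome {
        final int leader;  // index of the elected node
        final int rounds;  // tied rounds force extra rounds
        Outcome(int leader, int rounds) {
            this.leader = leader;
            this.rounds = rounds;
        }
    }

    static Outcome elect(int nodes, Random random) {
        int rounds = 0;
        while (true) {
            rounds++;
            // every node sleeps for a random timeout in [150, 300] ms
            int earliestTimeout = Integer.MAX_VALUE;
            List<Integer> earliest = new ArrayList<>();
            for (int i = 0; i < nodes; i++) {
                int timeout = 150 + random.nextInt(151);
                if (timeout < earliestTimeout) {
                    earliestTimeout = timeout;
                    earliest.clear();
                    earliest.add(i);
                } else if (timeout == earliestTimeout) {
                    earliest.add(i);
                }
            }
            if (earliest.size() == 1) {
                // the lone early riser votes for itself and every sleeping node
                // grants its vote, so it always ends up with a majority
                return new Outcome(earliest.get(0), rounds);
            }
            // simultaneous wake-up: treated as a tie, everyone sleeps again
        }
    }

    public static void main(String[] args) {
        Outcome outcome = elect(5, new Random());
        System.out.println("leader=node" + outcome.leader + " after " + outcome.rounds + " round(s)");
    }
}
```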
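Data reaches the followers with the next heartbeat, which carries the synchronization information.
Raft's majority quorum prevents split-brain: after a network partition, at most one side can hold a majority, so at most one leader can commit writes. An odd number of nodes is preferred. For example, 4 nodes tolerate only 1 failure, the same as 3, and a 2/2 split leaves neither side with a majority.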
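The arithmetic can be checked with a couple of hypothetical helpers, assuming the majority is `n / 2 + 1`:

```java
// Hypothetical helpers: which side of a partition can still elect a leader,
// and how many failed nodes a cluster can survive.
final class PartitionCheck {
    private PartitionCheck() {}

    static boolean canElectLeader(int sideSize, int clusterSize) {
        return sideSize >= clusterSize / 2 + 1;
    }

    // Nodes that can fail while the rest still form a majority.
    static int toleratedFailures(int clusterSize) {
        return (clusterSize - 1) / 2;
    }

    public static void main(String[] args) {
        System.out.println("5 nodes split 3/2: " + canElectLeader(3, 5) + " / " + canElectLeader(2, 5));
        System.out.println("4 nodes split 2/2: " + canElectLeader(2, 4) + " / " + canElectLeader(2, 4));
        for (int n = 3; n <= 5; n++) {
            System.out.println(n + " nodes tolerate " + toleratedFailures(n) + " failure(s)");
        }
    }
}
```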
2. Writing data to the leader and replicating it
RaftConsistencyServiceImpl#put
@Override
public void put(String key, Record value) throws NacosException {
    checkIsStopWork();
    try {
        raftCore.signalPublish(key, value);
    } catch (Exception e) {
        Loggers.RAFT.error("Raft put failed.", e);
        throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value,
                e);
    }
}
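Writing a new instance and replicating it:
The new instance is written to the leader. The leader first writes it to disk and then to its memory.
The leader then replicates the data to the other nodes. The write succeeds if more than half of the nodes succeed; otherwise an exception is thrown.
(A small bug: even when the exception is thrown because too few followers confirmed, the leader has already applied the write. Later Nacos versions fixed this by switching to JRaft.)
The relevant code: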
public void signalPublish(String key, Record value) throws Exception {
    if (stopWork) {
        throw new IllegalStateException("old raft protocol already stop work");
    }
    if (!isLeader()) { // is the current node the leader?
        ObjectNode params = JacksonUtils.createEmptyJsonNode();
        params.put("key", key);
        params.replace("value", JacksonUtils.transferToJsonNode(value));
        Map<String, String> parameters = new HashMap<>(1);
        parameters.put("key", key);
        final RaftPeer leader = getLeader();
        raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters); // not the leader: forward to the leader
        return;
    }
    OPERATE_LOCK.lock();
    try {
        final long start = System.currentTimeMillis();
        final Datum datum = new Datum();
        datum.key = key;
        datum.value = value;
        if (getDatum(key) == null) {
            datum.timestamp.set(1L);
        } else {
            datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
        }
        ObjectNode json = JacksonUtils.createEmptyJsonNode();
        json.replace("datum", JacksonUtils.transferToJsonNode(datum));
        json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));
        onPublish(datum, peers.local()); // the leader writes the datum to disk, then updates its memory
        final String content = json.toString();
        final CountDownLatch latch = new CountDownLatch(peers.majorityCount()); // counts acknowledgements up to a majority
        for (final String server : peers.allServersIncludeMyself()) {
            if (isLeader(server)) {
                latch.countDown(); // the leader counts itself
                continue;
            }
            final String url = buildUrl(server, API_ON_PUB); // native Nacos Raft: a single commit step
            // replicate the data to the other nodes asynchronously
            HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {
                @Override
                public void onReceive(RestResult<String> result) {
                    if (!result.ok()) {
                        Loggers.RAFT
                                .warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                        datum.key, server, result.getCode());
                        return;
                    }
                    latch.countDown();
                }
                @Override
                public void onError(Throwable throwable) {
                    Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable);
                }
                @Override
                public void onCancel() {
                }
            });
        }
        // wait until a majority has written successfully; on timeout an exception is thrown,
        // but the leader's own write (onPublish above) has already succeeded
        if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
            // only majority servers return success can we consider this update success
            Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
            throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
        }
        long end = System.currentTimeMillis();
        Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
    } finally {
        OPERATE_LOCK.unlock();
    }
}
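The latch pattern in signalPublish can be reproduced in a self-contained sketch. That includes the caveat that the leader keeps its write even when the majority wait times out. Assumptions:

- a thread pool stands in for HttpClient's asynchronous callbacks;
- a predicate decides which followers reply successfully;
- `LatchPublishSketch` is a hypothetical name.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

// Hypothetical model of the signalPublish pattern (not Nacos code): apply locally,
// replicate asynchronously, and wait on a CountDownLatch sized to the majority.
final class LatchPublishSketch {
    final Map<String, String> localStore = new ConcurrentHashMap<>();

    void publish(String key, String value, List<String> followers,
                 Predicate<String> followerAccepts, long timeoutMs) {
        localStore.put(key, value);                     // like onPublish: the leader applies first
        int majority = (followers.size() + 1) / 2 + 1;  // cluster = followers + leader
        CountDownLatch latch = new CountDownLatch(majority);
        latch.countDown();                              // the leader counts itself
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, followers.size()));
        try {
            for (String follower : followers) {
                // stands in for the async HTTP callback: only a successful reply counts down
                pool.execute(() -> {
                    if (followerAccepts.test(follower)) {
                        latch.countDown();
                    }
                });
            }
            if (!latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
                // same caveat as in the text: the caller gets an error,
                // yet localStore already holds the new value (nothing is rolled back)
                throw new IllegalStateException("failed to notify majority, key=" + key);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for majority", e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        LatchPublishSketch leader = new LatchPublishSketch();
        List<String> followers = List.of("n2", "n3", "n4", "n5");
        leader.publish("k1", "v1", followers, f -> !f.equals("n5"), 1000);
        try {
            leader.publish("k2", "v2", followers, f -> f.equals("n2"), 200);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage() + ", but the leader still has k2=" + leader.localStore.get("k2"));
        }
    }
}
```

The second call times out because only the leader and n2 count down, 2 of the 3 needed, yet `k2` stays in the leader's store. This is the "small bug" described above.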
3. Source code for cluster data consistency
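RaftCore#init() is the core entry point. It loads the data from disk into the cache and starts two scheduled tasks: the election task and the heartbeat task. The core code: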
@PostConstruct
public void init() throws Exception {
    Loggers.RAFT.info("initializing Raft sub-system");
    final long start = System.currentTimeMillis();
    raftStore.loadDatums(notifier, datums); // load data from disk into the cache
    setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));
    Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm());
    initialized = true;
    Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));
    masterTask = GlobalExecutor.registerMasterElection(new MasterElection()); // leader-election task, every 500 ms
    heartbeatTask = GlobalExecutor.registerHeartbeat(new HeartBeat()); // heartbeat task, every 500 ms
    versionJudgement.registerObserver(isAllNewVersion -> {
        stopWork = isAllNewVersion;
        if (stopWork) {
            try {
                shutdown();
                raftListener.removeOldRaftMetadata();
            } catch (NacosException e) {
                throw new NacosRuntimeException(NacosException.SERVER_ERROR, e);
            }
        }
    }, 100);
    NotifyCenter.registerSubscriber(notifier);
    Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",
            GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
}
The two scheduled tasks:
masterTask = GlobalExecutor.registerMasterElection(new MasterElection()); // leader-election task, every 500 ms
heartbeatTask = GlobalExecutor.registerHeartbeat(new HeartBeat()); // heartbeat task, every 500 ms
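Core election code:
A node that wakes up sends vote requests to the other nodes and then checks whether it has reached a majority. While doing so, it resets both its heartbeat timer and its election timer.
The majority check happens in the HTTP callbacks of sendVote, which call peers.decideLeader. This path uses no CountDownLatch; the latch appears in signalPublish. The core code of MasterElection#run: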
@Override
public void run() {
    try {
        if (stopWork) {
            return;
        }
        if (!peers.isReady()) {
            return;
        }
        RaftPeer local = peers.local();
        local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS; // count down the random sleep time
        if (local.leaderDueMs > 0) {
            return;
        }
        // reset timeout
        local.resetLeaderDue();
        local.resetHeartbeatDue();
        sendVote(); // start voting
    } catch (Exception e) {
        Loggers.RAFT.warn("[RAFT] error while master election {}", e);
    }
}
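The countdown in MasterElection#run can be modeled as a tick-driven timer. This is a minimal sketch. It assumes a 500 ms tick (the period noted in init above) plus a hypothetical base timeout and random jitter. The real values come from GlobalExecutor and RaftPeer and are not reproduced here.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical tick-driven election timer (not Nacos's RaftPeer).
final class ElectionTimerSketch {
    static final long TICK_MS = 500;   // period of the scheduled task
    private final long baseTimeoutMs;  // hypothetical base timeout
    private final long jitterMs;       // hypothetical bound for the random extra delay
    long leaderDueMs;

    ElectionTimerSketch(long baseTimeoutMs, long jitterMs) {
        this.baseTimeoutMs = baseTimeoutMs;
        this.jitterMs = jitterMs;
        resetLeaderDue();
    }

    // Random jitter makes it unlikely that nodes time out on the same tick.
    void resetLeaderDue() {
        long jitter = jitterMs > 0 ? ThreadLocalRandom.current().nextLong(jitterMs) : 0;
        leaderDueMs = baseTimeoutMs + jitter;
    }

    // Called once per tick; returns true when this tick should start an election.
    boolean tick() {
        leaderDueMs -= TICK_MS;
        if (leaderDueMs > 0) {
            return false;
        }
        resetLeaderDue();
        return true;
    }

    // A heartbeat from the leader resets the timer, as described in section 1.
    void onHeartbeat() {
        resetLeaderDue();
    }

    public static void main(String[] args) {
        ElectionTimerSketch timer = new ElectionTimerSketch(1500, 0);
        int ticks = 1;
        while (!timer.tick()) {
            ticks++;
        }
        System.out.println("election started on tick " + ticks); // tick 3
    }
}
```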
Core code for starting an election:
private void sendVote() {
    RaftPeer local = peers.get(NetUtils.localServer());
    Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}", JacksonUtils.toJson(getLeader()),
            local.term);
    peers.reset();
    local.term.incrementAndGet();
    local.voteFor = local.ip;
    local.state = RaftPeer.State.CANDIDATE; // become a candidate, then build the vote request
    Map<String, String> params = new HashMap<>(1);
    params.put("vote", JacksonUtils.toJson(local));
    for (final String server : peers.allServersWithoutMySelf()) { // send the vote request to every other node
        final String url = buildUrl(server, API_VOTE);
        try {
            HttpClient.asyncHttpPost(url, null, params, new Callback<String>() { // asynchronous HTTP request
                @Override
                public void onReceive(RestResult<String> result) {
                    if (!result.ok()) {
                        Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", result.getCode(), url);
                        return;
                    }
                    RaftPeer peer = JacksonUtils.toObj(result.getData(), RaftPeer.class); // the peer's vote
                    Loggers.RAFT.info("received approve from peer: {}", JacksonUtils.toJson(peer));
                    peers.decideLeader(peer); // record the vote and check whether a candidate has a majority
                }
                @Override
                public void onError(Throwable throwable) {
                    Loggers.RAFT.error("error while sending vote to server: {}", server, throwable);
                }
                @Override
                public void onCancel() {
                }
            });
        } catch (Exception e) {
            Loggers.RAFT.warn("error while sending vote to server: {}", server);
        }
    }
}
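Below is a hypothetical vote tally in the spirit of peers.decideLeader. Each reply records whom that peer voted for, and a candidate becomes leader once it holds a majority of the whole cluster. The class and method names are illustrative, not the Nacos implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical vote tally (not Nacos's RaftPeerSet).
final class VoteTally {
    private final int clusterSize;
    private final Map<String, String> voteFor = new HashMap<>(); // peer -> candidate it voted for
    private String leader;

    VoteTally(int clusterSize) {
        this.clusterSize = clusterSize;
    }

    // synchronized because vote replies arrive on concurrent HTTP callback threads
    synchronized String onVoteReply(String peer, String votedFor) {
        voteFor.put(peer, votedFor);
        Map<String, Integer> counts = new HashMap<>();
        String best = null;
        int bestCount = 0;
        for (String candidate : voteFor.values()) {
            int count = counts.merge(candidate, 1, Integer::sum);
            if (count > bestCount) {
                bestCount = count;
                best = candidate;
            }
        }
        if (bestCount >= clusterSize / 2 + 1) {
            leader = best;
        }
        return leader; // null while no candidate has a majority
    }

    public static void main(String[] args) {
        VoteTally tally = new VoteTally(5);
        tally.onVoteReply("A", "A");                     // A's own vote
        tally.onVoteReply("B", "A");
        System.out.println(tally.onVoteReply("C", "A")); // A (3 of 5)
    }
}
```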
Core heartbeat code:
Only the leader sends heartbeats. The entry point is run -> sendBeat.
@Override
public void run() {
    try {
        if (stopWork) {
            return;
        }
        if (!peers.isReady()) {
            return;
        }
        RaftPeer local = peers.local();
        local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
        if (local.heartbeatDueMs > 0) {
            return;
        }
        local.resetHeartbeatDue();
        sendBeat(); // send the heartbeat
    } catch (Exception e) {
        Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
    }
}
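sendBeat: puts the key and timestamp of every registered datum into an element of the beat packet, compresses the packet, and sends it to the other nodes.
RaftController#beat: receives the heartbeat.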
@PostMapping("/beat")
public JsonNode beat(HttpServletRequest request, HttpServletResponse response) throws Exception {
    if (versionJudgement.allMemberIsNewVersion()) {
        throw new IllegalStateException("old raft protocol already stop");
    }
    String entity = new String(IoUtils.tryDecompress(request.getInputStream()), StandardCharsets.UTF_8);
    String value = URLDecoder.decode(entity, "UTF-8");
    value = URLDecoder.decode(value, "UTF-8");
    JsonNode json = JacksonUtils.toObj(value);
    RaftPeer peer = raftCore.receivedBeat(JacksonUtils.toObj(json.get("beat").asText()));
    return JacksonUtils.transferToJsonNode(peer);
}
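Here is a hypothetical round trip of the beat payload. The sender lists key/timestamp pairs and compresses them, and the receiver decompresses them before parsing. RaftController#beat calls IoUtils.tryDecompress; this sketch assumes gzip via java.util.zip instead. It also uses a plain `key=timestamp` text format instead of the real JSON, and skips the URL decoding.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical beat payload handling (format and names are illustrative only).
final class BeatPayloadSketch {
    // One "key=timestamp" line per datum, sorted by key for a stable payload.
    static String buildPayload(Map<String, Long> keyTimestamps) {
        StringJoiner joiner = new StringJoiner("\n");
        for (Map.Entry<String, Long> e : new TreeMap<>(keyTimestamps).entrySet()) {
            joiner.add(e.getKey() + "=" + e.getValue());
        }
        return joiner.toString();
    }

    static byte[] compress(String content) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(bytes)) {
            gzip.write(content.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray(); // gzip stream is closed (trailer written) before this runs
    }

    static String decompress(byte[] compressed) {
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int n;
            while ((n = gzip.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, Long> datums = new TreeMap<>();
        datums.put("service-a", 3L);
        datums.put("service-b", 7L);
        byte[] packet = compress(buildPayload(datums));
        System.out.println(decompress(packet));
    }
}
```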
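The follower processes the synchronized data in batches and fetches each batch from the leader. It then deletes locally every key still marked 0: a key it holds that no longer appears in the leader's beat.
if (batch.size() < 50 && processedCount < beatDatums.size()) { // keep collecting until the batch has 50 keys or the beat is exhausted
    continue;
}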
RaftCore#receivedBeat
List<String> deadKeys = new ArrayList<>();
for (Map.Entry<String, Integer> entry : receivedKeysMap.entrySet()) {
    if (entry.getValue() == 0) {
        deadKeys.add(entry.getKey());
    }
}
for (String deadKey : deadKeys) {
    try {
        deleteDatum(deadKey);
    } catch (Exception e) {
        Loggers.RAFT.error("[NACOS-RAFT] failed to remove entry, key={} {}", deadKey, e);
    }
}
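The follower side of receivedBeat can be sketched as two pure functions:

- one picks the keys whose leader timestamp is newer (or that are missing locally) and groups them into batches of 50;
- the other finds the keys the leader's beat no longer lists.

The names are hypothetical. The actual fetch from the leader and the deletion are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the follower side of receivedBeat (not Nacos code).
final class BeatReconcileSketch {
    static final int BATCH_SIZE = 50;

    // Keys that are missing locally or older than the leader's copy,
    // grouped into batches that would each be fetched from the leader.
    static List<List<String>> keysToFetch(Map<String, Long> local, Map<String, Long> beat) {
        List<List<String>> batches = new ArrayList<>();
        List<String> batch = new ArrayList<>();
        for (Map.Entry<String, Long> e : beat.entrySet()) {
            Long mine = local.get(e.getKey());
            if (mine == null || mine < e.getValue()) {
                batch.add(e.getKey());
                if (batch.size() == BATCH_SIZE) {
                    batches.add(batch);
                    batch = new ArrayList<>();
                }
            }
        }
        if (!batch.isEmpty()) {
            batches.add(batch);
        }
        return batches;
    }

    // Mark every local key 0, flip keys seen in the beat to 1; keys still 0 are dead.
    static List<String> deadKeys(Map<String, Long> local, Map<String, Long> beat) {
        Map<String, Integer> seen = new HashMap<>();
        for (String key : local.keySet()) {
            seen.put(key, 0);
        }
        for (String key : beat.keySet()) {
            seen.put(key, 1);
        }
        List<String> dead = new ArrayList<>();
        for (Map.Entry<String, Integer> e : seen.entrySet()) {
            if (e.getValue() == 0) {
                dead.add(e.getKey());
            }
        }
        return dead;
    }

    public static void main(String[] args) {
        Map<String, Long> local = new HashMap<>();
        local.put("a", 1L);
        local.put("c", 2L);
        Map<String, Long> beat = new HashMap<>();
        beat.put("a", 2L);
        beat.put("d", 1L);
        System.out.println("fetch " + keysToFetch(local, beat) + ", delete " + deadKeys(local, beat));
    }
}
```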