微服务-Nacos数据一致性

 

Nacos数据一致性

目录

一、Raft算法

二、Nacos中Raft部分源码

init()

1. 获取Raft集群节点 

NamingProxy.getServers()获取集群节点

NamingProxy.refreshSrvIfNeed()得到节点信息

NamingProxy.refreshServerListFromDisk()获取集群节点信息

2. Raft集群数据恢复

RaftStore.load()

3. Raft选举

GlobalExecutor.register(new MasterElection())注册选举定时任务

MasterElection.sendVote()发送定时任务

(1)RaftCommands.vote()处理/v1/ns/raft/vote请求

(2)PeerSet.decideLeader()选举

4. Raft心跳

GlobalExecutor.register(new HeartBeat())注册心跳定时任务

HeartBeat.sendBeat()发送心跳包

(·)RaftCommands.beat()方法处理/v1/ns/raft/beat请求

5. Raft发布内容

注册入口

实例信息持久化

(1)Service.put()

(2)RaftCore.signalPublish()

(3)/raft/datum 接口 和 /raft/datum/commit 接口

发布入口 RaftCommands.publish()

6. Raft保证内容一致性


一、Raft算法

Raft通过当选的领导者达成共识。筏集群中的服务器是领导者或追随者,并且在选举的精确情况下可以是候选者(领导者不可用)。领导者负责将日志复制到关注者。它通过发送心跳消息定期通知追随者它的存在。每个跟随者都有一个超时(通常在150到300毫秒之间),它期望领导者的心跳。接收心跳时重置超时。如果没有收到心跳,则关注者将其状态更改为候选人并开始领导选举。

详见:Raft算法

二、Nacos中Raft部分源码

Nacos server在启动时,会通过RunningConfig.onApplicationEvent()方法调用RaftCore.init()方法。

init()

 
  1. public static void init() throws Exception {

  2.  
  3. Loggers.RAFT.info("initializing Raft sub-system");

  4.  
  5. // 启动Notifier,轮询Datums,通知RaftListener

  6. executor.submit(notifier);

  7.  
  8. // 获取Raft集群节点,更新到PeerSet中

  9. peers.add(NamingProxy.getServers());

  10.  
  11. long start = System.currentTimeMillis();

  12.  
  13. // 从磁盘加载Datum和term数据进行数据恢复

  14. RaftStore.load();

  15.  
  16. Loggers.RAFT.info("cache loaded, peer count: {}, datum count: {}, current term: {}",

  17. peers.size(), datums.size(), peers.getTerm());

  18.  
  19. while (true) {

  20. if (notifier.tasks.size() <= 0) {

  21. break;

  22. }

  23. Thread.sleep(1000L);

  24. System.out.println(notifier.tasks.size());

  25. }

  26.  
  27. Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));

  28.  
  29. GlobalExecutor.register(new MasterElection()); // Leader选举

  30. GlobalExecutor.register1(new HeartBeat()); // Raft心跳

  31. GlobalExecutor.register(new AddressServerUpdater(), GlobalExecutor.ADDRESS_SERVER_UPDATE_INTERVAL_MS);

  32.  
  33. if (peers.size() > 0) {

  34. if (lock.tryLock(INIT_LOCK_TIME_SECONDS, TimeUnit.SECONDS)) {

  35. initialized = true;

  36. lock.unlock();

  37. }

  38. } else {

  39. throw new Exception("peers is empty.");

  40. }

  41.  
  42. Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",

  43. GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);

  44. }

在init方法主要做了如下几件事:

  • 1. 获取Raft集群节点 peers.add(NamingProxy.getServers());
  • 2. Raft集群数据恢复 RaftStore.load();
  • 3. Raft选举 GlobalExecutor.register(new MasterElection()); 
  • 4. Raft心跳 GlobalExecutor.register(new HeartBeat()); 
  • 5. Raft发布内容
  • 6. Raft保证内容一致性

1. 获取Raft集群节点 

NamingProxy.getServers()获取集群节点

  • NamingProxy.refreshSrvIfNeed()得到节点信息
  • 返回List<String> servers

NamingProxy.refreshSrvIfNeed()得到节点信息

  • 如果单机模式

    则本主机的ip:port为Raft节点信息;

    否则

    调用下面的NamingProxy.refreshServerListFromDisk()获取Raft集群节点信息

  • 获取到Raft集群节点信息之后(即ip:port列表),更新NamingProxy的List<String> serverlistFromConfig属性和List<String> servers属性。

NamingProxy.refreshServerListFromDisk()获取集群节点信息

从磁盘或系统环境变量种读取Raft集群节点信息,即ip:port列表

2. Raft集群数据恢复

Nacos启动/重启时会从磁盘加载Datum和term数据进行数据恢复。

nacos server端启动后->RaftCore.init()方法->RaftStore.load()方法。

RaftStore.load()

  • 从磁盘获取Datum数据:

    将Datum放到RaftCore的ConcurrentMap<String, Datum> datums集合中,key为Datum的key;

    将Datum和ApplyAction.CHANGE封装成Pair放到Notifier的tasks队列中,通知相关的RaftListener;

  • 从META_FILE_NAME:<user.home>\nacos\raft\meta.properties获取任期term值(long值):

    调用RaftSet.setTerm(long term)方法更新Raft集群中每个节点的term值

3. Raft选举

GlobalExecutor.register(new MasterElection())注册选举定时任务

Nacos的Raft选举是通过MasterElection这个线程任务完成的。

  • 更新候选节点的election timeout、heart timeout。
  • 调用MasterElection.sendVote()进行投票。
 
  1. public class MasterElection implements Runnable {

  2. @Override

  3. public void run() {

  4. try {

  5. if (!peers.isReady()) {

  6. return;

  7. }

  8.  
  9. RaftPeer local = peers.local();

  10. local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;

  11. if (local.leaderDueMs > 0) {

  12. return;

  13. }

  14.  
  15. // 重置选举超时时间,每次心跳以及收到数据包都会重置

  16. local.resetLeaderDue();

  17. local.resetHeartbeatDue();

  18.  
  19. // 发起选举

  20. sendVote();

  21. } catch (Exception e) {

  22. Loggers.RAFT.warn("[RAFT] error while master election {}", e);

  23. }

  24. }

  25. }

MasterElection.sendVote()发送定时任务

  • 重置Raft集群数据:

leader置为null; 所有Raft节点的voteFor字段置为null;

  • 更新候选节点数据:

任期term自增1;(通过自增1制造和其它节点的term差异,避免所有节点term一样选举不出Leader)

候选节点的voteFor字段设置为自己;

state置为CANDIDATE;

  • 候选节点向除自身之外的所有其它Raft节点的/v1/ns/raft/vote发送HTTP POST请求:

请求内容为vote:JSON.toJSONString(local)

  • 候选节点收到其他节点投的候选节点数据,交给PeerSet.decideLeader()方法处理

把超半数的voteFor对应的RaftPerr设置为Leader。

 
  1. public void sendVote() {

  2.  
  3. RaftPeer local = peers.get(NetUtils.localServer());

  4. Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}",

  5. JSON.toJSONString(getLeader()), local.term);

  6.  
  7. //重置Raft集群数据

  8. peers.reset();

  9.  
  10. //更新候选节点数据

  11. local.term.incrementAndGet();

  12. local.voteFor = local.ip;

  13. local.state = RaftPeer.State.CANDIDATE;

  14.  
  15.  
  16. //候选节点向除自身之外的所有其它Raft节点的/v1/ns/raft/vote发送HTTP POST请求

  17. //请求内容为vote:JSON.toJSONString(local)

  18. Map<String, String> params = new HashMap<String, String>(1);

  19. params.put("vote", JSON.toJSONString(local));

  20. for (final String server : peers.allServersWithoutMySelf()) {

  21. final String url = buildURL(server, API_VOTE);

  22. try {

  23. HttpClient.asyncHttpPost(url, null, params, new AsyncCompletionHandler<Integer>() {

  24. @Override

  25. public Integer onCompleted(Response response) throws Exception {

  26. if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {

  27. Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", response.getResponseBody(), url);

  28. return 1;

  29. }

  30.  
  31. RaftPeer peer = JSON.parseObject(response.getResponseBody(), RaftPeer.class);

  32.  
  33. Loggers.RAFT.info("received approve from peer: {}", JSON.toJSONString(peer));

  34.  
  35. //候选节点收到其他节点投的候选节点数据,交给PeerSet.decideLeader

  36. //方法处理

  37. peers.decideLeader(peer);

  38.  
  39. return 0;

  40. }

  41. });

  42. } catch (Exception e) {

  43. Loggers.RAFT.warn("error while sending vote to server: {}", server);

  44. }

  45. }

  46. }

  47. }

(1)RaftCommands.vote()处理/v1/ns/raft/vote请求

选举请求的 http 接口

 
  1. @RestController

  2. @RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/raft")

  3. public class RaftController {

  4.  
  5. ......

  6.  
  7. @NeedAuth

  8. @RequestMapping(value = "/vote", method = RequestMethod.POST)

  9. public JSONObject vote(HttpServletRequest request, HttpServletResponse response) throws Exception {

  10. // 处理选举请求

  11. RaftPeer peer = raftCore.receivedVote(

  12. JSON.parseObject(WebUtils.required(request, "vote"), RaftPeer.class));

  13.  
  14. return JSON.parseObject(JSON.toJSONString(peer));

  15. }

  16.  
  17.  
  18. ......

  19. }

调用RaftCore.MasterElection.receivedVote()方法

如果收到的候选节点term比本地节点term要小,则:

                   本地节点的voteFor更新为自己;(意思是我自己更适合做leader,这一票我投给自己)

否则:

                   这个Follower重置它的election timeout;

                   更新它的voteFor为收到的候选节点ip;(意思是就按你说的做,这一票就投给你了。)

                   更新它的term为收到的候选节点term;

将本地节点作为http响应返回;

 
  1. @Component

  2. public class RaftCore {

  3.  
  4. ......

  5.  
  6. public RaftPeer receivedVote(RaftPeer remote) {

  7. if (!peers.contains(remote)) {

  8. throw new IllegalStateException("can not find peer: " + remote.ip);

  9. }

  10.  
  11. // 若当前节点的 term 大于等于发送选举请求的节点 term,则选择自己为 leader

  12. RaftPeer local = peers.get(NetUtils.localServer());

  13. if (remote.term.get() <= local.term.get()) {

  14. String msg = "received illegitimate vote" +

  15. ", voter-term:" + remote.term + ", votee-term:" + local.term;

  16.  
  17. Loggers.RAFT.info(msg);

  18. if (StringUtils.isEmpty(local.voteFor)) {

  19. local.voteFor = local.ip;

  20. }

  21.  
  22. return local;

  23. }

  24.  
  25. local.resetLeaderDue();

  26.  
  27. // 若当前节点的 term 小于发送请求的节点 term,选择发送请求的节点为 leader

  28. local.state = RaftPeer.State.FOLLOWER;

  29. local.voteFor = remote.ip;

  30. local.term.set(remote.term.get());

  31.  
  32. Loggers.RAFT.info("vote {} as leader, term: {}", remote.ip, remote.term);

  33.  
  34. return local;

  35. }

  36. }

(2)PeerSet.decideLeader()选举

 
  1. @Component

  2. @DependsOn("serverListManager")

  3. public class RaftPeerSet implements ServerChangeListener {

  4.  
  5. ......

  6.  
  7. public RaftPeer decideLeader(RaftPeer candidate) {

  8. peers.put(candidate.ip, candidate);

  9.  
  10. SortedBag ips = new TreeBag();

  11. int maxApproveCount = 0;

  12. String maxApprovePeer = null;

  13. // 遍历所有的节点,若 voteFor 不为空,则将节点的 voteFor 添加到 ips 中,记录被选举次数最多的节点和次数

  14. for (RaftPeer peer : peers.values()) {

  15. if (StringUtils.isEmpty(peer.voteFor)) {

  16. continue;

  17. }

  18.  
  19. ips.add(peer.voteFor);

  20. if (ips.getCount(peer.voteFor) > maxApproveCount) {

  21. maxApproveCount = ips.getCount(peer.voteFor);

  22. maxApprovePeer = peer.voteFor;

  23. }

  24. }

  25.  
  26. // 将选举出来的节点设置为 leader

  27. if (maxApproveCount >= majorityCount()) {

  28. RaftPeer peer = peers.get(maxApprovePeer);

  29. peer.state = RaftPeer.State.LEADER;

  30.  
  31. if (!Objects.equals(leader, peer)) {

  32. leader = peer;

  33. Loggers.RAFT.info("{} has become the LEADER", leader.ip);

  34. }

  35. }

  36.  
  37. return leader;

  38. }

  39. }

4. Raft心跳

GlobalExecutor.register(new HeartBeat())注册心跳定时任务

  •  重置Leader节点的heart timeout、election timeout;
  • sendBeat()发送心跳包
 
  1. public class HeartBeat implements Runnable {

  2. @Override

  3. public void run() {

  4. try {

  5. if (!peers.isReady()) {

  6. return;

  7. }

  8.  
  9. RaftPeer local = peers.local();

  10. // hearbeatDueMs 默认为 5s,TICK_PERIOD_MS 为 500ms,每 500ms 检查一次,每 5s 发送一次心跳

  11. local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;

  12. if (local.heartbeatDueMs > 0) {

  13. return;

  14. }

  15.  
  16. // 重置 heartbeatDueMs

  17. local.resetHeartbeatDue();

  18.  
  19. // 发送心跳包

  20. sendBeat();

  21. } catch (Exception e) {

  22. Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);

  23. }

  24. }

  25. }

HeartBeat.sendBeat()发送心跳包

  • 重置Leader节点的heart timeout、election timeout;
  • 除自身之外的其它节点/v1/ns/raft/beat路径发送HTTP POST请求,请求内容如下:

JSONObject packet = new JSONObject();

packet.put("peer", local);  //local为Leader节点对应的RaftPeer对象

packet.put("datums", array); //array中封装了RaftCore中所有的Datum的key和timestamp

Map<String, String> params = new HashMap<String, String>(1);

params.put("beat", JSON.toJSONString(packet));

  • 拿到各个节点返回的http响应,即RaftPeer对象,更新PeerSet的Map<String, RaftPeer> peers集合。(保持集群节点数据一致)
 
  1. public void sendBeat() throws IOException, InterruptedException {

  2. RaftPeer local = peers.local();

  3. // 只有 leader 才发送心跳

  4. if (local.state != RaftPeer.State.LEADER && !STANDALONE_MODE) {

  5. return;

  6. }

  7.  
  8. Loggers.RAFT.info("[RAFT] send beat with {} keys.", datums.size());

  9.  
  10. // 重置收不到包就选举 leader 的时间间隔

  11. local.resetLeaderDue();

  12.  
  13. // 构建心跳包信息,local 为当前 nacos 节点的信息,key 为 peer

  14. JSONObject packet = new JSONObject();

  15. packet.put("peer", local);

  16.  
  17. JSONArray array = new JSONArray();

  18.  
  19. // 只发送心跳包,不带数据过去

  20. if (switchDomain.isSendBeatOnly()) {

  21. Loggers.RAFT.info("[SEND-BEAT-ONLY] {}", String.valueOf(switchDomain.isSendBeatOnly()));

  22. }

  23.  
  24. // 将相关的 key 通过心跳包发送给 follower

  25. if (!switchDomain.isSendBeatOnly()) {

  26. for (Datum datum : datums.values()) {

  27. JSONObject element = new JSONObject();

  28.  
  29. // 将 key 和对应的版本放入 element 中,最终添加到 array 里

  30. if (KeyBuilder.matchServiceMetaKey(datum.key)) {

  31. element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));

  32. } else if (KeyBuilder.matchInstanceListKey(datum.key)) {

  33. element.put("key", KeyBuilder.briefInstanceListkey(datum.key));

  34. }

  35. element.put("timestamp", datum.timestamp);

  36.  
  37. array.add(element);

  38. }

  39. } else {

  40. Loggers.RAFT.info("[RAFT] send beat only.");

  41. }

  42.  
  43. // 将所有 key 组成的 array 放入数据包

  44. packet.put("datums", array);

  45.  
  46. // 将数据包转换成 json 字符串放入 params 中

  47. Map<String, String> params = new HashMap<String, String>(1);

  48. params.put("beat", JSON.toJSONString(packet));

  49.  
  50. String content = JSON.toJSONString(params);

  51.  
  52. // 用 gzip 压缩

  53. ByteArrayOutputStream out = new ByteArrayOutputStream();

  54. GZIPOutputStream gzip = new GZIPOutputStream(out);

  55. gzip.write(content.getBytes("UTF-8"));

  56. gzip.close();

  57.  
  58. byte[] compressedBytes = out.toByteArray();

  59. String compressedContent = new String(compressedBytes, "UTF-8");

  60. Loggers.RAFT.info("raw beat data size: {}, size of compressed data: {}",

  61. content.length(), compressedContent.length());

  62.  
  63. // 将心跳包发送给所有的 follower

  64. for (final String server : peers.allServersWithoutMySelf()) {

  65. try {

  66. final String url = buildURL(server, API_BEAT);

  67. Loggers.RAFT.info("send beat to server " + server);

  68. HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new AsyncCompletionHandler<Integer>() {

  69. @Override

  70. public Integer onCompleted(Response response) throws Exception {

  71. if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {

  72. Loggers.RAFT.error("NACOS-RAFT beat failed: {}, peer: {}",

  73. response.getResponseBody(), server);

  74. MetricsMonitor.getLeaderSendBeatFailedException().increment();

  75. return 1;

  76. }

  77. peers.update(JSON.parseObject(response.getResponseBody(), RaftPeer.class));

  78. Loggers.RAFT.info("receive beat response from: {}", url);

  79. return 0;

  80. }

  81.  
  82. @Override

  83. public void onThrowable(Throwable t) {

  84. Loggers.RAFT.error("NACOS-RAFT error while sending heart-beat to peer: {} {}", server, t);

  85. MetricsMonitor.getLeaderSendBeatFailedException().increment();

  86. }

  87. });

  88. } catch (Exception e) {

  89. Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", server, e);

  90. MetricsMonitor.getLeaderSendBeatFailedException().increment();

  91. }

  92. }

  93. }

(·)RaftCommands.beat()方法处理/v1/ns/raft/beat请求

接收心跳包的 http 接口:

 
  1. @RestController

  2. @RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/raft")

  3. public class RaftController {

  4.  
  5. ......

  6.  
  7. @NeedAuth

  8. @RequestMapping(value = "/beat", method = RequestMethod.POST)

  9. public JSONObject beat(HttpServletRequest request, HttpServletResponse response) throws Exception {

  10. String entity = new String(IoUtils.tryDecompress(request.getInputStream()), "UTF-8");

  11. String value = URLDecoder.decode(entity, "UTF-8");

  12. value = URLDecoder.decode(value, "UTF-8");

  13.  
  14. // 解析心跳包

  15. JSONObject json = JSON.parseObject(value);

  16. JSONObject beat = JSON.parseObject(json.getString("beat"));

  17.  
  18. // 处理心跳包并将本节点的信息作为 response 返回

  19. RaftPeer peer = raftCore.receivedBeat(beat);

  20. return JSON.parseObject(JSON.toJSONString(peer));

  21. }

  22.  
  23. ......

  24. }

HeartBeat.receivedBeat()处理心跳包

  • 如果收到心跳的节点不是Follower角色,则设置为Follower角色,并把它的voteFor设置为Leader节点的ip;
  • 重置本地节点的heart timeoutelection timeout
  • 调用PeerSet.makeLeader()通知这个节点更新Leader;(也就是说Leader节点会通过心跳通知其它节点更新Leader)
  • 检查Datum:

遍历请求参数中的datums,如果Follwoer不存在这个datumKey或者时间戳比较旧,则收集这个datumKey;

每收集到50个datumKey,则向Leader节点的/v1/ns/raft/get路径发送请求,请求参数为这50个datumKey,获取对应的50个最新的Datum对象;

遍历这些Daum对象,接下来做的是和RaftCore.onPublish()方法中做的事类似:
              1.调用RaftStore#write将Datum序列化为json写到cacheFile中
              2.将Datum存放到RaftCore的datums集合中,key为上面的datum的key值
              3.更新本地节点的election timeout
              4.更新本地节点的任期term
              5.本地节点的任期term持久化到properties文件中
              6.调用notifier.addTask(datum, Notifier.ApplyAction.CHANGE);

通知对应的RaftListener

RaftCore.deleteDatum(String key)用来删除旧的Datum
              datums集合中删除key对应的Datum;
              RaftStore.delete(),在磁盘上删除这个Datum对应的文件;
              notifier.addTask(deleted, Notifier.ApplyAction.DELETE),通知对应的RaftListener有DELETE事件。

  • 本地节点的RaftPeer作为http响应返回。
 
  1. @Component

  2. public class RaftCore {

  3.  
  4. ......

  5.  
  6. public RaftPeer receivedBeat(JSONObject beat) throws Exception {

  7. final RaftPeer local = peers.local();

  8. // 解析发送心跳包的节点信息

  9. final RaftPeer remote = new RaftPeer();

  10. remote.ip = beat.getJSONObject("peer").getString("ip");

  11. remote.state = RaftPeer.State.valueOf(beat.getJSONObject("peer").getString("state"));

  12. remote.term.set(beat.getJSONObject("peer").getLongValue("term"));

  13. remote.heartbeatDueMs = beat.getJSONObject("peer").getLongValue("heartbeatDueMs");

  14. remote.leaderDueMs = beat.getJSONObject("peer").getLongValue("leaderDueMs");

  15. remote.voteFor = beat.getJSONObject("peer").getString("voteFor");

  16.  
  17. // 若收到的心跳包不是 leader 节点发送的,则抛异常

  18. if (remote.state != RaftPeer.State.LEADER) {

  19. Loggers.RAFT.info("[RAFT] invalid state from master, state: {}, remote peer: {}",

  20. remote.state, JSON.toJSONString(remote));

  21. throw new IllegalArgumentException("invalid state from master, state: " + remote.state);

  22. }

  23.  
  24. // 本地 term 大于心跳包的 term,则心跳包不进行处理

  25. if (local.term.get() > remote.term.get()) {

  26. Loggers.RAFT.info("[RAFT] out of date beat, beat-from-term: {}, beat-to-term: {}, remote peer: {}, and leaderDueMs: {}"

  27. , remote.term.get(), local.term.get(), JSON.toJSONString(remote), local.leaderDueMs);

  28. throw new IllegalArgumentException("out of date beat, beat-from-term: " + remote.term.get()

  29. + ", beat-to-term: " + local.term.get());

  30. }

  31.  
  32. // 若当前节点不是 follower 节点,则将其更新为 follower 节点

  33. if (local.state != RaftPeer.State.FOLLOWER) {

  34. Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", JSON.toJSONString(remote));

  35. // mk follower

  36. local.state = RaftPeer.State.FOLLOWER;

  37. local.voteFor = remote.ip;

  38. }

  39.  
  40. final JSONArray beatDatums = beat.getJSONArray("datums");

  41. // 更新心跳包发送间隔和收不到心跳包的选举间隔

  42. local.resetLeaderDue();

  43. local.resetHeartbeatDue();

  44.  
  45. // 更新 leader 信息,将 remote 设置为新 leader,更新原有 leader 的节点信息

  46. peers.makeLeader(remote);

  47.  
  48. // 将当前节点的 key 存放到一个 map 中,value 都为 0

  49. Map<String, Integer> receivedKeysMap = new HashMap<String, Integer>(datums.size());

  50. for (Map.Entry<String, Datum> entry : datums.entrySet()) {

  51. receivedKeysMap.put(entry.getKey(), 0);

  52. }

  53.  
  54. // 检查接收到的 datum 列表

  55. List<String> batch = new ArrayList<String>();

  56. if (!switchDomain.isSendBeatOnly()) {

  57. int processedCount = 0;

  58. Loggers.RAFT.info("[RAFT] received beat with {} keys, RaftCore.datums' size is {}, remote server: {}, term: {}, local term: {}",

  59. beatDatums.size(), datums.size(), remote.ip, remote.term, local.term);

  60. for (Object object : beatDatums) {

  61. processedCount = processedCount + 1;

  62.  
  63. JSONObject entry = (JSONObject) object;

  64. String key = entry.getString("key");

  65. final String datumKey;

  66. // 构建 datumKey(加上前缀,发送的时候 key 是去掉了前缀的)

  67. if (KeyBuilder.matchServiceMetaKey(key)) {

  68. datumKey = KeyBuilder.detailServiceMetaKey(key);

  69. } else if (KeyBuilder.matchInstanceListKey(key)) {

  70. datumKey = KeyBuilder.detailInstanceListkey(key);

  71. } else {

  72. // ignore corrupted key:

  73. continue;

  74. }

  75.  
  76. // 获取收到的 key 对应的版本

  77. long timestamp = entry.getLong("timestamp");

  78.  
  79. // 将收到的 key 在本地 key 的 map 中标记为 1

  80. receivedKeysMap.put(datumKey, 1);

  81.  
  82. try {

  83. // 收到的 key 在本地存在 并且 本地的版本大于收到的版本 并且 还有数据未处理,则直接 continue

  84. if (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp && processedCount < beatDatums.size()) {

  85. continue;

  86. }

  87.  
  88. // 若收到的 key 在本地没有,或者本地的版本小于收到的版本,放入 batch,准备下一步获取数据

  89. if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) {

  90. batch.add(datumKey);

  91. }

  92.  
  93. // 只有 batch 的数量超过 50 或已经处理完了,才进行获取数据操作

  94. if (batch.size() < 50 && processedCount < beatDatums.size()) {

  95. continue;

  96. }

  97.  
  98. String keys = StringUtils.join(batch, ",");

  99.  
  100. if (batch.size() <= 0) {

  101. continue;

  102. }

  103.  
  104. Loggers.RAFT.info("get datums from leader: {}, batch size is {}, processedCount is {}, datums' size is {}, RaftCore.datums' size is {}"

  105. , getLeader().ip, batch.size(), processedCount, beatDatums.size(), datums.size());

  106.  
  107. // 获取对应 key 的数据

  108. // update datum entry

  109. String url = buildURL(remote.ip, API_GET) + "?keys=" + URLEncoder.encode(keys, "UTF-8");

  110. HttpClient.asyncHttpGet(url, null, null, new AsyncCompletionHandler<Integer>() {

  111. @Override

  112. public Integer onCompleted(Response response) throws Exception {

  113. if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {

  114. return 1;

  115. }

  116.  
  117. List<Datum> datumList = JSON.parseObject(response.getResponseBody(), new TypeReference<List<Datum>>() {

  118. });

  119.  
  120. // 更新本地数据

  121. for (Datum datum : datumList) {

  122. OPERATE_LOCK.lock();

  123. try {

  124. Datum oldDatum = getDatum(datum.key);

  125.  
  126. if (oldDatum != null && datum.timestamp.get() <= oldDatum.timestamp.get()) {

  127. Loggers.RAFT.info("[NACOS-RAFT] timestamp is smaller than that of mine, key: {}, remote: {}, local: {}",

  128. datum.key, datum.timestamp, oldDatum.timestamp);

  129. continue;

  130. }

  131.  
  132. raftStore.write(datum);

  133.  
  134. if (KeyBuilder.matchServiceMetaKey(datum.key)) {

  135. Datum<Service> serviceDatum = new Datum<>();

  136. serviceDatum.key = datum.key;

  137. serviceDatum.timestamp.set(datum.timestamp.get());

  138. serviceDatum.value = JSON.parseObject(JSON.toJSONString(datum.value), Service.class);

  139. datum = serviceDatum;

  140. }

  141.  
  142. if (KeyBuilder.matchInstanceListKey(datum.key)) {

  143. Datum<Instances> instancesDatum = new Datum<>();

  144. instancesDatum.key = datum.key;

  145. instancesDatum.timestamp.set(datum.timestamp.get());

  146. instancesDatum.value = JSON.parseObject(JSON.toJSONString(datum.value), Instances.class);

  147. datum = instancesDatum;

  148. }

  149.  
  150. datums.put(datum.key, datum);

  151. notifier.addTask(datum.key, ApplyAction.CHANGE);

  152.  
  153. local.resetLeaderDue();

  154.  
  155. if (local.term.get() + 100 > remote.term.get()) {

  156. getLeader().term.set(remote.term.get());

  157. local.term.set(getLeader().term.get());

  158. } else {

  159. local.term.addAndGet(100);

  160. }

  161.  
  162. raftStore.updateTerm(local.term.get());

  163.  
  164. Loggers.RAFT.info("data updated, key: {}, timestamp: {}, from {}, local term: {}",

  165. datum.key, datum.timestamp, JSON.toJSONString(remote), local.term);

  166.  
  167. } catch (Throwable e) {

  168. Loggers.RAFT.error("[RAFT-BEAT] failed to sync datum from leader, key: {} {}", datum.key, e);

  169. } finally {

  170. OPERATE_LOCK.unlock();

  171. }

  172. }

  173. TimeUnit.MILLISECONDS.sleep(200);

  174. return 0;

  175. }

  176. });

  177.  
  178. batch.clear();

  179. } catch (Exception e) {

  180. Loggers.RAFT.error("[NACOS-RAFT] failed to handle beat entry, key: {}", datumKey);

  181. }

  182. }

  183.  
  184. // 若某个 key 在本地存在但收到的 key 列表中没有,则证明 leader 已经删除,那么本地也要删除

  185. List<String> deadKeys = new ArrayList<String>();

  186. for (Map.Entry<String, Integer> entry : receivedKeysMap.entrySet()) {

  187. if (entry.getValue() == 0) {

  188. deadKeys.add(entry.getKey());

  189. }

  190. }

  191.  
  192. for (String deadKey : deadKeys) {

  193. try {

  194. deleteDatum(deadKey);

  195. } catch (Exception e) {

  196. Loggers.RAFT.error("[NACOS-RAFT] failed to remove entry, key={} {}", deadKey, e);

  197. }

  198. }

  199. }

  200.  
  201. return local;

  202. }

  203. }

5. Raft发布内容

注册入口

注册http接口

 
  1. @RestController

  2. @RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")

  3. public class InstanceController {

  4.  
  5. ......

  6.  
  7. @CanDistro

  8. @RequestMapping(value = "", method = RequestMethod.POST)

  9. public String register(HttpServletRequest request) throws Exception {

  10. // 获取 namespace 和 serviceName

  11. String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);

  12. String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);

  13.  
  14. // 执行注册逻辑

  15. serviceManager.registerInstance(namespaceId, serviceName, parseInstance(request));

  16. return "ok";

  17. }

  18. }

注册实例

 
  1. @Component

  2. @DependsOn("nacosApplicationContext")

  3. public class ServiceManager implements RecordListener<Service> {

  4.  
  5. ......

  6.  
  7. private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

  8.  
  9.  
  10. ......

  11.  
  12. // 注册新实例

  13. public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {

  14. // 创建空 service,所有的 service 都存放在 serviceMap 中,serviceMap 类型为:Map<String, Map<String, Service>>,第一层 map 的 key 为 namespace,第二层 map 的 key 为 serviceName;

  15. // 每个 service 中维护一个 clusterMap,clusterMap 中有两个 set,用来存放 instance

  16. if (ServerMode.AP.name().equals(switchDomain.getServerMode())) {

  17. createEmptyService(namespaceId, serviceName);

  18. }

  19.  
  20. Service service = getService(namespaceId, serviceName);

  21.  
  22. if (service == null) {

  23. throw new NacosException(NacosException.INVALID_PARAM,

  24. "service not found, namespace: " + namespaceId + ", service: " + serviceName);

  25. }

  26.  
  27. // 检查实例是否已存在,通过 ip 进行比较

  28. if (service.allIPs().contains(instance)) {

  29. throw new NacosException(NacosException.INVALID_PARAM, "instance already exist: " + instance);

  30. }

  31.  
  32. // 添加新实例

  33. addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);

  34. }

  35.  
  36.  
  37. // 创建空 service

  38. public void createEmptyService(String namespaceId, String serviceName) throws NacosException {

  39. Service service = getService(namespaceId, serviceName);

  40. if (service == null) {

  41. service = new Service();

  42. service.setName(serviceName);

  43. service.setNamespaceId(namespaceId);

  44. service.setGroupName(Constants.DEFAULT_GROUP);

  45. // now validate the service. if failed, exception will be thrown

  46. service.setLastModifiedMillis(System.currentTimeMillis());

  47. service.recalculateChecksum();

  48. service.validate();

  49. putService(service);

  50. service.init();

  51. // 添加对 service 的监听,用来同步数据

  52. consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);

  53. consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);

  54. }

  55. }

  56.  
  57. // 添加 instance 到缓存中,并且持久化

  58. public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {

  59. String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

  60.  
  61. Service service = getService(namespaceId, serviceName);

  62.  
  63. // 添加 instance 到本地缓存

  64. List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

  65.  
  66. Instances instances = new Instances();

  67. instances.setInstanceList(instanceList);

  68.  
  69. // 将 instance 信息持久化

  70. consistencyService.put(key, instances);

  71. }

  72.  
  73. // 添加实例到缓存

  74. public List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {

  75. return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);

  76. }

  77.  
  78. // 真正的添加实例到缓存的逻辑

  79. public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException {

  80. Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));

  81.  
  82. Map<String, Instance> oldInstanceMap = new HashMap<>(16);

  83. List<Instance> currentIPs = service.allIPs(ephemeral);

  84. Map<String, Instance> map = new ConcurrentHashMap<>(currentIPs.size());

  85.  
  86. for (Instance instance : currentIPs) {

  87. map.put(instance.toIPAddr(), instance);

  88. }

  89. if (datum != null) {

  90. oldInstanceMap = setValid(((Instances) datum.value).getInstanceList(), map);

  91. }

  92.  
  93. // use HashMap for deep copy:

  94. HashMap<String, Instance> instanceMap = new HashMap<>(oldInstanceMap.size());

  95. instanceMap.putAll(oldInstanceMap);

  96.  
  97. for (Instance instance : ips) {

  98. if (!service.getClusterMap().containsKey(instance.getClusterName())) {

  99. Cluster cluster = new Cluster(instance.getClusterName());

  100. cluster.setService(service);

  101. service.getClusterMap().put(instance.getClusterName(), cluster);

  102. Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",

  103. instance.getClusterName(), instance.toJSON());

  104. }

  105.  
  106. if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {

  107. instanceMap.remove(instance.getDatumKey());

  108. } else {

  109. instanceMap.put(instance.getDatumKey(), instance);

  110. }

  111. }

  112.  
  113. if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {

  114. throw new IllegalArgumentException("ip list can not be empty, service: " + service.getName() + ", ip list: "

  115. + JSON.toJSONString(instanceMap.values()));

  116. }

  117.  
  118. return new ArrayList<>(instanceMap.values());

  119. }

  120.  
  121. // 将旧的 instance 列表与新的 instance 合并到一起

  122. private Map<String, Instance> setValid(List<Instance> oldInstances, Map<String, Instance> map) {

  123. Map<String, Instance> instanceMap = new HashMap<>(oldInstances.size());

  124. for (Instance instance : oldInstances) {

  125. Instance instance1 = map.get(instance.toIPAddr());

  126. if (instance1 != null) {

  127. instance.setHealthy(instance1.isHealthy());

  128. instance.setLastBeat(instance1.getLastBeat());

  129. }

  130. instanceMap.put(instance.getDatumKey(), instance);

  131. }

  132. return instanceMap;

  133. }

  134.  
  135. ......

  136. }

实例信息持久化

RaftConsistencyServiceImpl.put() 方法用来做实例信息持久化的工作,即上面提到的consistencyService.put(key, instances);这一步

(1)Service.put()

 
  1. @Service

  2. public class RaftConsistencyServiceImpl implements PersistentConsistencyService {

  3.  
  4. ......

  5.  
  6. @Override

  7. public void put(String key, Record value) throws NacosException {

  8. try {

  9. raftCore.signalPublish(key, value);

  10. } catch (Exception e) {

  11. Loggers.RAFT.error("Raft put failed.", e);

  12. throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value);

  13. }

  14. }

  15. }

最终调用到 RaftCore 的 signalPublish() 方法:

(2)RaftCore.signalPublish()

 
  1. @Component

  2. public class RaftCore {

  3.  
  4. ......

  5.  
  6. public void signalPublish(String key, Record value) throws Exception {

  7. // 若不是 leader,直接将包转发给 leader

  8. if (!isLeader()) {

  9. JSONObject params = new JSONObject();

  10. params.put("key", key);

  11. params.put("value", value);

  12. Map<String, String> parameters = new HashMap<>(1);

  13. parameters.put("key", key);

  14.  
  15. // 调用 /raft/datum 接口

  16. raftProxy.proxyPostLarge(getLeader().ip, API_PUB, params.toJSONString(), parameters);

  17. return;

  18. }

  19.  
  20. // 若是 leader,将包发送给所有的 follower

  21. try {

  22. OPERATE_LOCK.lock();

  23. long start = System.currentTimeMillis();

  24. final Datum datum = new Datum();

  25. datum.key = key;

  26. datum.value = value;

  27. if (getDatum(key) == null) {

  28. datum.timestamp.set(1L);

  29. } else {

  30. datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());

  31. }

  32.  
  33. JSONObject json = new JSONObject();

  34. json.put("datum", datum);

  35. json.put("source", peers.local());

  36.  
  37. // 本地 onPublish 方法用来处理持久化逻辑

  38. onPublish(datum, peers.local());

  39.  
  40. final String content = JSON.toJSONString(json);

  41.  
  42. final CountDownLatch latch = new CountDownLatch(peers.majorityCount());

  43. // 将包发送给所有的 follower,调用 /raft/datum/commit 接口

  44. for (final String server : peers.allServersIncludeMyself()) {

  45. if (isLeader(server)) {

  46. latch.countDown();

  47. continue;

  48. }

  49. final String url = buildURL(server, API_ON_PUB);

  50. HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content, new AsyncCompletionHandler<Integer>() {

  51. @Override

  52. public Integer onCompleted(Response response) throws Exception {

  53. if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {

  54. Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",

  55. datum.key, server, response.getStatusCode());

  56. return 1;

  57. }

  58. latch.countDown();

  59. return 0;

  60. }

  61.  
  62. @Override

  63. public STATE onContentWriteCompleted() {

  64. return STATE.CONTINUE;

  65. }

  66. });

  67. }

  68.  
  69. if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {

  70. // only majority servers return success can we consider this update success

  71. Loggers.RAFT.info("data publish failed, caused failed to notify majority, key={}", key);

  72. throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);

  73. }

  74.  
  75. long end = System.currentTimeMillis();

  76. Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);

  77. } finally {

  78. OPERATE_LOCK.unlock();

  79. }

  80. }

  81. }

(3)/raft/datum 接口 和 /raft/datum/commit 接口

 
  1. @RestController

  2. @RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/raft")

  3. public class RaftController {

  4.  
  5. ......

  6.  
  7. @NeedAuth

  8. @RequestMapping(value = "/datum", method = RequestMethod.POST)

  9. public String publish(HttpServletRequest request, HttpServletResponse response) throws Exception {

  10.  
  11. response.setHeader("Content-Type", "application/json; charset=" + getAcceptEncoding(request));

  12. response.setHeader("Cache-Control", "no-cache");

  13. response.setHeader("Content-Encode", "gzip");

  14.  
  15. String entity = IOUtils.toString(request.getInputStream(), "UTF-8");

  16. String value = URLDecoder.decode(entity, "UTF-8");

  17. JSONObject json = JSON.parseObject(value);

  18.  
  19. // 这里也是调用 RaftConsistencyServiceImpl.put() 进行处理,与服务注册的逻辑在此回合,最终调用到 signalPublish 方法

  20. String key = json.getString("key");

  21. if (KeyBuilder.matchInstanceListKey(key)) {

  22. raftConsistencyService.put(key, JSON.parseObject(json.getString("value"), Instances.class));

  23. return "ok";

  24. }

  25.  
  26. if (KeyBuilder.matchSwitchKey(key)) {

  27. raftConsistencyService.put(key, JSON.parseObject(json.getString("value"), SwitchDomain.class));

  28. return "ok";

  29. }

  30.  
  31. if (KeyBuilder.matchServiceMetaKey(key)) {

  32. raftConsistencyService.put(key, JSON.parseObject(json.getString("value"), Service.class));

  33. return "ok";

  34. }

  35.  
  36. throw new NacosException(NacosException.INVALID_PARAM, "unknown type publish key: " + key);

  37. }

  38.  
  39.  
  40. @NeedAuth

  41. @RequestMapping(value = "/datum/commit", method = RequestMethod.POST)

  42. public String onPublish(HttpServletRequest request, HttpServletResponse response) throws Exception {

  43. response.setHeader("Content-Type", "application/json; charset=" + getAcceptEncoding(request));

  44. response.setHeader("Cache-Control", "no-cache");

  45. response.setHeader("Content-Encode", "gzip");

  46.  
  47. String entity = IOUtils.toString(request.getInputStream(), "UTF-8");

  48. String value = URLDecoder.decode(entity, "UTF-8");

  49. JSONObject jsonObject = JSON.parseObject(value);

  50. String key = "key";

  51.  
  52. RaftPeer source = JSON.parseObject(jsonObject.getString("source"), RaftPeer.class);

  53. JSONObject datumJson = jsonObject.getJSONObject("datum");

  54.  
  55. Datum datum = null;

  56. if (KeyBuilder.matchInstanceListKey(datumJson.getString(key))) {

  57. datum = JSON.parseObject(jsonObject.getString("datum"), new TypeReference<Datum<Instances>>() {});

  58. } else if (KeyBuilder.matchSwitchKey(datumJson.getString(key))) {

  59. datum = JSON.parseObject(jsonObject.getString("datum"), new TypeReference<Datum<SwitchDomain>>() {});

  60. } else if (KeyBuilder.matchServiceMetaKey(datumJson.getString(key))) {

  61. datum = JSON.parseObject(jsonObject.getString("datum"), new TypeReference<Datum<Service>>() {});

  62. }

  63.  
  64. // 该方法最终调用到 onPublish 方法

  65. raftConsistencyService.onPut(datum, source);

  66. return "ok";

  67. }

  68.  
  69. ......

  70. }

发布入口 RaftCommands.publish()

 
  1. @Component

  2. public class RaftCore {

  3.  
  4. ......

  5.  
  6. public void onPublish(Datum datum, RaftPeer source) throws Exception {

  7. RaftPeer local = peers.local();

  8. if (datum.value == null) {

  9. Loggers.RAFT.warn("received empty datum");

  10. throw new IllegalStateException("received empty datum");

  11. }

  12.  
  13. // 若该包不是 leader 发布来的,抛异常

  14. if (!peers.isLeader(source.ip)) {

  15. Loggers.RAFT.warn("peer {} tried to publish data but wasn't leader, leader: {}",

  16. JSON.toJSONString(source), JSON.toJSONString(getLeader()));

  17. throw new IllegalStateException("peer(" + source.ip + ") tried to publish " +

  18. "data but wasn't leader");

  19. }

  20.  
  21. // 来源 term 小于本地当前 term,抛异常

  22. if (source.term.get() < local.term.get()) {

  23. Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}",

  24. JSON.toJSONString(source), JSON.toJSONString(local));

  25. throw new IllegalStateException("out of date publish, pub-term:"

  26. + source.term.get() + ", cur-term: " + local.term.get());

  27. }

  28.  
  29. // 更新选举超时时间

  30. local.resetLeaderDue();

  31.  
  32. // 节点信息持久化

  33. // if data should be persistent, usually this is always true:

  34. if (KeyBuilder.matchPersistentKey(datum.key)) {

  35. raftStore.write(datum);

  36. }

  37.  
  38. // 添加到缓存

  39. datums.put(datum.key, datum);

  40.  
  41. // 更新 term 信息

  42. if (isLeader()) {

  43. local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);

  44. } else {

  45. if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {

  46. //set leader term:

  47. getLeader().term.set(source.term.get());

  48. local.term.set(getLeader().term.get());

  49. } else {

  50. local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);

  51. }

  52. }

  53. raftStore.updateTerm(local.term.get());

  54.  
  55. // 通知应用程序节点信息有变动

  56. notifier.addTask(datum.key, ApplyAction.CHANGE);

  57.  
  58. Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);

  59. }

  60. }

6. Raft保证内容一致性

Nacos通过Raft发布内容,内容只是存在了Leader节点上,通过Raft心跳机制来保证一致性

在注册信息的时候,addInstance() 方法将 instance 添加到了本地缓存中,但 raft 在从 leader 到 follower 同步数据的时候,follower 接收到包之后,只是通过 onPublish() 方法进行了持久化,并没有将信息更新到本地缓存,而是通过一个监听器来实现:

在 onPublish 方法最后,有一行:notifier.addTask(datum.key, ApplyAction.CHANGE);,即:将本次的变动,添加到通知任务中,我们来看通知任务将如何被处理:

 
  1. @Component

  2. public class RaftCore {

  3.  
  4. ......

  5.  
  6. public class Notifier implements Runnable {

  7. private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);

  8. private BlockingQueue<Pair> tasks = new LinkedBlockingQueue<Pair>(1024 * 1024);

  9.  
  10. // 添加变更任务到 tasks 队列

  11. public void addTask(String datumKey, ApplyAction action) {

  12.  
  13. if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {

  14. return;

  15. }

  16. if (action == ApplyAction.CHANGE) {

  17. services.put(datumKey, StringUtils.EMPTY);

  18. }

  19. tasks.add(Pair.with(datumKey, action));

  20. }

  21.  
  22. public int getTaskSize() {

  23. return tasks.size();

  24. }

  25.  
  26. // 处理任务线程

  27. @Override

  28. public void run() {

  29. Loggers.RAFT.info("raft notifier started");

  30.  
  31. while (true) {

  32. try {

  33. Pair pair = tasks.take();

  34.  
  35. if (pair == null) {

  36. continue;

  37. }

  38.  
  39. String datumKey = (String) pair.getValue0();

  40. ApplyAction action = (ApplyAction) pair.getValue1();

  41.  
  42. // 从服务列表中删除该 key

  43. services.remove(datumKey);

  44.  
  45. int count = 0;

  46.  
  47. if (listeners.containsKey(KeyBuilder.SERVICE_META_KEY_PREFIX)) {

  48. if (KeyBuilder.matchServiceMetaKey(datumKey) && !KeyBuilder.matchSwitchKey(datumKey)) {

  49. for (RecordListener listener : listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX)) {

  50. try {

  51. // 根据变更类型,调用不同的回调方法来进行缓存更新

  52. if (action == ApplyAction.CHANGE) {

  53. listener.onChange(datumKey, getDatum(datumKey).value);

  54. }

  55.  
  56. if (action == ApplyAction.DELETE) {

  57. listener.onDelete(datumKey);

  58. }

  59. } catch (Throwable e) {

  60. Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {} {}", datumKey, e);

  61. }

  62. }

  63. }

  64. }

  65.  
  66. if (!listeners.containsKey(datumKey)) {

  67. continue;

  68. }

  69.  
  70. for (RecordListener listener : listeners.get(datumKey)) {

  71. count++;

  72.  
  73. try {

  74. if (action == ApplyAction.CHANGE) {

  75. listener.onChange(datumKey, getDatum(datumKey).value);

  76. continue;

  77. }

  78.  
  79. if (action == ApplyAction.DELETE) {

  80. listener.onDelete(datumKey);

  81. continue;

  82. }

  83. } catch (Throwable e) {

  84. Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {} {}", datumKey, e);

  85. }

  86. }

  87.  
  88. if (Loggers.RAFT.isDebugEnabled()) {

  89. Loggers.RAFT.debug("[NACOS-RAFT] datum change notified, key: {}, listener count: {}", datumKey, count);

  90. }

  91. } catch (Throwable e) {

  92. Loggers.RAFT.error("[NACOS-RAFT] Error while handling notifying task", e);

  93. }

  94. }

  95. }

  96. }

  97. }

上一篇:CF#712


下一篇:Lucene全文检索过程