Apollo源码-配置发布通知客户端

前言

在更新Spring Cloud Alibaba Nacos时,想到之前阅读过Apollo的源码,便在这插入记录了过来,后续更新Nacos Config源码

Apollo简介

fork 源码地址 apollo源码
参考apollo架构中心设计
主要分为 Config ServiceAdmin ServicePortalClient 四部分
上文介绍到ReleaseMessage 对象的发布,portal发布配置第一件事新增 Release 对象,第二件事发布ReleaseMessage 紧接着第三件事便是本文要讲的 ConfigPublishEvent 事件

新建ConfigPublishEvent事件

@EventListener
public void onConfigPublish(ConfigPublishEvent event) {
executorService.submit(new ConfigPublishNotifyTask(event.getConfigPublishInfo()));
}


private class ConfigPublishNotifyTask implements Runnable {

private ConfigPublishEvent.ConfigPublishInfo publishInfo;

ConfigPublishNotifyTask(ConfigPublishEvent.ConfigPublishInfo publishInfo) {
this.publishInfo = publishInfo;
}

@Override
public void run() {
ReleaseHistoryBO releaseHistory = getReleaseHistory();
if (releaseHistory == null) {
 Tracer.logError("Load release history failed", null);
 return;
}

sendPublishEmail(releaseHistory);

sendPublishMsg(releaseHistory);
}

跟踪代码也没发现和configservice的交互,事件监听者无非是创建一个线程池,执行线程任务,任务为发送release对象邮件,和调用远端hermes的一个接口。

配置发布,通知客户端

阅读官方文档 : 服务端设计文档中详细介绍了configservice是如何拉取ReleaseMessage的

实现方式如下:
1.Admin Service在配置发布后会往ReleaseMessage表插入一条消息记录,消息内容就是配置发布的AppId+Cluster+Namespace,参见DatabaseMessageSender

2.Config Service有一个线程会每秒扫描一次ReleaseMessage表,看看是否有新的消息记录,参见ReleaseMessageScanner
3.Config Service如果发现有新的消息记录,那么就会通知到所有的消息监听器(ReleaseMessageListener),如NotificationControllerV2,消息监听器的注册过程参见ConfigServiceAutoConfiguration
4.NotificationControllerV2得到配置发布的AppId+Cluster+Namespace后,会通知对应的客户端

如图 :
Apollo源码-配置发布通知客户端
那么我们跟着官方文档,来认识一下 ReleaseMessageScanner 这个类,该类实现了 InitializingBean 接口,会在容器启动,bean初始化后调用 afterPropertiesSet 方法

@Override
public void afterPropertiesSet() throws Exception {
//默认扫描间隔为1s
databaseScanInterval = bizConfig.releaseMessageScanIntervalInMilli();
//查询最大的一条Release记录
maxIdScanned = loadLargestMessageId();
//延迟1s后执行定时任务,受任务影响,需要等任务完成之后才开始计时
executorService.scheduleWithFixedDelay((Runnable) () -> {
Transaction transaction = Tracer.newTransaction("Apollo.ReleaseMessageScanner", "scanMessage");
try {
 //扫描消息
 scanMessages();
 transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
 transaction.setStatus(ex);
 logger.error("Scan and send message failed", ex);
} finally {
 transaction.complete();
}
}, databaseScanInterval, databaseScanInterval, TimeUnit.MILLISECONDS);

}
private boolean scanAndSendMessages() {
//current batch is 500
List<ReleaseMessage> releaseMessages =
releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(maxIdScanned);
if (CollectionUtils.isEmpty(releaseMessages)) {
return false;
}
//有新的message发布,通知configservice
fireMessageScanned(releaseMessages);
int messageScanned = releaseMessages.size();
//取最后一条数据的id,赋值最大id
maxIdScanned = releaseMessages.get(messageScanned - 1).getId();
//是否还有数据
return messageScanned == 500;
}
private void fireMessageScanned(List<ReleaseMessage> messages) {
for (ReleaseMessage message : messages) {
for (ReleaseMessageListener listener : listeners) {
 try {
   //通知所有消息监听器,触发handleMessage方法  
   listener.handleMessage(message, Topics.APOLLO_RELEASE_TOPIC);
 } catch (Throwable ex) {
   Tracer.logError(ex);
   logger.error("Failed to invoke message listener {}", listener.getClass(), ex);
 }
}
}
}

fireMessageScanned 会去通知所有的监听者,我们看看 ReleaseMessageListener 这个类图
Apollo源码-配置发布通知客户端
监听器的注册过程参见 ConfigServiceAutoConfiguration , handleMessage 方法是得到发布的配置并处理,根据官方文档指示 : Config Service如果发现有新的消息记录,那么就会通知到所有的消息监听器 ,客户端具体定位到 NotificationControllerV2#handleMessage

@Override
public void handleMessage(ReleaseMessage message, String channel) {
logger.info("message received - channel: {}, message: {}", channel, message);
//内容即为 : aapId + cluster + namespace
String content = message.getMessage();
Tracer.logEvent("Apollo.LongPoll.Messages", content);
//如果channel不是apollo-release,则不处理
if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(content)) {
return;
}
//retrieveNamespaceFromReleaseMessage实现了Function的lambda表达式,apply是通过该表达式返回namespace
String changedNamespace = retrieveNamespaceFromReleaseMessage.apply(content);

if (Strings.isNullOrEmpty(changedNamespace)) {
logger.error("message format invalid - {}", content);
return;
}

if (!deferredResults.containsKey(content)) {
return;
}

//create a new list to avoid ConcurrentModificationException
List<DeferredResultWrapper> results = Lists.newArrayList(deferredResults.get(content));

ApolloConfigNotification configNotification = new ApolloConfigNotification(changedNamespace, message.getId());
//key : appId+cluster+namespace, value: messageId
configNotification.addMessage(content, message.getId());

//do async notification if too many clients
//客户端长轮询连接数 > 100
if (results.size() > bizConfig.releaseMessageNotificationBatch()) {
largeNotificationBatchExecutorService.submit(() -> {
 logger.debug("Async notify {} clients for key {} with batch {}", results.size(), content,
     bizConfig.releaseMessageNotificationBatch());
 for (int i = 0; i < results.size(); i++) {
   //100一个批次,就睡眠100ms
   if (i > 0 && i % bizConfig.releaseMessageNotificationBatch() == 0) {
     try {
       //睡眠100ms
       TimeUnit.MILLISECONDS.sleep(bizConfig.releaseMessageNotificationBatchIntervalInMilli());
     } catch (InterruptedException e) {
       //ignore
     }
   }
   logger.debug("Async notify {}", results.get(i));
   //通知客户端,消息为:namespace,messageId
   results.get(i).setResult(configNotification);
 }
});
return;
}

logger.debug("Notify {} clients for key {}", results.size(), content);
//同步通知
for (DeferredResultWrapper result : results) {
result.setResult(configNotification);
}
logger.debug("Notification completed");
}
  • 拉取到消息,通知到所有的ReleaseMessageListener实现类 ReleaseMessageScanner#fireMessageScanned ,会调用 handleMessage 方法。
  • content:appId+cluster+namespace 中取出 namespace
  • 组装 ApolloConfigNotification 对象 messageId、namespaceName、key : appId+cluster+namespace, value: messageId 的map 通知客户端。
  • 客户端使用 DeferredResult 长轮询技术。
上一篇:用Tinkercad学arduino之 红外遥控器


下一篇:利用 Python 分析了某化妆品企业的销售情况,我得出的结论是?