private void executeAsyncInvoke() { while (!queue.isEmpty()) { NotifySingleTask task = queue.poll(); String targetIp = task.getTargetIP(); if (serverListService.getServerList().contains( targetIp)) { // 启动健康检查且有不监控的ip则直接把放到通知队列,否则通知 if (serverListService.isHealthCheck() && ServerListService.getServerListUnhealth().contains(targetIp)) { // target ip 不健康,则放入通知列表中 ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null, task.getLastModified(), LOCAL_IP, ConfigTraceService.NOTIFY_EVENT_UNHEALTH, 0, task.target); // get delay time and set fail count to the task int delay = getDelayTime(task); Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>(); queue.add(task); AsyncTask asyncTask = new AsyncTask(httpclient, queue); ((ScheduledThreadPoolExecutor)EXCUTOR).schedule(asyncTask, delay, TimeUnit.MILLISECONDS); } else { HttpGet request = new HttpGet(task.url); request.setHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(task.getLastModified())); request.setHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, LOCAL_IP); if (task.isBeta) { request.setHeader("isBeta", "true"); } httpclient.execute(request, new AyscNotifyCallBack(httpclient, task)); } } } }
把request中的内容debug出来,如下:
http://10.129.13.96:8848/nacos/v1/cs/communication/dataChange?dataId=springboot2-nacos-config&group=DEFAULT_GROUP
这里又发了一个请求出去,跳转到/communication/dataChange这个里面去,继续跟进,
这个请求转发到
CommunicationController.java中来了
/**
 * Receives a "configuration changed" notification and triggers a dump for it.
 *
 * <p>The data id and group are trimmed. The last-modified timestamp is read from the
 * {@code NotifyService.NOTIFY_HEADER_LAST_MODIFIED} header ({@code -1} when the header is
 * empty) and the handling IP from {@code NotifyService.NOTIFY_HEADER_OP_HANDLE_IP}. When
 * the {@code isBeta} header is non-blank and equals {@code trueStr}, the beta overload of
 * {@code dumpService.dump} is called; otherwise the overload taking {@code tag} is called.
 *
 * @param request  incoming request; only its headers are read
 * @param response servlet response (not used by this method)
 * @param dataId   configuration data id
 * @param group    configuration group
 * @param tenant   tenant, defaults to the empty string when absent
 * @param tag      optional tag
 * @return always {@code true}
 * @throws NumberFormatException if the last-modified header is non-empty but not a valid long
 */
@RequestMapping(value = "/dataChange", method = RequestMethod.GET)
@ResponseBody
public Boolean notifyConfigInfo(HttpServletRequest request, HttpServletResponse response,
                                @RequestParam("dataId") String dataId,
                                @RequestParam("group") String group,
                                @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY)
                                    String tenant,
                                @RequestParam(value = "tag", required = false) String tag) {
    final String trimmedDataId = dataId.trim();
    final String trimmedGroup = group.trim();

    final String lastModifiedHeader = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);
    final long lastModifiedTs;
    if (StringUtils.isEmpty(lastModifiedHeader)) {
        lastModifiedTs = -1;
    } else {
        lastModifiedTs = Long.parseLong(lastModifiedHeader);
    }

    final String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);
    final String betaHeader = request.getHeader("isBeta");
    final boolean betaRequested = StringUtils.isNotBlank(betaHeader) && trueStr.equals(betaHeader);

    if (!betaRequested) {
        dumpService.dump(trimmedDataId, trimmedGroup, tenant, tag, lastModifiedTs, handleIp);
    } else {
        dumpService.dump(trimmedDataId, trimmedGroup, tenant, lastModifiedTs, handleIp, true);
    }
    return true;
}
TaskManager.java
/**
 * Stores a task in the task map under the given type.
 *
 * <p>Runs under {@code lock}. If an entry already existed for {@code type}, it is replaced
 * and the new task's {@code merge} is called with the replaced one. The dump-task monitor
 * is set to the map size after the put.
 *
 * @param type key the task is stored under
 * @param task task to store
 */
public void addTask(String type, AbstractTask task) {
    this.lock.lock();
    try {
        final AbstractTask replaced = tasks.put(type, task);
        MetricsMonitor.getDumpTaskMonitor().set(tasks.size());
        if (replaced == null) {
            return;
        }
        // An earlier task of the same type was pending: let the new one absorb it.
        task.merge(replaced);
    } finally {
        this.lock.unlock();
    }
}
task具体内容如下
到目前为止,Nacos的整体思路就是把每一个需要通知的操作封装成一个task,把这个task放进TaskManager的任务Map里(以type为key,同一个type的旧任务会被merge到新任务中),然后后台线程不断地循环扫描这个Map,
只要里面有需要执行的任务,就把它取出来,交给这个task对应的processor去执行;
TaskManager.java
这个类就是我们最终的处理类,真正去做通知更新的管理类,上代码,这里就不贴整个类了,局部展示,
public final class TaskManager implements TaskManagerMBean {

    /**
     * Worker loop of the task manager: until {@code closed} is set, it sleeps 100 ms and
     * then calls {@link TaskManager#process()}.
     *
     * <p>A failure in one round never terminates the loop. Failures other than interruption
     * are logged instead of being silently discarded.
     */
    class ProcessRunnable implements Runnable {

        @Override
        public void run() {
            while (!TaskManager.this.closed.get()) {
                try {
                    Thread.sleep(100);
                    TaskManager.this.process();
                } catch (InterruptedException ignored) {
                    // Unchanged behaviour: the interrupt is consumed and the loop condition
                    // re-checks the closed flag. Re-asserting the interrupt here would make
                    // every following sleep() throw immediately and spin the loop.
                    // NOTE(review): presumably interruption is only used to wake the thread
                    // on close - confirm against the close() implementation.
                } catch (Throwable e) {
                    // Keep the worker alive, but leave a trace of what went wrong.
                    log.error("task_fail", "TaskManager process loop failed", e);
                }
            }
        }
    }
这里面起了一个线程,只要TaskManager没有关闭,就每隔100ms循环执行一次process,执行过程中抛出的异常会被直接吞掉,不会中断循环;
/**
 * Runs one pass over the task map: every task whose {@code shouldProcess()} is true is
 * removed from the map and handed to its processor; a task whose processing does not
 * return {@code true} (including when the processor throws) is put back via
 * {@code addTask}.
 *
 * NOTE(review): this excerpt is truncated - the method's closing brace, and whatever may
 * follow the loop, is not shown.
 */
protected void process() {
    for (Map.Entry<String, AbstractTask> entry : this.tasks.entrySet()) {
        AbstractTask task = null;
        this.lock.lock();
        try {
            // Fetch the task
            task = entry.getValue();
            if (null != task) {
                if (!task.shouldProcess()) {
                    // The task does not need to run right now; skip it
                    // (the finally block below still releases the lock)
                    continue;
                }
                // Remove the task from the task map before processing it
                this.tasks.remove(entry.getKey());
                MetricsMonitor.getDumpTaskMonitor().set(tasks.size());
            }
        } finally {
            this.lock.unlock();
        }
        // From here on the lock is no longer held: processing happens outside the lock
        if (null != task) {
            // Look up the processor registered for this task type
            TaskProcessor processor = this.taskProcessors.get(entry.getKey());
            if (null == processor) {
                // No processor registered for this task type; fall back to the default one
                processor = this.getDefaultTaskProcessor();
            }
            // If there is still no processor, the already-removed task is not re-added
            if (null != processor) {
                boolean result = false;
                try {
                    // Process the task
                    result = processor.process(entry.getKey(), task);
                } catch (Throwable t) {
                    // An exception leaves result == false, so the task is retried below
                    log.error("task_fail", "处理task失败", t);
                }
                if (!result) {
                    // Processing failed: record the last processing time
                    task.setLastProcessTime(System.currentTimeMillis());
                    // Put the task back into the task map
                    this.addTask(entry.getKey(), task);
                }
            }
        }
    }
重点!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
class DumpProcessor implements TaskProcessor { DumpProcessor(DumpService dumpService) { this.dumpService = dumpService; } @Override public boolean process(String taskType, AbstractTask task) { DumpTask dumpTask = (DumpTask)task; String[] pair = GroupKey2.parseKey(dumpTask.groupKey); String dataId = pair[0]; String group = pair[1]; String tenant = pair[2]; long lastModified = dumpTask.lastModified; String handleIp = dumpTask.handleIp; boolean isBeta = dumpTask.isBeta; String tag = dumpTask.tag; if (isBeta) { // beta发布,则dump数据,更新beta缓存 ConfigInfo4Beta cf = dumpService.persistService.findConfigInfo4Beta(dataId, group, tenant); boolean result; if (null != cf) { result = ConfigService.dumpBeta(dataId, group, tenant, cf.getContent(), lastModified, cf.getBetaIps()); if (result) { ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp, ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified, cf.getContent().length()); } } else { result = ConfigService.removeBeta(dataId, group, tenant); if (result) { ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp, ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0); } } return result; } else { if (StringUtils.isBlank(tag)) { ConfigInfo cf = dumpService.persistService.findConfigInfo(dataId, group, tenant); if (dataId.equals(AggrWhitelist.AGGRIDS_METADATA)) { if (null != cf) { AggrWhitelist.load(cf.getContent()); } else { AggrWhitelist.load(null); } } if (dataId.equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) { if (null != cf) { ClientIpWhiteList.load(cf.getContent()); } else { ClientIpWhiteList.load(null); } } if (dataId.equals(SwitchService.SWITCH_META_DATAID)) { if (null != cf) { SwitchService.load(cf.getContent()); } else { SwitchService.load(null); } } boolean result; if (null != cf) { result = ConfigService.dump(dataId, group, tenant, cf.getContent(), lastModified); if (result) { 
ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp, ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified, cf.getContent().length()); } } else { result = ConfigService.remove(dataId, group, tenant); if (result) { ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp, ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0); } } return result; } else { ConfigInfo4Tag cf = dumpService.persistService.findConfigInfo4Tag(dataId, group, tenant, tag); // boolean result; if (null != cf) { result = ConfigService.dumpTag(dataId, group, tenant, tag, cf.getContent(), lastModified); if (result) { ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp, ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified, cf.getContent().length()); } } else { result = ConfigService.removeTag(dataId, group, tenant, tag); if (result) { ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp, ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0); } } return result; } } } final DumpService dumpService; }