Apollo-client加载数据解析

如需参考源码解析,请访问:https://gitee.com/lidishan/apollo-code-analysis
阅读前声明:本文不做相关用法说明,只解析Apollo-client源码
阅读本文前请参考《Apollo-client 初始化入口流程解析

上文解析到了initialize()初始化,其是通过ConfigService.getConfig()来加载数据,那这个调用方式是怎么加载的?见下面

实时获取apollo配置解析ConfigService.getConfig()

如何使用?例子如下

// 获取namespace=application 默认命名空间键为key的字符串内容
String config = ConfigService.getConfig("application").getProperty("key", "默认值");

其加载过程也利用了SPI,我们直接看大佬实现的默认类DefaultConfigManager,就不自己造*了。如下

/**
 * 实时获取apollo配置信息工具类的manager
 * @author Jason Song(song_s@ctrip.com)
 */
public class DefaultConfigManager implements ConfigManager {
  private ConfigFactoryManager m_factoryManager;

  private Map<String, Config> m_configs = Maps.newConcurrentMap();
  private Map<String, ConfigFile> m_configFiles = Maps.newConcurrentMap();

  public DefaultConfigManager() {
    m_factoryManager = ApolloInjector.getInstance(ConfigFactoryManager.class);
  }
  
  /**
   * 这里是调用ConfigService.getConfig() 对应的方法
   * @param namespace the namespace
   */
  @Override
  public Config getConfig(String namespace) {
    // 维护类一个concurrentHashMap的缓存
    // -- 获取对应命名空间的缓存config
    Config config = m_configs.get(namespace);

    if (config == null) {
      synchronized (this) {
        config = m_configs.get(namespace);

        if (config == null) {
          // 不存在,则用factory进行创建
          // -- 默认实现:DefaultConfigFactoryManager
          ConfigFactory factory = m_factoryManager.getFactory(namespace);
          // -- 接下来看一下factory.create的实现(DefaultConfigFactory)
          config = factory.create(namespace);
          m_configs.put(namespace, config);
        }
      }
    }

    return config;
  }
  // ......省略其他代码
}

上面看到DefaultConfigManager->getConfig()->factory.create(namespace),接下来看DefaultConfigFactory是怎么生产config

  • 加载详细步骤如下
    • 调用DefaultConfigFactory#create()创建
      • 解析支持文件格式,匹配优先级为:properties > xml > json > yml > yaml > txt
      • 创建本地配置仓库
      • 走到trySync这步,判断走的是local还是remote
        • 如果走到local本地模式,会从本地对应格式的文件中加载配置出来,不会请求远程
        • 如果走到remote模式,有个构建函数会执行如下步骤:先加载远程的数据下来(期间会进行最多重试三次),然后开启定时五分钟拉取数据脚本,最后开启HTTP长轮询来监控数据变更推送
public class DefaultConfigFactory implements ConfigFactory {
  private static final Logger logger = LoggerFactory.getLogger(DefaultConfigFactory.class);
  private ConfigUtil m_configUtil;

  public DefaultConfigFactory() {
    m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
  }

  @Override
  public Config create(String namespace) {
    // 决定文件解析类型:
    // -- 默认支持解析的顺序(比如命名空间=namespace.json,那就会优先匹配这种格式的配置文件):
    // ------   properties > xml > json > yml > yaml > txt
    ConfigFileFormat format = determineFileFormat(namespace);// 返回支持的文件格式
    if (ConfigFileFormat.isPropertiesCompatible(format)) {
      // yml、yaml这两种格式走这里(因为要换行,解析方式有点区别)
      return new DefaultConfig(namespace, createPropertiesCompatibleFileConfigRepository(namespace, format));
    }
    // 1. 先创建本地配置仓库。createLocalConfigRepository
    //    eg:C:\opt\data\222537\config-cache\222537+default+application.properties
    //        C:\opt\data\222537\config-cache\222537+default+xxxxx.zy.properties
    //        C:\opt\data\222537\config-cache\222537+default+xxxxx.candy_config.properties
    return new DefaultConfig(namespace, createLocalConfigRepository(namespace));
  }

  /**
  // 先创建本地配置仓库。createLocalConfigRepository
  //    eg:C:\opt\data\222537\config-cache\222537+default+application.properties
  //        C:\opt\data\222537\config-cache\222537+default+xxxxx.zy.properties
  //        C:\opt\data\222537\config-cache\222537+default+xxxxx.candy_config.properties
   **/
  LocalFileConfigRepository createLocalConfigRepository(String namespace) {
    // 可配置为local,只访问本地的缓存配置,但这样就没意义了。
    // -- 配置方式:VM options:-Denv = local
    if (m_configUtil.isInLocalMode()) {
      logger.warn(
          "==== Apollo is in local mode! Won't pull configs from remote server for namespace {} ! ====",
          namespace);
      // 本地缓存模式,区别是upstream为空
      return new LocalFileConfigRepository(namespace);
    }
    // 创建远程配置仓库类
    return new LocalFileConfigRepository(namespace, createRemoteConfigRepository(namespace));
  }

  RemoteConfigRepository createRemoteConfigRepository(String namespace) {
    return new RemoteConfigRepository(namespace);
  }
  // ..... 省略其他逻辑
}

创建本地配置仓库,最终会调用到trySync()

// 位置:com.ctrip.framework.apollo.internals.LocalFileConfigRepository
public LocalFileConfigRepository(String namespace, ConfigRepository upstream) {
    m_namespace = namespace;
    m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
    // 设置本地缓存路径
    this.setLocalCacheDir(findLocalCacheDir(), false);
    // 本地缓存模式是null,这个方法就不会调用。远程模式下调用这个就不为空
    this.setUpstreamRepository(upstream);
    this.trySync();
}
protected boolean trySync() {
    try {
      // 主要分为两种模式:local本地=LocalFileConfigRepository#sync()、remote远程=RemoteConfigRepository#sync()
      // -- local(-Denv=local):加载本地文件配置进properties
      // -- remote(配置类=RemoteConfigRepository):
      sync();
      return true;
    } catch (Throwable ex) {
      Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
      logger
          .warn("Sync config failed, will retry. Repository {}, reason: {}", this.getClass(), ExceptionUtil
              .getDetailMessage(ex));
    }
    return false;
}

local本地模式

  • 配置方式:-Denv=local
  • 怎么处理的就不看了,主要是找到本地目录的配置文件,然后加载到jvm中,其一般目录长下面这个格式样子
C:\opt\data\222537\config-cache\222537+default+application.properties
C:\opt\data\222537\config-cache\222537+default+xxxxx.zy.properties
C:\opt\data\222537\config-cache\222537+default+xxxxx.candy_config.propertie

remote远程模式

  • 配置方式:-Denv=dev、-Denv=fat之类,反正不是local
  • 代码位置:RmoeteConfigRepository->sync()
  • 处理方式:先加载远程的数据下来(期间会进行最多重试三次),然后开启定时五分钟拉取数据脚本,最后开启HTTP长轮询来监控数据变更推送
/**
* Constructor.
* 远程配置仓库初始化构造函数时,步骤如下:
* - 1 初始化各种参数和工具类
* - 2 加载远程、本地的类
* - 3 启动定时拉取apollo远程服务数据的线程
* - 4 发起一个http到apollo服务器,用长轮询等待服务器的配置变更
* @param namespace the namespace
*/
public RemoteConfigRepository(String namespace) {
    // 1 初始化各种参数和工具类
    m_namespace = namespace;
    m_configCache = new AtomicReference<>();
    m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
    m_httpUtil = ApolloInjector.getInstance(HttpUtil.class);
    m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
    remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class);
    m_longPollServiceDto = new AtomicReference<>();
    m_remoteMessages = new AtomicReference<>();
    m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
    m_configNeedForceRefresh = new AtomicBoolean(true);
    m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(),
        m_configUtil.getOnErrorRetryInterval() * 8);
    // 2 加载远程、本地的类
    this.trySync();
    // 3 启动定时拉取apollo远程服务数据的线程
    // -- 默认线程启动5秒后,每隔5分钟进行刷新重新拉取
    this.schedulePeriodicRefresh();
    // 4 发起一个http到apollo服务器,用长轮询等待服务器的配置变更
    // -- scheduleLongPollingRefresh->submit()->startLongPolling()->doLongPollingRefresh()
    this.scheduleLongPollingRefresh();
}

// 2 加载远程、本地的类
@Override
protected synchronized void sync() {
    Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");
    try {
      // 获取缓存
      ApolloConfig previous = m_configCache.get();
      ApolloConfig current = loadApolloConfig();
    
      // 引用相等意味着响应304 reference equals means HTTP 304
      if (previous != current) {
        // 不相等说明缓存变更,这时候重新设置缓存
        logger.debug("Remote Config refreshed!");
        m_configCache.set(current);
        // 触发监听器变更执行,this.getConfig()是将缓存变为properties
        this.fireRepositoryChange(m_namespace, this.getConfig());
      }
    
      // 如果新加载的数据不为空,就打日志记录
      if (current != null) {
        Tracer.logEvent(String.format("Apollo.Client.Configs.%s", current.getNamespaceName()),
            current.getReleaseKey());
      }
    
      transaction.setStatus(Transaction.SUCCESS);
    } catch (Throwable ex) {
      transaction.setStatus(ex);
      throw ex;
    } finally {
      transaction.complete();
    }
}

// 3 启动定时拉取apollo远程服务数据的线程
// -- 默认线程启动5秒后,每隔5分钟进行刷新重新拉取
/**
* 定时拉取数据 m_executorService.scheduleAtFixedRate
* - 默认线程启动5秒后,每隔5分钟进行刷新重新拉取
*/
private void schedulePeriodicRefresh() {
    logger.debug("Schedule periodic refresh with interval: {} {}",
        m_configUtil.getRefreshInterval(), m_configUtil.getRefreshIntervalTimeUnit());
    m_executorService.scheduleAtFixedRate(
        new Runnable() {
          @Override
          public void run() {
            Tracer.logEvent("Apollo.ConfigService", String.format("periodicRefresh: %s", m_namespace));
            logger.debug("refresh config for namespace: {}", m_namespace);
            trySync();
            Tracer.logEvent("Apollo.Client.Version", Apollo.VERSION);
          }
        }, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(),
        m_configUtil.getRefreshIntervalTimeUnit());
}

// 4 发起一个http到apollo服务器,用长轮询等待服务器的配置变更
// - 长轮询这步只是调用一个notiry接口来检测是否有变化,客户端会与服务端建立一个HTTP长连接,然后如果有变化,服务端就会推数据过来,如果超过90s没变动就会断开重新建立连接
// -- 如果返回状态码=200,就会调用notiry()方法,最后会调用到trySync()走上面(2 加载远程、本地的类)拉取远程数据的流程
private void doLongPollingRefresh(String appId, String cluster, String dataCenter, String secret) {
    final Random random = new Random();
    ServiceDTO lastServiceDto = null;
    // 这里根据 长轮询是否停止 && 线程是否终止 来进行终止长轮询
    while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {
      // 这段代码感觉是多余的。因为上面的m_longPollStarted这个atomic已经确保了只有一个线程能进来了
      if (!m_longPollRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
        //wait at most 5 seconds
        try {
          TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
        }
      }
      Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "pollNotification");
      String url = null;
      try {
        // lastServiceDto为空,可能是如下几种情况:
        // - 1 如果之前没有请求过导致没有把服务信息设置进lastServiceDto。比如第一次进行长轮询
        // - 2 如果是304,会进行随机true或false的判断,如果true,也置为空
        // - 3 长轮询过程中出错,catch会重新把这个置为null
        if (lastServiceDto == null) {
          // 获取服务端的配置信息
          List<ServiceDTO> configServices = getConfigServices();
          // 随机取一个
          lastServiceDto = configServices.get(random.nextInt(configServices.size()));
        }
        // 备注:拼接获取配置请求路径,其实这次长轮询请求的路径并不是直接获取数据,而是请求判断是否变更过的接口notifications/v2,如果变更过就调用trySync()进行请求触发
        // 备注:拼接获取配置请求路径,其实这次长轮询请求的路径并不是直接获取数据,而是请求判断是否变更过的接口notifications/v2,如果变更过就调用trySync()进行请求触发
        url = assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster, dataCenter,
                m_notifications);
    
        logger.debug("Long polling from {}", url);
    
        HttpRequest request = new HttpRequest(url);
        request.setReadTimeout(LONG_POLLING_READ_TIMEOUT);
        if (!StringUtils.isBlank(secret)) {
          Map<String, String> headers = Signature.buildHttpHeaders(url, appId, secret);
          request.setHeaders(headers);
        }
    
        transaction.addData("Url", url);
    
        final HttpResponse<List<ApolloConfigNotification>> response =
            m_httpUtil.doGet(request, m_responseType);
        // 注意了!!! 请求正常,并返回200,调用notify进行通知所有lisetener进行填充新的配置
        logger.debug("Long polling response: {}, url: {}", response.getStatusCode(), url);
        if (response.getStatusCode() == 200 && response.getBody() != null) {
          updateNotifications(response.getBody());
          updateRemoteNotifications(response.getBody());
          transaction.addData("Result", response.getBody().toString());
          notify(lastServiceDto, response.getBody());
        }
    
        //try to load balance
        if (response.getStatusCode() == 304 && random.nextBoolean()) {
          lastServiceDto = null;
        }
    
        m_longPollFailSchedulePolicyInSecond.success();
        transaction.addData("StatusCode", response.getStatusCode());
        transaction.setStatus(Transaction.SUCCESS);
      } catch (Throwable ex) {
        lastServiceDto = null;
        Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
        transaction.setStatus(ex);
        long sleepTimeInSecond = m_longPollFailSchedulePolicyInSecond.fail();
        logger.warn(
            "Long polling failed, will retry in {} seconds. appId: {}, cluster: {}, namespaces: {}, long polling url: {}, reason: {}",
            sleepTimeInSecond, appId, cluster, assembleNamespaces(), url, ExceptionUtil.getDetailMessage(ex));
        try {
          TimeUnit.SECONDS.sleep(sleepTimeInSecond);
        } catch (InterruptedException ie) {
          //ignore
        }
      } finally {
        transaction.complete();
      }
    }
}
上一篇:Apollo配置灰度发布


下一篇:使用Docker搭建Apollo分布式配置中心