前言
上篇文章已经介绍了 Eureka Server 环境和上下文初始化的一些代码,其中重点讲解了environment初始化使用的单例模式,以及EurekaServerConfigure基于接口对外暴露配置方法的设计方式。这一讲就是讲解Eureka Server上下文初始化剩下的内容:Eureka Client初始化。
EurekaServer上下文构建之Client
EurekaClientConfigure创建过程
因为eurekaSever是集群部署的,所以每个eurekaServer都需要注册到其他注册中心节点。这里自己既是一个eurekaServer,也是一个eurekaClient。
截取EurekaServer中初始化上下文代码:
// 3、初始化eureka-server内部的一个eureka-client(用来跟其他的eureka-server节点做注册和通信) // 类的开头已经说明了:EurekaInstanceConfig其实就是eureka client相关的配置类 if (eurekaClient == null) { EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext()) ? new CloudInstanceConfig() : new MyDataCenterInstanceConfig(); applicationInfoManager = new ApplicationInfoManager( instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get()); // DefaultEurekaClientConfig类似于上面的DefaultEurekaServerConfig类实现 EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig(); eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig); } else { applicationInfoManager = eurekaClient.getApplicationInfoManager(); }
再看下eurekaClientConfig创建的代码:
public DefaultEurekaClientConfig(String namespace) { this.namespace = namespace.endsWith(".") ? namespace : namespace + "."; this.configInstance = Archaius1Utils.initConfig(CommonConstants.CONFIG_FILE_NAME); this.transportConfig = new DefaultEurekaTransportConfig(namespace, configInstance); } public static DynamicPropertyFactory initConfig(String configName) { DynamicPropertyFactory configInstance = DynamicPropertyFactory.getInstance(); /** * 获取eureka client配置文件,类似于 {@link DefaultEurekaServerConfig}中的: * String eurekaPropsFile = EUREKA_PROPS_FILE.get(); * private static final DynamicStringProperty EUREKA_PROPS_FILE = DynamicPropertyFactory * .getInstance().getStringProperty("eureka.server.props","eureka-server"); */ DynamicStringProperty EUREKA_PROPS_FILE = configInstance.getStringProperty("eureka.client.props", configName); String env = ConfigurationManager.getConfigInstance().getString(EUREKA_ENVIRONMENT, "test"); ConfigurationManager.getConfigInstance().setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, env); String eurekaPropsFile = EUREKA_PROPS_FILE.get(); try { ConfigurationManager.loadCascadedPropertiesFromResources(eurekaPropsFile); } catch (IOException e) { logger.warn( "Cannot find the properties specified : {}. This may be okay if there are other environment " + "specific properties or the configuration is installed with a different mechanism.", eurekaPropsFile); } return configInstance; }
看到上面代码想到了什么?这完全跟EurekaServerConfig创建的逻辑一样的呀,代码和DefaultEurekaServerConfig一致的逻辑。最后都是交给ConfigurationManager来管理。
EurekaClient创建过程
接着再来看eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
代码:
这段代码确实很长,我们一段段来解读,解读完后再看代码:
基于ApplicationInfoManager(包含了服务实例的信息、配置,作为服务实例管理的一个组件),eureka client相关的配置,一起构建了一个EurekaClient。
-
这里有两个配置:
config.shouldFetchRegistry()
和config.shouldRegisterWithEureka()
config.shouldFetchRegistry()
: 是否需要注册到别的注册中心。eurekaServer有个配置:eureka.client.fetchRegistry,单机情况下为false。false表示自己就是注册中心。我的职责就是维护服务实例,并不需要去检索服务config.shouldRegisterWithEureka()
: 是否要向别的注册中心注册自己。eurekaServer有个配置:eureka.client.registerWithEureka,单机情况下为false。false表示自己不需要向注册中心注册自己 创建线程池调度任务
创建一个心跳线程池
创建一个缓存刷新线程池
初始化线程调度任务
具体代码如下,添加了一些代码备注:
@Inject DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider) { if (args != null) { this.healthCheckHandlerProvider = args.healthCheckHandlerProvider; this.healthCheckCallbackProvider = args.healthCheckCallbackProvider; this.eventListeners.addAll(args.getEventListeners()); this.preRegistrationHandler = args.preRegistrationHandler; } else { this.healthCheckCallbackProvider = null; this.healthCheckHandlerProvider = null; this.preRegistrationHandler = null; } this.applicationInfoManager = applicationInfoManager; InstanceInfo myInfo = applicationInfoManager.getInfo(); clientConfig = config; staticClientConfig = clientConfig; transportConfig = config.getTransportConfig(); instanceInfo = myInfo; if (myInfo != null) { // AppName是服务名称,instanceInfo.getId就是服务实例id,类似于:ServiceA/0001 appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId(); } else { logger.warn("Setting instanceInfo to a passed in null value"); } this.backupRegistryProvider = backupRegistryProvider; this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo); localRegionApps.set(new Applications()); fetchRegistryGeneration = new AtomicLong(0); remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions()); remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(",")); // 是否需要注册到别的注册中心。eurekaServer有个配置:eureka.client.fetchRegistry,单机情况下为false。false表示自己就是注册中心。我的职责就是维护服务实例,并不需要去检索服务 if (config.shouldFetchRegistry()) { this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } // eurekaServer有个配置:eureka.client.registerWithEureka,单机情况下为false。false表示自己不需要向注册中心注册自己 if (config.shouldRegisterWithEureka()) { this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } logger.info("Initializing Eureka in region {}", clientConfig.getRegion()); // 不需要注册也不需要抓取 释放不必要的资源 if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) { logger.info("Client configured to neither register nor query for data."); scheduler = null; heartbeatExecutor = null; cacheRefreshExecutor = null; eurekaTransport = null; instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion()); // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, this.getApplications().size()); return; // no need to setup up an network tasks and we are done } try { // default size of 2 - 1 each for heartbeat and cacheRefresh // 创建一个支持调度的线程池 scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-%d") .setDaemon(true) .build()); // 创建一个心跳检查的线程池,最大线程数为5 heartbeatExecutor = new ThreadPoolExecutor( 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d") .setDaemon(true) .build() ); // use direct handoff // 支持缓存刷新的线程池,最大线程数为5 cacheRefreshExecutor = new ThreadPoolExecutor( 1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d") .setDaemon(true) .build() ); // use direct handoff // 支持底层的eureka client跟eureka server进行网络通信的组件 eurekaTransport = new EurekaTransport(); // 发送http请求,调用restful接口 scheduleServerEndpointTask(eurekaTransport, args); AzToRegionMapper azToRegionMapper; if (clientConfig.shouldUseDnsForFetchingServiceUrls()) { azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig); } else { azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig); } if (null != remoteRegionsToFetch.get()) { azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(",")); } instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion()); } catch (Throwable e) { throw new RuntimeException("Failed to initialize DiscoveryClient!", e); } // 如果要抓取注册表,但是抓取失败后,需要从备份中读取 if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { fetchRegistryFromBackup(); } // call and execute the pre registration handler before all background tasks (inc registration) is started if (this.preRegistrationHandler != null) { this.preRegistrationHandler.beforeRegistration(); } // 初始化调度任务 initScheduledTasks(); try { Monitors.registerObject(this); } catch (Throwable e) { logger.warn("Cannot register timers", e); } // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, this.getApplications().size()); }
/** * Initializes all scheduled tasks. */ private void initScheduledTasks() { // 抓取注册表的定时任务, if (clientConfig.shouldFetchRegistry()) { // registry cache refresh timer // registryFetchIntervalSeconds默认为30s int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); // 执行cacheRefreshExecutor调度任务,默认是30s scheduler.schedule( new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ), registryFetchIntervalSeconds, TimeUnit.SECONDS); } // 如果要将自己注册到其他注册中心 if (clientConfig.shouldRegisterWithEureka()) { // 默认也是30s int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs); // Heartbeat timer // 执行heartbeatExecutor心跳检查,默认是30s scheduler.schedule( new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ), renewalIntervalInSecs, TimeUnit.SECONDS); // InstanceInfo replicator // 创建服务副本传播器 instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize // 创建服务实例状态变更的监听器 statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } }; if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } // 执行线程 instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } }
总结
如果是eureka server的话,我们在玩儿spring cloud的时候,会将这个fetchRegistry给手动设置为false,如果是eureka server集群的话,就还是要保持为true。registerWithEureka也要设置为true。
(1)读取EurekaClientConfig,包括TransportConfig (2)保存EurekaInstanceConfig和InstanceInfo (3)处理了是否要注册以及抓取注册表,如果不要的话,释放一些资源 (4)支持调度的线程池 (5)支持心跳的线程池 (6)支持缓存刷新的线程池 (7)EurekaTransport,支持底层的eureka client跟eureka server进行网络通信的组件,对网络通信组件进行了一些初始化的操作 (8)如果要抓取注册表的话,在这里就会去抓取注册表了,但是如果说你配置了不抓取,那么这里就不抓取了 (9)初始化调度任务:如果要抓取注册表的话,就会注册一个定时任务,按照你设定的那个抓取的间隔,每隔一定时间(默认是30s),去执行一个CacheRefreshThread,给放那个调度线程池里去了;如果要向eureka server进行注册的话,会搞一个定时任务,每隔一定时间发送心跳,执行一个HeartbeatThread;创建了服务实例副本传播器,将自己作为一个定时任务进行调度;创建了服务实例的状态变更的监听器,如果你配置了监听,那么就会注册监听器