通过前面的学习,我们已经知道eureka-client向eureka-server主动上报了自己的通信地址
,这一过程是通过调用服务注册的接口来完成的。
现在eureka-server能够获取到所有eureka-client的通信地址了,可eureka-client是怎么获取其他eureka-client的通信地址的呢?本篇揭晓。
eureka-client是怎样初始化的
先来看看eureka-client初始化时做了哪些操作。不知道大家还记不记得,上一篇讲过在eureka-server的初始化过程中,有一个步骤是创建eurekaClient
对象。
为什么eureka-server的初始化
过程,需要创建eurekaClient
对象呢?
如果我们的eureka-server只部署在一台服务器上,并且只部署一个eureka-server,那么当然不需要创建eurekaClient
。可一旦需要将eureka-server部署成一个集群,情况就变得复杂起来了。
因为在集群中eureka-server之间需要相互注册,所以每一个eureka服务端,同时也是eureka的客户端。
这里只是简单的提一下,具体的内容,后面讲解eureka-server集群相关的知识时再做补充。此处我们只是借助eureka-server的启动,来查看eurekaClient
的初始化逻辑。
先贴一小段代码看看,大家脑海里有个大概的印象,后面画一张流程图讲解。
//创建eurekaClient对象
if (eurekaClient == null) {
//读取eureka-client.properties中的配置
EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())
? new CloudInstanceConfig()
: new MyDataCenterInstanceConfig();
//通过instanceConfig和InstanceInfo构造Manager对象
applicationInfoManager = new ApplicationInfoManager(
instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get());
EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
//创建eurekaClient
eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
}
流程图:
InstanceInfo
上一章里提到过,里面包含了服务的ip、端口号、服务名称、实例id等信息。服务注册时,eureka-client会把InstanceInfo
中的信息发送给eureka-server。
现在大家知道InstanceInfo
是在哪里创建的了:基于我们自己的配置文件+一些默认配置构成。
eureka-client是怎样获取注册表的
上文创建了一个DiscoveryClient
对象,在该对象创建的过程中,eureka-client发起http请求向eureka-server请求到了所有的注册表信息。
老规矩,先上一张DiscoveryClient
创建的源码简图。
拉取注册表的主要逻辑在getAndStoreFullRegistry()
方法中,贴出来看一下(部分代码省略)
private void getAndStoreFullRegistry() throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications apps = null;
//构建http请求,发起请求
EurekaHttpResponse<Applications> httpResponse = 略...;
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
//从请求结果中,获取服务列表
apps = httpResponse.getEntity();
}
if (apps == null) {
logger.error("略...");
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
//保存服务列表
localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
} else {
logger.warn("略...");
}
}
服务列表被保存在AtomicReference<Applications> localRegionApps
中,稍后我们会通过断点来看看Applications
中的内容。
在此之前,我们先来看看,eureka-server在接收到客户端http请求后做了哪些逻辑处理?你可能会想:这有什么难的?直接把注册表数据返回不就可以了?憋着急,我们来看看源码验证一下你的猜想正不正确。
eureka-server是怎样返回注册表的
eureka-server接收http请求的代码,在eureka-core工程下的ApplicationsResource.java中,目录结构如下:
返回服务列表的方法是getContainers(参数略)
,下面我们通过一张代码简图,来看看方法内部的主要逻辑。
在eureka-server的处理逻辑中,用到了一套多级缓存机制,返回服务列表时,不是直接返回注册表,而是先从只读缓存中
读取,如果没有缓存数据,再从读写缓存
中取,如果读写缓存也没有,则从注册表
读取。
只读缓存:ConcurrentMap<Key, Value> readOnlyCacheMap
读写缓存:LoadingCache<Key, Value> readWriteCacheMap
,基于com.google.common.cache.LoadingCache
实现。
缓存读取流程如图:
既然这里使用了缓存,那么一个新的问题就被引入了,什么时候过期缓存数据呢?
1.创建readWriteCacheMap
时,指定了180s自动过期。(定时过期)
public class ResponseCacheImpl implements ResponseCache {
private final LoadingCache<Key, Value> readWriteCacheMap;
//读写缓存创建
this.readWriteCacheMap =
CacheBuilder.newBuilder().initialCapacity(1000)
//从配置中读取缓存过期时间,指定时间单位为-秒
.expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
}
时间配置在DefaultEurekaServerConfig.java
中:
public class DefaultEurekaServerConfig implements EurekaServerConfig {
@Override
public long getResponseCacheAutoExpirationInSeconds() {
//默认配置180
return configInstance.getIntProperty(
namespace + "responseCacheAutoExpirationInSeconds", 180).get();
}
}
2.有新服务注册时,缓存会过期。(主动过期)
前面的内容里,我们讲解服务注册时,实际上是调用了eureka-server
中AbstractInstanceRegistry.java
的register()
方法,当时我们忽略了一些细节,其中就包括过期缓存的这一小段。现在我们来看一看。
public abstract class AbstractInstanceRegistry implements InstanceRegistry {
public void register(略...) {
//调用内部的invalidateCache()方法
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(),
registrant.getSecureVipAddress());
}
private void invalidateCache(略...) {
// 调用ResponseCacheImpl.java中过期缓存的方法
responseCache.invalidate(appName, vipAddress, secureVipAddress);
}
}
从invalidateCache()
可以看出,实际上还是使用的ResponseCacheImpl.java
内部定义的invalidate()
方法。
public void invalidate(Key... keys) {
for (Key key : keys) {
//使用缓存框架自带的清除方法,清除缓存
readWriteCacheMap.invalidate(key);
//略...
}
}
3.ResponseCacheImpl.java
创建时启动了一个定时任务,每隔30秒就会去过期缓存。(被动过期)
public class ResponseCacheImpl implements ResponseCache {
//构造方法
ResponseCacheImpl(略...){
//是否启动只读缓存,默认true
if (shouldUseReadOnlyResponseCache) {
//具体要执行的任务:getCacheUpdateTask()
timer.schedule(getCacheUpdateTask(),
//间隔多少毫秒后,首次执行此任务
new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs) + responseCacheUpdateIntervalMs),
//每隔多少秒后,重复执行此任务
//config中的配置:"responseCacheUpdateIntervalMs", (30 * 1000)
responseCacheUpdateIntervalMs);
}
}
//具体的缓存处理逻辑
private TimerTask getCacheUpdateTask() {
return new TimerTask() {
@Override
public void run() {
//遍历readOnlyCacheMap中所有缓存
for (Key key : readOnlyCacheMap.keySet()) {
CurrentRequestVersion.set(key.getVersion());
Value cacheValue = readWriteCacheMap.get(key);
Value currentCacheValue = readOnlyCacheMap.get(key);
//如果'只读缓存'中的数据和'读写缓存'的数据不一致,则用'读写缓存'数据覆盖掉'只读缓存'
if (cacheValue != currentCacheValue) {
readOnlyCacheMap.put(key, cacheValue);
}
}
}
};
}
}
一张图总结一下:
最后,我们来看一下eureka-server返回的注册表数据
长什么样。
Applications
大概长什么样。
至此eureka-client获取注册表的机制我们就学习了二分之一了。
为什么说是二分之一?憋着急,继续往后看。
现在eureka-client已经能够获取到注册表了,看上去似乎没什么问题,可只要我们多想一步,就会发现问题所在。比如,一旦有新的服务注册到eureka-server上去,之前已经获取到注册表的eureka-client怎么同步新的数据呢?
eureka-client是怎么更新注册表的
为了获取到eureka-server端发生变动的注册表,在eureka-client初始化时启动了一个定时任务,每隔30秒就向eureka-server请求一次变化的注册表数据。
本文开始的代码简图中,我们梳理了discoverClient
创建过程,其中有一段是initScheduledTasks()
,也就是初始化定时任务的地方,当时让大家暂时忽略,现在我们来看一下。
在initScheduledTasks()
中包含多个定时任务,暂时我们只关注了其中刷新注册表的定时任务
,余下部分依然还是后面用到时再来看。
主要逻辑代码如下:
@Singleton
public class DiscoveryClient implements EurekaClient {
private final ScheduledExecutorService scheduler;
private void initScheduledTasks() {
//略...
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
//间隔多久执行一次定时任务,默认30(从默认配置获取)
registryFetchIntervalSeconds,
//间隔时间的单位
TimeUnit.SECONDS,
expBackOffBound,
//具体任务逻辑
new CacheRefreshThread()
),registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
}
这里我们先来看一下eureka-client的默认配置是定义在什么地方的,定义在DefaultEurekaClientConfig.java
中:
public class DefaultEurekaClientConfig implements EurekaClientConfig {
@Override
public int getRegistryFetchIntervalSeconds() {
//默认配置:定时器间隔多久执行
return configInstance.getIntProperty(
namespace + REGISTRY_REFRESH_INTERVAL_KEY, 30).get();
//配置的key定义:PropertyBasedClientConfigConstants.java
//String REGISTRY_REFRESH_INTERVAL_KEY = "client.refresh.interval";
}
}
了解了一下默认配置,接下来我们把思路拉回来,继续看刷新注册表的定时任务。
由于代码内部的方法调用过于复杂,此处就不贴源码了,采用代码简图代替。
最主要的方法是getAndUpdateDelta()
,贴出源码看一下:
private void getAndUpdateDelta(Applications applications)
Applications delta = null;
//发送获取增量注册表的请求
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.
getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
delta = httpResponse.getEntity();
}
//如果返回数据为空,则重新拉取所有的注册表
if (delta == null) {
getAndStoreFullRegistry();
}else if(略...){
//将获取到的注册表数据,与本地合并
updateDelta(delta);
//将合并后的结果,生成一个HashCode (此处稍后分析)
String reconcileHashCode = getReconcileHashCode(applications);
//对比http请求返回的HashCode和新生成的HashCode
if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()){
//对比结果不一致,重新拉取全量注册表
reconcileAndLogDifference(delta, reconcileHashCode);
}
}else{
//log.err(..);
}
)
从源代码中可以看到,eureka-client获取到数据后,本地进行了一个合并和校验数据的过程,我们来看看这一块。
注册表数据合并:eureka-client获取到注册表数据后,会根据一个ActionType
来判断服务实例的变动类型,也就是判断服务实例到底是需要新增到本地的注册表中,还是要从本地注册表删除,还是需要将本地的某个服务实例信息进行更新。
先看一眼源码,稍后通过流程图加以理解:
private void updateDelta(Applications delta) {
//遍历获取到的注册表信息
for (Application app : delta.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
if (ActionType.ADDED.equals(instance.getActionType())) {
//新增
applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
}else if (ActionType.MODIFIED.equals(instance.getActionType())) {
//修改
applications.***.addInstance(instance);
}else if (ActionType.DELETED.equals(instance.getActionType())) {
//删除
applications.***.removeInstance(instance);
}
}
}
}
**数据校验:**在eureka-server更新完本地注册表后,会将本地的注册表信息做一个hash计算
得到一个哈希值,同时eureka-server在返回数据时,也携带了一个哈希值。
从理论上讲经过一轮更新后,eureka-server和eureka-client中的注册表数据是完全一致的,所以得出的hash计算结果也应该是一样的。
如果不一样,说明eureka-client和eureka-server之间的数据同步出现了问题,那么此时eureka-client会重新向eureka-server请求一次全部的注册表数据。然后将新获取到的数据覆盖掉本地的注册表数据,以保证自己和eureka-server的数据一致。
结合流程图,理解合并和数据校验过程:
以上,整个eureka-client端是怎样定时发送http请求的,获取到数据是怎么进行合并和校验的,就已经梳理清楚了。
接下来看看eureka-server的接口是怎样处理的呢?还是通过多级缓存机制返回数据吗?
eureka-server增量注册表数据是怎么维护的
对于增量注册表数据,eureka-server依然是通过多级缓存机制来返回,但是由于注册表信息在不断发生变化,所以eureka-server是不会重复的返回所有的注册表信息的,在这里eureka-server借助了Queue(队列)
来记录变化的那一部分注册表信息。
每当发生服务注册或者服务主动下线时,就将变化的注册表信息发送到一个recentlyChangedQueue
中,同时在需要这部分数据时,直接取queue
中的数据即可。
配合流程图理解:
recentlyChangedQueue
定义在AbstractInstanceRegistry.java
中,如果大家还有印象,就能记得前面代表注册表的ConcurrentHashMap
也是定义在这里。
我们简单来看看recentlyChangedQueue
长什么样:
public abstract class AbstractInstanceRegistry implements InstanceRegistry {
//变量定义
private ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue
= new ConcurrentLinkedQueue<RecentlyChangedItem>();
//队列中存放的数据类型(一个内部类)
private static final class RecentlyChangedItem {
private long lastUpdateTime;
private Lease<InstanceInfo> leaseInfo;
}
}
其中的Lease
、InstanceInfo
分别代表租约信息和服务实例信息,前面已经讲过了。
接下来我们看一下readWriteCacheMap
从Queue
中数据的逻辑。
public class ResponseCacheImpl implements ResponseCache {
private final AbstractInstanceRegistry registry;
//readWriteCacheMap根据指定key未获取到数据,则执行此方法
private Value generatePayload(Key key) {
//获取全部注册表数据(全量数据)
if (ALL_APPS.equals(key.getName())) {略..}
//获取变化的注册表数据(增量数据)
else if(ALL_APPS_DELTA.equals(key.getName())){
payload = getPayLoad(key,
//调用AbstractInstanceRegistry.java的
//getApplicationDeltasFromMultipleRegions()方法获取队列数据
registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
}
}
}
大家想一想,这里使用队列存储数据会产生一个什么新的问题?随着时间推移,recentlyChangedQueue
中的数据会不断增加,这就导致eureka-client多次定时获取数据时,会获取到重复的数据。
明明之前已经获取过的数据,再反复的重复的获取,完全没有必要。因为eureka-client获取到增量注册表数据后,还需要在本地做一些合并和校验工作,那么随着数据的增多,从网络传输、合并数据、校验数据整条工作路径上的效率都会降低。
所以eureka-server开启了一个定时任务,每隔30秒
就清理一下recentlyChangedQueue
中的数据,确保Queue
中的数据是在180秒
内发生变化的服务实例。
定时任务清除队列数据流程图:
贴一小段简化源码:
public abstract class AbstractInstanceRegistry implements InstanceRegistry {
//构造方法
protected AbstractInstanceRegistry(略..){
//开启定时任务
this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
//默认配置30s
serverConfig.getDeltaRetentionTimerIntervalInMs(),
serverConfig.getDeltaRetentionTimerIntervalInMs());
}
//定时任务执行逻辑
private TimerTask getDeltaRetentionTask() {
return new TimerTask() {
@Override
public void run() {
//遍历队列中的数据
Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
while (it.hasNext()) {
//将180s内未发生变动的服务,从队列中删除
if (it.next().getLastUpdateTime() <
System.currentTimeMillis() -
//默认180s
serverConfig.getRetentionTimeInMSInDeltaQueue()) {
it.remove();
} else {
break;
}
}
}
};
}
}
到此,整个服务注册和服务发现的核心流程我们就全部明白了,下一篇我们来看看eureka的心跳机制
是如何实现的。