Eureka Server 同时也是一个 Eureka Client, 在不禁止 Eureka Server 的客户端行为时,它会向配置 文件中的其他 Eureka Server 进行拉取注册表、服务注册和发送心跳等操作。 作为服务注册中心,Eureka Server 提供了以下功能。
一、服务注册
接口: /eureka/apps/SEVER_NAME
源码如下:
// com.netflix.eureka.resources.ApplicationResource#addInstance
// 接受web请求
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
// validate that the instanceinfo contains all the necessary required fields
// .......
// handle cases where clients may be registering with bad DataCenterInfo with missing data
// .......
// 注册服务
registry.register(info, "true".equals(isReplication));
return Response.status(204).build(); // 204 to be backwards compatible
}
注册:
// com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#register
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
// 注册实例
super.register(info, leaseDuration, isReplication);
// 集群之间同步
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
// com.netflix.eureka.registry.AbstractInstanceRegistry#register
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
read.lock();
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
// 线程安全
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
// 根据 instanceId 获取实例的租约
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
// InstanceInfo instead of the server local copy.
// 如果该实例的租约已存在,比较最后更新时间戳的大小,取最大值的注册信息为有效
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
} else {
// 如果租约不存在,这是一个新的注册实例
// The lease does not exist and hence it is a new registration
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
// 自我保护机制
// Since the client wants to register it, increase the number of clients sending renews
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
updateRenewsPerMinThreshold();
}
}
logger.debug("No previous lease information found; it is new registration");
}
// 创建新的租约
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
// 如果租约存在,继承租约的服务上线初始时间
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
// 保存租约
gMap.put(registrant.getId(), lease);
synchronized (recentRegisteredQueue) {
// 添加最近注册队列,用来统计最近注册服务实例的数据
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
}
// This is where the initial state transfer of overridden status happens
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// 根据覆盖状态规则得到服务实例的最终状态,并设置服务实例的当前状态
// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// 如果服务实例状态为UP,设置租约的服务上线时间,只有第一次设置有效
// If the lease is registered with UP status, set lease service up timestamp
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
// 添加最近租约变更记录队列,标识ActionType为ADDED,这将用于Eureka Client增量式获取注册表信息
registrant.setActionType(ActionType.ADDED);
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
// 设置服务实例信息更新时间
registrant.setLastUpdatedTimestamp();
// 设置response缓存过期,这将用于Eureka Client全量获取注册表信息
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
在 register 中,服务实例 InstanceInfo 保存在 Lease 中。在服务注册过程中为避免数据不一致需要加锁。如果租约存在,则比较两个租约的更新时间,保留时间戳大的服务实例信息。如果租约不存在,意味着这是一次全新的注册,将会进行自我保护的统计,创建新的租约保存 InstanceInfo 。接着将租约放到 registry 注册表中。
之后将进行一系列缓存操作并根据覆盖状态规则设置服务实例的状态,缓存操作包括将 InstanceInfo 加入用于统计 Eureka Client 增量式获取注册表信息的 recentlyChangedQueue 和失效 responseCache 中对应的缓存。最后设置服务实例租约的上线时间用于计算租约的有效时间,释放读锁并完成服务注册。
二、接受服务心跳
在 Eureka Client 完成服务注册后,它需要定时向 Eureka Server 发送心跳请求(默认30s),维持自己在 Eureka Server 中租约的有效性
代码如下:
// com.netflix.eureka.registry.AbstractInstanceRegistry#renew
public boolean renew(String appName, String id, boolean isReplication) {
RENEW.increment(isReplication);
// 根据 appName 获取服务集群的租约集合
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
if (gMap != null) {
leaseToRenew = gMap.get(id);
}
// 租约不存在 直接返回 false
if (leaseToRenew == null) {
RENEW_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
return false;
} else {
InstanceInfo instanceInfo = leaseToRenew.getHolder();
if (instanceInfo != null) {
// 根据覆盖状态规则得到服务实例的最终状态
// touchASGCache(instanceInfo.getASGName());
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
// 如果得到的服务实例状态是 UNKNOWN 则取消续约
if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
+ "; re-register required", instanceInfo.getId());
RENEW_NOT_FOUND.increment(isReplication);
return false;
}
if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
logger.info(
"The instance status {} is different from overridden instance status {} for instance {}. "
+ "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
instanceInfo.getOverriddenStatus().name(),
instanceInfo.getId());
instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
}
}
// 统计第分钟续租次数,用于自我保护
renewsLastMin.increment();
// 更新租约中的有效时间
leaseToRenew.renew();
return true;
}
}
在 renew 方法中,不关注 InstanceInfo ,仅关注于租约本身以及租约的服务实例状态。
三、服务剔除
如果 Eureka Client 在注册后,既没有续约,也没有下线(服务崩溃或者网络异常等原因),那么服务的状态就处于不可知的状态,不能保证能够从该服务实例中获取到回馈,所以需要服务剔除。
实现代码如下:
// com.netflix.eureka.registry.AbstractInstanceRegistry#evict()
@Override
public void evict() {
evict(0l);
}
public void evict(long additionalLeaseMs) {
logger.debug("Running the evict task");
// 自我保护相关,如果出现该状态,不允许剔除服务,可能是注册中心网络问题
if (!isLeaseExpirationEnabled()) {
logger.debug("DS: lease expiration is currently disabled.");
return;
}
// We collect first all expired items, to evict them in random order. For large eviction sets,
// if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
// the impact should be evenly distributed across all applications.
// 遍历注册表 registry ,一次性获取所有的过期租约
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);
}
}
}
}
// To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
// triggering self-preservation. Without that we would wipe out full registry.
// 计算最大允许剔除的租约的数量,获取注册表的租约总数
int registrySize = (int) getLocalRegistrySize();
// 计算注册表租约的阀值,与自我保护相关
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
int evictionLimit = registrySize - registrySizeThreshold;
// 计算租约的剔除数量
int toEvict = Math.min(expiredLeases.size(), evictionLimit);
if (toEvict > 0) {
logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
// 逐个随机剔除
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < toEvict; i++) {
// Pick a random item (Knuth shuffle algorithm)
int next = i + random.nextInt(expiredLeases.size() - i);
Collections.swap(expiredLeases, i, next);
Lease<InstanceInfo> lease = expiredLeases.get(i);
String appName = lease.getHolder().getAppName();
String id = lease.getHolder().getId();
EXPIRED.increment();
logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
internalCancel(appName, id, false);
}
}
}
服务剔除有很多限制,都是为了保证 Eureka Server 的可用性:
- 自我保护时期不能进行服务剔除操作
- 过期操作是分批进行
- 服务剔除是随机逐个剔除,剔除均匀分布在所有应用中,防止在同一时间内同一服务集群中的服务全部过期被剔除,以致大量剔除发生时,在未进行自我保护前促使程序的崩溃
服务剔除是一个定时的任务,定义了一个 EvictionTask 用于实时执行服务剔除,默认60秒一次。
四、服务下线
Eureka Client 在应用销毁时,会向 Eureka Server 发送服务下线请求,清除注册表中关于本应用的租约,避免无效的服务调用。
代码如下:
// com.netflix.eureka.registry.AbstractInstanceRegistry#internalCancel
protected boolean internalCancel(String appName, String id, boolean isReplication) {
try {
read.lock();
CANCEL.increment(isReplication);
// 根据 appName 获取服务实例的集群
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToCancel = null;
// 移除服务实例的租约
if (gMap != null) {
leaseToCancel = gMap.remove(id);
}
// 将服务实例信息添加到最近下线服务实例统计队列
synchronized (recentCanceledQueue) {
recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
}
InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
if (instanceStatus != null) {
logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
}
// 租约不存在,返回 FALSE
if (leaseToCancel == null) {
CANCEL_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
return false;
} else {
// 设置租约的下线时间
leaseToCancel.cancel();
InstanceInfo instanceInfo = leaseToCancel.getHolder();
String vip = null;
String svip = null;
if (instanceInfo != null) {
// 添加最近租约变更记录队列,标识 ActionType 为 Delete
// 这将用于 Eureka Client 增量式获取注册表信息
instanceInfo.setActionType(ActionType.DELETED);
recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
instanceInfo.setLastUpdatedTimestamp();
vip = instanceInfo.getVIPAddress();
svip = instanceInfo.getSecureVipAddress();
}
// 设置 response 缓存过期
invalidateCache(appName, vip, svip);
logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
return true;
}
} finally {
read.unlock();
}
}
五、集群同步
如果 Eureak Server 是通过集群的方式进行部署,那么为了维护整个集群中 Eureka Server 注册表数据的一致性,势必需要一个机制同步 Eureka Server 集群中的注册表数据。
Eureka Server 集群同步包含两个部分,一部分是 Eureka Server 在启动过程中从它的 peer 节点中拉取注册表信息,并将这些服务实例的信息注册到本地注册表中;另一部分是 Eureka Server 每次对本地注册表进行操作时,同时会将操作同步到它的 peer 节点中,达到集群注册表数据统一的目的。
1. Eureka Server 初始化本地注册表信息
// com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#syncUp
public int syncUp() {
// 从临近的 peer 中复制整个注册表
// Copy entire entry from neighboring DS node
int count = 0;
// 如果获取不到,线程等待
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
if (i > 0) {
try {
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException e) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
// 获取所有的服务实例
Applications apps = eurekaClient.getApplications();
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
try {
// 判断是否可以注册,主要用于 AWS 环境下进行,若部署在其他的环境,直接返回 true
if (isRegisterable(instance)) {
// 注册到自身的注册表中
register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
}
} catch (Throwable t) {
logger.error("During DS init copy", t);
}
}
}
}
return count;
}
Eureka Server 也是一个 Eureka Client , 在启动的时候也会进行 DiscoveryClient 的初始化,会从其对应的 Eureka Server 中拉取全是的注册表信息。在 Eureka Server 集群部署的情况下,Eureka Server从它的 peer 节点中拉取到注册表信息后,将遍历这个 Applications, 将所有的服务实例通过 AbstractRegistry#register 方法注册到自身注册表中。
在初始化本地注册表时, Eureka Server 并不会接受来自 Eureka Client 的通信请求。在同步注册表信息结束后会通过 PeerAwareInstanceRegistryImpl#openForTraffic 方法允许该 Server 接受流量。代码如下:
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
// 初始化自我保护机制统计参数
// Renewals happen every 30 seconds and for a minute it should be a factor of 2.
this.expectedNumberOfClientsSendingRenews = count;
updateRenewsPerMinThreshold();
logger.info("Got {} instances from neighboring DS node", count);
logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
this.startupTime = System.currentTimeMillis();
// 如果同步的应用实例数量为0,将在一段时间内拒绝 Client 服务获取注册表信息
if (count > 0) {
this.peerInstancesTransferEmptyOnStartup = false;
}
DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
boolean isAws = Name.Amazon == selfName;
// 判断是否是 AWS 运行环境,此处忽略
if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
logger.info("Priming AWS connections for all replicas..");
primeAwsReplicas(applicationInfoManager);
}
logger.info("Changing status to UP");
// 修改服务实例的状态为健康上线,可以接受流量
applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
super.postInit();
}
2. Eureka Server 之间注册表信息的同步
// com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#replicateToPeers
/**
* Replicates all eureka actions to peer eureka nodes except for replication
* traffic to this node.
*
*/
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
Stopwatch tracer = action.getTimer().start();
try {
if (isReplication) {
numberOfReplicationsLastMin.increment();
}
// If it is a replication already, do not replicate again as this will create a poison replication
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
// 向 peer 集群中的每一个 peer 进行同步
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}
/**
* Replicates all instance changes to peer eureka nodes except for
* replication traffic to this node.
*
*/
private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry = null;
CurrentRequestVersion.set(Version.V2);
switch (action) {
case Cancel:
// 同步下线
node.cancel(appName, id);
break;
case Heartbeat: // 同步心跳
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register: // 同步注册
node.register(info);
break;
case StatusUpdate: // 同步状态更新
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
} catch (Throwable t) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
}
}
3. 同步的实现细节
com.netflix.eureka.cluster.PeerEurekaNode
// 创建批量同步分发器
this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(
batcherName,
config.getMaxElementsInPeerReplicationPool(),
batchSize,
config.getMaxThreadsForPeerReplication(),
maxBatchingDelayMs,
serverUnavailableSleepTimeMs,
retrySleepTimeMs,
taskProcessor
);
// com.netflix.eureka.util.batcher.AcceptorExecutor.AcceptorRunner
public void run() {
long scheduleTime = 0;
while (!isShutdown.get()) {
try {
// 合并同一个服务的请求, 把合并后的任务放到 processingOrder 中
drainInputQueues();
int totalItems = processingOrder.size();
long now = System.currentTimeMillis();
if (scheduleTime < now) {
scheduleTime = now + trafficShaper.transmissionDelay();
}
if (scheduleTime <= now) {
assignBatchWork();
assignSingleItemWork();
}
// If no worker is requesting data or there is a delay injected by the traffic shaper,
// sleep for some time to avoid tight loop.
if (totalItems == processingOrder.size()) {
Thread.sleep(10);
}
} catch (InterruptedException ex) {
// Ignore
} catch (Throwable e) {
// Safe-guard, so we never exit this loop in an uncontrolled way.
logger.warn("Discovery AcceptorThread error", e);
}
}
}
// com.netflix.eureka.util.batcher.TaskExecutors.BatchWorkerRunnable
public void run() {
try {
while (!isShutdown.get()) {
List<TaskHolder<ID, T>> holders = getWork();
metrics.registerExpiryTimes(holders);
List<T> tasks = getTasksOf(holders);
// 真正发送批量请求的地方
ProcessingResult result = processor.process(tasks);
switch (result) {
case Success:
break;
case Congestion:
case TransientError:
taskDispatcher.reprocess(holders, result);
break;
case PermanentError:
logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
}
metrics.registerTaskResult(result, tasks.size());
}
} catch (InterruptedException e) {
// Ignore
} catch (Throwable e) {
// Safe-guard, so we never exit this loop in an uncontrolled way.
logger.warn("Discovery WorkerThread error", e);
}
}
六、获取注册表中服务实例信息
1. getApplicationsFromMultipleRegions
Eureka Client 在获取全量注册表信息时,Eureka Server 会通过这个方法封装 Applications , 并返回给 Client 。
public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {
boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0;
logger.debug("Fetching applications registry with remote regions: {}, Regions argument {}",
includeRemoteRegion, remoteRegions);
if (includeRemoteRegion) {
GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS.increment();
} else {
GET_ALL_CACHE_MISS.increment();
}
Applications apps = new Applications();
apps.setVersion(1L);
// 从本地 registry 获取所有的服务实例信息 instanceInfo
for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {
Application app = null;
if (entry.getValue() != null) {
for (Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.getValue().entrySet()) {
Lease<InstanceInfo> lease = stringLeaseEntry.getValue();
if (app == null) {
app = new Application(lease.getHolder().getAppName());
}
app.addInstance(decorateInstanceInfo(lease));
}
}
if (app != null) {
apps.addApplication(app);
}
}
if (includeRemoteRegion) {
// 获取远程 region 中的 Eureka Server 中的注册表信息
for (String remoteRegion : remoteRegions) {
RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
if (null != remoteRegistry) {
Applications remoteApps = remoteRegistry.getApplications();
for (Application application : remoteApps.getRegisteredApplications()) {
if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
logger.info("Application {} fetched from the remote region {}",
application.getName(), remoteRegion);
Application appInstanceTillNow = apps.getRegisteredApplications(application.getName());
if (appInstanceTillNow == null) {
appInstanceTillNow = new Application(application.getName());
apps.addApplication(appInstanceTillNow);
}
for (InstanceInfo instanceInfo : application.getInstances()) {
appInstanceTillNow.addInstance(instanceInfo);
}
} else {
logger.debug("Application {} not fetched from the remote region {} as there exists a "
+ "whitelist and this app is not in the whitelist.",
application.getName(), remoteRegion);
}
}
} else {
logger.warn("No remote registry available for the remote region {}", remoteRegion);
}
}
}
apps.setAppsHashCode(apps.getReconcileHashCode());
return apps;
}
2. getApplicationDeltasFromMultipleRegions
获取增量注册表信息,将会从recentlyChangedQueue中获取最近变化的服务实例信息。recentlyChangedQueue中统计了近3分钟内进行注册、修改、剔除的服务实例信息
public Applications getApplicationDeltasFromMultipleRegions(String[] remoteRegions) {
if (null == remoteRegions) {
remoteRegions = allKnownRemoteRegions; // null means all remote regions.
}
boolean includeRemoteRegion = remoteRegions.length != 0;
if (includeRemoteRegion) {
GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS_DELTA.increment();
} else {
GET_ALL_CACHE_MISS_DELTA.increment();
}
Applications apps = new Applications();
apps.setVersion(responseCache.getVersionDeltaWithRegions().get());
Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
try {
write.lock();
// 遍历 recentlyChangedQueue 队列获取最近变化的服务实例信息
Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
logger.debug("The number of elements in the delta queue is :{}", this.recentlyChangedQueue.size());
while (iter.hasNext()) {
Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
InstanceInfo instanceInfo = lease.getHolder();
logger.debug("The instance id {} is found with status {} and actiontype {}",
instanceInfo.getId(), instanceInfo.getStatus().name(), instanceInfo.getActionType().name());
Application app = applicationInstancesMap.get(instanceInfo.getAppName());
if (app == null) {
app = new Application(instanceInfo.getAppName());
applicationInstancesMap.put(instanceInfo.getAppName(), app);
apps.addApplication(app);
}
app.addInstance(new InstanceInfo(decorateInstanceInfo(lease)));
}
if (includeRemoteRegion) {
// 获取远程 region 中的 Eureka Server 的增量式注册表信息
for (String remoteRegion : remoteRegions) {
RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
if (null != remoteRegistry) {
Applications remoteAppsDelta = remoteRegistry.getApplicationDeltas();
if (null != remoteAppsDelta) {
for (Application application : remoteAppsDelta.getRegisteredApplications()) {
if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
Application appInstanceTillNow =
apps.getRegisteredApplications(application.getName());
if (appInstanceTillNow == null) {
appInstanceTillNow = new Application(application.getName());
apps.addApplication(appInstanceTillNow);
}
for (InstanceInfo instanceInfo : application.getInstances()) {
appInstanceTillNow.addInstance(new InstanceInfo(instanceInfo));
}
}
}
}
}
}
}
// 计算应用集合一致性哈希码,用以在 Eureka Client 在拉取时比对
Applications allApps = getApplicationsFromMultipleRegions(remoteRegions);
apps.setAppsHashCode(allApps.getReconcileHashCode());
return apps;
} finally {
write.unlock();
}
}