1. Earlier we analyzed how the Eureka server and client are implemented; today we look at how an Eureka cluster keeps its nodes in sync. Setting up and using a Eureka cluster was already demonstrated earlier, so we won't repeat it here.
2. The Eureka server is also a client:
When we walked through the EurekaServer side earlier, we noted that the server depends on the client implementation and even builds a client before the server itself starts. Why is that necessary?
Answer: because at startup the server needs to pull the service registry data from the other nodes. Let's verify that this is how it is implemented.
Start with the EurekaServerAutoConfiguration auto-configuration class:
@Configuration(proxyBeanMethods = false)
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
    // ...
}
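Before moving on, note the @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class) condition: that Marker bean only appears when an application opts in with @EnableEurekaServer, which imports EurekaServerMarkerConfiguration. A minimal entry point (a sketch; the class name is our own) looks like this:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

@SpringBootApplication
@EnableEurekaServer // publishes the Marker bean, which activates EurekaServerAutoConfiguration
public class EurekaServerApplication {
    public static void main(String[] args) {
        SpringApplication.run(EurekaServerApplication.class, args);
    }
}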
The key piece is @Import(EurekaServerInitializerConfiguration.class). Open the EurekaServerInitializerConfiguration class:
@Configuration(proxyBeanMethods = false)
public class EurekaServerInitializerConfiguration
implements ServletContextAware, SmartLifecycle, Ordered {
    // ...
@Override
public void start() {
new Thread(() -> {
try {
// TODO: is this class even needed now?
// Focus here: this initializes the eurekaServerBootstrap
eurekaServerBootstrap.contextInitialized(
EurekaServerInitializerConfiguration.this.servletContext);
log.info("Started Eureka Server");
publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
EurekaServerInitializerConfiguration.this.running = true;
publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
}
catch (Exception ex) {
// Help!
log.error("Could not initialize Eureka servlet context", ex);
}
}).start();
}
    // ...
}
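Note that nothing calls start() explicitly here: EurekaServerInitializerConfiguration implements SmartLifecycle, so Spring invokes start() automatically once the ApplicationContext has been refreshed. A minimal sketch of that contract, with a hypothetical bean:
import org.springframework.context.SmartLifecycle;

public class StartupBean implements SmartLifecycle {
    private volatile boolean running;

    @Override
    public void start() {        // invoked by Spring after context refresh
        running = true;          // kick off background work here, as Eureka does above
    }

    @Override
    public void stop() {
        running = false;
    }

    @Override
    public boolean isRunning() { // Spring consults this before calling start()/stop()
        return running;
    }
}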
The concrete implementation of contextInitialized:
public void contextInitialized(ServletContext context) {
try {
initEurekaEnvironment();
// Focus here: initialize the EurekaServer context
initEurekaServerContext();
context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
}
catch (Throwable e) {
log.error("Cannot bootstrap eureka server :", e);
throw new RuntimeException("Cannot bootstrap eureka server :", e);
}
}
And the context initialization, initEurekaServerContext:
protected void initEurekaServerContext() throws Exception {
// For backward compatibility
JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
XStream.PRIORITY_VERY_HIGH);
XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
XStream.PRIORITY_VERY_HIGH);
if (isAws(this.applicationInfoManager.getInfo())) {
this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
this.eurekaClientConfig, this.registry, this.applicationInfoManager);
this.awsBinder.start();
}
EurekaServerContextHolder.initialize(this.serverContext);
log.info("Initialized server context");
// Copy registry from neighboring eureka node
// Focus here: sync registry data from the other nodes to the current EurekaServer node
int registryCount = this.registry.syncUp();
this.registry.openForTraffic(this.applicationInfoManager, registryCount);
// Register all monitoring statistics.
EurekaMonitors.registerAllStats();
}
The implementation that syncs from the other nodes (this is the important part):
@Override
public int syncUp() {
// Copy entire entry from neighboring DS node
int count = 0;
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
if (i > 0) {
try {
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException e) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
// 1. Use eurekaClient to obtain the service registry pulled from the other nodes. This reads
// the registry from the eurekaClient's local cache. Remember the client-side cache-refresh
// task covered earlier? That task is exactly what fetches and caches this data, and when the
// EurekaServer starts it reads this cached registry and registers it onto itself.
Applications apps = eurekaClient.getApplications();
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
try {
if (isRegisterable(instance)) {
// 2. Register the registry entries cached by this node's eurekaClient onto this node itself.
register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
}
} catch (Throwable t) {
logger.error("During DS init copy", t);
}
}
}
}
return count;
}
That is how an EurekaServer syncs the other nodes' registry data at startup.
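Two EurekaServerConfig knobs drive the retry loop above: getRegistrySyncRetries() and getRegistrySyncRetryWaitMs() (commonly 5 attempts and 30 000 ms; treat those defaults as an assumption and check your version). Here is a minimal sketch of the same "retry until at least one instance was copied" shape, with a hypothetical copyOnce() standing in for the loop body; the returned count is what openForTraffic receives to initialize its expected-renewals bookkeeping:
public final class SyncUpShape {

    interface Attempt {
        int copyOnce(); // hypothetical: one pass over the local eurekaClient cache
    }

    static int syncUp(Attempt attempt, int retries, long waitMs) throws InterruptedException {
        int count = 0;
        for (int i = 0; i < retries && count == 0; i++) {
            if (i > 0) {
                Thread.sleep(waitMs); // back off before every attempt after the first
            }
            count = attempt.copyOnce();
        }
        return count; // handed to openForTraffic(...) to seed the expected renewal count
    }
}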
3. How does an EurekaServer keep the other nodes' registry data in sync while it is running?
Let's first look at the instance-registration endpoint the server exposes, in ApplicationResource:
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
// validate that the instanceinfo contains all the necessary required fields
if (isBlank(info.getId())) {
return Response.status(400).entity("Missing instanceId").build();
} else if (isBlank(info.getHostName())) {
return Response.status(400).entity("Missing hostname").build();
} else if (isBlank(info.getIPAddr())) {
return Response.status(400).entity("Missing ip address").build();
} else if (isBlank(info.getAppName())) {
return Response.status(400).entity("Missing appName").build();
} else if (!appName.equals(info.getAppName())) {
return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
} else if (info.getDataCenterInfo() == null) {
return Response.status(400).entity("Missing dataCenterInfo").build();
} else if (info.getDataCenterInfo().getName() == null) {
return Response.status(400).entity("Missing dataCenterInfo Name").build();
}
// handle cases where clients may be registering with bad DataCenterInfo with missing data
DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
if (dataCenterInfo instanceof UniqueIdentifier) {
String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
if (isBlank(dataCenterInfoId)) {
boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
if (experimental) {
String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
return Response.status(400).entity(entity).build();
} else if (dataCenterInfo instanceof AmazonInfo) {
AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
if (effectiveId == null) {
amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
}
} else {
logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
}
}
}
// Call the registry to register the instance
registry.register(info, "true".equals(isReplication));
return Response.status(204).build(); // 204 to be backwards compatible
}
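Pay attention to the @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) parameter: the header is x-netflix-discovery-replication, and it is what distinguishes a peer's replicated write from a first-hand client registration. A sketch of what such a replicated POST looks like from the sending side (the peer URL and payload are illustrative), using the JDK HTTP client:
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ReplicationHeaderDemo {
    public static void main(String[] args) throws Exception {
        String instanceJson = "{ /* the InstanceInfo payload, elided */ }";
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://peer2:8762/eureka/apps/MY-APP")) // hypothetical peer
                .header("Content-Type", "application/json")
                .header("x-netflix-discovery-replication", "true") // marks a replica write
                .POST(HttpRequest.BodyPublishers.ofString(instanceJson))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode()); // 204 on success, per the code above
    }
}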
This lands in the register method of PeerAwareInstanceRegistryImpl:
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
super.register(info, leaseDuration, isReplication);
// Focus here: replicate this Register action to the other peer nodes.
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
The implementation that replicates to the other nodes:
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
Stopwatch tracer = action.getTimer().start();
try {
if (isReplication) {
numberOfReplicationsLastMin.increment();
}
// If it is a replication already, do not replicate again as this will create a poison replication
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
// 1. Focus here: iterate over all the other nodes in the EurekaServer cluster. The
// peerEurekaNodes bean is wired up during the auto-configuration phase.
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
// 2. Replicate the current server's action to the peer node.
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}
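The isReplication guard near the top deserves emphasis: it is what prevents the "poison replication" the source comment warns about. A replicated write must terminate at the receiving node; otherwise a single registration would bounce between peers forever:
// client --register------------------> A   (isReplication = false: A replicates onward)
// A --register + replication header--> B   (isReplication = true:  B stores it, stops here)
// without the guard: B --> A --> B --> ... (an endless replication storm)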
The implementation that replicates an instance action to a single peer node:
private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry;
CurrentRequestVersion.set(Version.V2);
switch (action) {
// 1. Cancel: replicate the instance's deregistration to the node
case Cancel:
node.cancel(appName, id);
break;
// 2. Heartbeat: replicate the instance's lease-renewal heartbeat to the node
case Heartbeat:
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
// 3. Register: replicate the instance's registration to the node
case Register:
node.register(info);
break;
// 4. StatusUpdate: replicate the instance's status change to the node
case StatusUpdate:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
// 5. DeleteStatusOverride: replicate the removal of the instance's status override
case DeleteStatusOverride:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
} catch (Throwable t) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
} finally {
CurrentRequestVersion.remove();
}
}
Let's use the Register action as the example; the other actions work the same way. Go to the register(...) method of PeerEurekaNode:
public void register(final InstanceInfo info) throws Exception {
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
// Hand this register action to a batching dispatcher that targets the node. The action is
// wrapped in an InstanceReplicationTask and handed to batchingDispatcher for processing.
batchingDispatcher.process(
taskId("register", info),
new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
public EurekaHttpResponse<Void> execute() {
// The business logic of each InstanceReplicationTask is simply an HTTP call that
// registers the instance info on the peer node, the same way as an ordinary registration.
return replicationClient.register(info);
}
},
expiryTime
);
}
That completes the core flow. Now let's look at the batching dispatcher batchingDispatcher. By default, constructing a PeerEurekaNode instantiates both a batching dispatcher and a non-batching dispatcher:
private final TaskDispatcher<String, ReplicationTask> batchingDispatcher;
private final TaskDispatcher<String, ReplicationTask> nonBatchingDispatcher;
public PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl, HttpReplicationClient replicationClient, EurekaServerConfig config) {
this(registry, targetHost, serviceUrl, replicationClient, config, BATCH_SIZE, MAX_BATCHING_DELAY_MS, RETRY_SLEEP_TIME_MS, SERVER_UNAVAILABLE_SLEEP_TIME_MS);
}
/* For testing */ PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl,
HttpReplicationClient replicationClient, EurekaServerConfig config,
int batchSize, long maxBatchingDelayMs,
long retrySleepTimeMs, long serverUnavailableSleepTimeMs) {
this.registry = registry;
this.targetHost = targetHost;
this.replicationClient = replicationClient;
this.serviceUrl = serviceUrl;
this.config = config;
this.maxProcessingDelayMs = config.getMaxTimeForReplication();
String batcherName = getBatcherName();
ReplicationTaskProcessor taskProcessor = new ReplicationTaskProcessor(targetHost, replicationClient);
// Build a batching task dispatcher
this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(
batcherName,
config.getMaxElementsInPeerReplicationPool(),
batchSize,
config.getMaxThreadsForPeerReplication(),
maxBatchingDelayMs,
serverUnavailableSleepTimeMs,
retrySleepTimeMs,
taskProcessor
);
// Build a non-batching task dispatcher
this.nonBatchingDispatcher = TaskDispatchers.createNonBatchingTaskDispatcher(
targetHost,
config.getMaxElementsInStatusReplicationPool(),
config.getMaxThreadsForStatusReplication(),
maxBatchingDelayMs,
serverUnavailableSleepTimeMs,
retrySleepTimeMs,
taskProcessor
);
}
When replicating the various Actions, the batching dispatcher batchingDispatcher is used by default. Why batch at all?
Answer: efficiency. Every EurekaServer exposes a REST endpoint that accepts a whole batch of replicated Actions at once:
The batch replication endpoint:
@Path("batch")
@POST
public Response batchReplication(ReplicationList replicationList) {
try {
ReplicationListResponse batchResponse = new ReplicationListResponse();
for (ReplicationInstance instanceInfo : replicationList.getReplicationList()) {
try {
batchResponse.addResponse(dispatch(instanceInfo));
} catch (Exception e) {
batchResponse.addResponse(new ReplicationInstanceResponse(Status.INTERNAL_SERVER_ERROR.getStatusCode(), null));
logger.error("{} request processing failed for batch item {}/{}",
instanceInfo.getAction(), instanceInfo.getAppName(), instanceInfo.getId(), e);
}
}
return Response.ok(batchResponse).build();
} catch (Throwable e) {
logger.error("Cannot execute batch Request", e);
return Response.status(Status.INTERNAL_SERVER_ERROR).build();
}
}
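To make the endpoint concrete, here is the rough shape of one batched payload. The field names are an assumption inferred from the ReplicationInstance accessors used above (getAction(), getAppName(), getId()), not a verbatim wire capture:
public class BatchPayloadExample {
    // Illustrative only: field names are assumed, not captured from a real exchange.
    static final String EXAMPLE_BATCH = """
            {
              "replicationList": [
                { "appName": "ORDER-SERVICE", "id": "host-1:order-service:8080", "action": "Heartbeat" },
                { "appName": "ORDER-SERVICE", "id": "host-2:order-service:8080", "action": "Register" }
              ]
            }
            """;
}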
Since batching is involved, a few questions are worth asking. When is the batch REST call triggered?
Answer: there are two triggers, as sketched below:
1. Task count: within one cycle, as soon as the number of tasks (one task per replicated Action) reaches a threshold, a batch REST call is fired; the default threshold is 250 tasks per cycle.
2. Cycle: even if the task count is not reached, a batch REST call is fired once the cycle elapses; the default cycle length is 500 ms.
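These two triggers form the classic "size or time, whichever comes first" batching pattern. Below is a toy sketch of that pattern (our own illustration, far simpler than Netflix's TaskDispatcher/AcceptorExecutor machinery); instantiate it with 250 and 500 to mirror the defaults, and let processor stand in for one call to the batch endpoint shown above:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class ToyBatchingDispatcher<T> {
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
    private final int batchSize;      // e.g. 250, mirroring the default batch size
    private final long maxDelayMs;    // e.g. 500, mirroring the default cycle length
    private final Consumer<List<T>> processor;

    public ToyBatchingDispatcher(int batchSize, long maxDelayMs, Consumer<List<T>> processor) {
        this.batchSize = batchSize;
        this.maxDelayMs = maxDelayMs;
        this.processor = processor;
        Thread worker = new Thread(this::drainLoop, "toy-batcher");
        worker.setDaemon(true);
        worker.start();
    }

    public void process(T task) {     // analogous to batchingDispatcher.process(...)
        queue.offer(task);
    }

    private void drainLoop() {
        List<T> batch = new ArrayList<>();
        long deadline = System.currentTimeMillis() + maxDelayMs;
        while (!Thread.currentThread().isInterrupted()) {
            try {
                long wait = Math.max(1, deadline - System.currentTimeMillis());
                T task = queue.poll(wait, TimeUnit.MILLISECONDS);
                if (task != null) {
                    batch.add(task);
                }
                // Trigger 1: the batch is full. Trigger 2: the cycle has elapsed.
                if (batch.size() >= batchSize || System.currentTimeMillis() >= deadline) {
                    if (!batch.isEmpty()) {
                        processor.accept(new ArrayList<>(batch)); // one "batch REST call"
                        batch.clear();
                    }
                    deadline = System.currentTimeMillis() + maxDelayMs;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // worker exits on interrupt
            }
        }
    }
}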
Who actually issues the batch request?
Answer: batchingDispatcher only dispatches; the batch request itself is issued by the dispatcher's task processor taskProcessor. The source is in ReplicationTaskProcessor:
@Override
public ProcessingResult process(List<ReplicationTask> tasks) {
ReplicationList list = createReplicationListOf(tasks);
try {
// Call the target node's batch replication endpoint.
EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list);
int statusCode = response.getStatusCode();
if (!isSuccess(statusCode)) {
if (statusCode == 503) {
logger.warn("Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", peerId);
return ProcessingResult.Congestion;
} else {
// Unexpected error returned from the server. This should ideally never happen.
logger.error("Batch update failure with HTTP status code {}; discarding {} replication tasks", statusCode, tasks.size());
return ProcessingResult.PermanentError;
}
} else {
handleBatchResponse(tasks, response.getEntity().getResponseList());
}
} catch (Throwable e) {
if (maybeReadTimeOut(e)) {
logger.error("It seems to be a socket read timeout exception, it will retry later. if it continues to happen and some eureka node occupied all the cpu time, you should set property 'eureka.server.peer-node-read-timeout-ms' to a bigger value", e);
//read timeout exception is more Congestion then TransientError, return Congestion for longer delay
return ProcessingResult.Congestion;
} else if (isNetworkConnectException(e)) {
logNetworkErrorSample(null, e);
return ProcessingResult.TransientError;
} else {
logger.error("Not re-trying this exception because it does not seem to be a network exception", e);
return ProcessingResult.PermanentError;
}
}
return ProcessingResult.Success;
}
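What happens to the returned ProcessingResult? The dispatcher's worker loop reacts to it. The broad semantics, simplified from Netflix's TaskExecutors (treat the details as an approximation): Success acknowledges the batch, Congestion and TransientError re-enqueue it for a later retry (with an extra back-off in the Congestion case), and PermanentError discards it:
import java.util.List;

public class ResultHandlingSketch {
    enum ProcessingResult { Success, Congestion, TransientError, PermanentError }

    // Hypothetical helper standing in for the dispatcher's retry queue.
    static <T> void reschedule(List<T> tasks) { /* re-enqueue for a later retry */ }

    static <T> void onResult(ProcessingResult result, List<T> tasks) {
        switch (result) {
            case Success:
                break;               // tasks acknowledged, nothing more to do
            case Congestion:         // peer busy (503) or read timeout: retry after a back-off
            case TransientError:     // network hiccup: retry
                reschedule(tasks);
                break;
            case PermanentError:     // non-retriable: drop the batch
                System.err.println("Discarding " + tasks.size() + " replication tasks");
                break;
        }
    }
}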
And that is how an EurekaServer cluster works.