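Spring Cloud Service Governance and Load Balancing Principles

As usual, this article focuses on the underlying principles and does not cover usage.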

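 How Eureka Works

1  Service governance

Overall, Eureka's service governance model looks like this:

[Figure: Eureka service governance model]

The service governance process in Eureka works as follows: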

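 - Start the registry. On startup, the registry launches a scheduled task that detects and evicts expired service instances (implemented by EvictionTask).

 - Start the provider cluster. On startup, each service provider registers itself with the registry (implemented by InstanceInfoReplicator, which DiscoveryClient starts) and launches a scheduled task that periodically sends heartbeats to the registry (implemented by DiscoveryClient.HeartbeatThread).

 - Start the consumer cluster. On startup, each service consumer fetches the full registry (implemented by DiscoveryClient#getAndStoreFullRegistry()) and launches a scheduled task that periodically fetches incremental updates from the registry (implemented by DiscoveryClient#getAndUpdateDelta()).

 - When the registry runs as a cluster, each Eureka Server replicates the registrations it receives to the other Eureka Servers. This is a high-availability design: the cluster keeps working when some Eureka Servers fail.

 - A provider sends heartbeats to tell the registry that it is still alive and should not be evicted.

 - After a consumer obtains provider information, it caches it locally. When it needs to call a provider, it reads the provider from the cached list and invokes it through load balancing. Because of this local cache, consumers can keep calling services even when the registry cluster is unavailable.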
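The register, heartbeat, and evict cycle described in the list above can be sketched as a tiny in-memory registry. This is only an illustration under simplifying assumptions: the class MiniRegistry and its methods are hypothetical names, time is passed in explicitly so the behavior is deterministic, and nothing here is Eureka's actual implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory registry: register, renew (heartbeat), evict. */
final class MiniRegistry {
    private final long leaseMillis;
    // instance id -> timestamp (ms) of the last registration or heartbeat
    private final Map<String, Long> lastRenewal = new ConcurrentHashMap<>();

    MiniRegistry(long leaseMillis) {
        this.leaseMillis = leaseMillis;
    }

    void register(String instanceId, long nowMillis) {
        lastRenewal.put(instanceId, nowMillis);
    }

    /** Returns false when the instance is unknown, so the caller should register again. */
    boolean renew(String instanceId, long nowMillis) {
        return lastRenewal.computeIfPresent(instanceId, (id, old) -> nowMillis) != null;
    }

    /** Removes every instance whose lease has expired and returns the removed ids. */
    List<String> evict(long nowMillis) {
        List<String> evicted = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastRenewal.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() > leaseMillis) {
                evicted.add(entry.getKey());
                it.remove();
            }
        }
        return evicted;
    }

    boolean contains(String instanceId) {
        return lastRenewal.containsKey(instanceId);
    }
}
```

With a 90-second lease, an instance that last renewed at t=0 is removed by evict(100_000), while one that renewed at t=60_000 survives.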

This is only a brief overview of the Eureka workflow; the following sections analyze it at the source-code level.

 

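2  Service partitioning

Eureka partitions services by region and zone. A region can be thought of as a geographic partition, such as Asia or North China, with no fixed size. A zone can be thought of as a specific data center within a region: if the region is Shanghai and Shanghai has two data centers, the region can be divided into two zones, zone1 and zone2.

Regions and zones make a highly available multi-active deployment possible, as the following figure shows:

[Figure: consumers and providers deployed across zone1 and zone2]

Normally, Consumer1 in zone1 consumes the service provided by Provider1 in the same zone, which keeps the performance overhead lowest.

When Provider1 in zone1 becomes unavailable, the consumer can switch to zone2 and consume Provider2, as the following figure shows:

[Figure: Consumer1 failing over from zone1 to Provider2 in zone2]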
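The same-zone preference with cross-zone failover can be sketched as a simple filter. The class ZonePreference and the Provider record below are hypothetical and exist only for illustration; Ribbon's real zone handling takes more statistics into account.

```java
import java.util.ArrayList;
import java.util.List;

final class ZonePreference {
    /** Hypothetical provider record: an address, the zone it runs in, and whether it is UP. */
    static final class Provider {
        final String address;
        final String zone;
        final boolean up;

        Provider(String address, String zone, boolean up) {
            this.address = address;
            this.zone = zone;
            this.up = up;
        }
    }

    /** Returns the UP providers in the consumer's zone, or the UP providers of other zones if there are none. */
    static List<Provider> candidates(List<Provider> all, String consumerZone) {
        List<Provider> sameZone = new ArrayList<>();
        List<Provider> otherZones = new ArrayList<>();
        for (Provider p : all) {
            if (!p.up) {
                continue; // skip providers that are not available
            }
            if (p.zone.equals(consumerZone)) {
                sameZone.add(p);
            } else {
                otherZones.add(p);
            }
        }
        return sameZone.isEmpty() ? otherZones : sameZone;
    }
}
```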

 

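3  Comparison with ZooKeeper

1)   Eureka guarantees AP

In CAP terms, Eureka guarantees AP. All Eureka nodes are peers, so the failure of a few nodes does not affect the healthy ones, which continue to serve registrations and queries. As long as one Eureka Server is still running, the registry stays available (availability is guaranteed), although the information returned may not be the latest (strong consistency is not guaranteed).

If the number of heartbeats received in the last minute falls below the renewal threshold (85% of the expected number by default; the expected number is recalculated every 15 minutes by default), Eureka assumes a network failure between the clients and the registry and enters self-preservation mode. In this mode:

 - Eureka no longer removes instances from the registry even though their heartbeats have been missing long enough for them to expire.

 - Eureka still accepts registrations and queries for new services, but does not replicate them to the other nodes (so the current node stays available).

 - Once the network is stable again, the new registrations on the current node are replicated to the other nodes.

2)   ZooKeeper guarantees CP

In CAP terms, ZooKeeper guarantees CP. While a ZooKeeper cluster is electing a leader, services can neither be registered with it nor looked up from it, and when more than half of its machines are unavailable no leader can be elected, so the cluster is effectively down. In this respect ZooKeeper is not the best choice for a service registry.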

 

 

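 Service Registration

1  Registration by the service provider

1)   Source code analysis

Service registration consists of two steps: registration and lease renewal. Registration publishes the provider's service information to the Eureka Server. Renewal happens after registration: the provider keeps sending heartbeats to tell the Eureka Server that it is still alive, so that the server's eviction task does not remove the instance from the service list.

For DiscoveryClient, the main logic is: when a DiscoveryClient is created, it registers the current service with the Eureka Server and starts a scheduled task that keeps renewing the lease.

a)    Registration

The DiscoveryClient constructor calls initScheduledTasks(), which sets up registration. The relevant code is: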

@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, DiscoveryClientOptionalArgs args,
               Provider<BackupRegistry> backupRegistryProvider) {

    // code omitted
    initScheduledTasks();
    // code omitted
}

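initScheduledTasks contains the core logic for registration and renewal: it schedules the heartbeat task that keeps renewing the lease and starts InstanceInfoReplicator, which performs the registration.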

private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh", scheduler, cacheRefreshExecutor,
                       registryFetchIntervalSeconds, TimeUnit.SECONDS,
                        expBackOffBound, new CacheRefreshThread()
                ),
               registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }

    if (clientConfig.shouldRegisterWithEureka()) {
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);

        // Heartbeat timer
        scheduler.schedule(
                new TimedSupervisorTask(
                "heartbeat", scheduler, heartbeatExecutor, 
                renewalIntervalInSecs, TimeUnit.SECONDS, 
                expBackOffBound, new HeartbeatThread()
                ),
                renewalIntervalInSecs, TimeUnit.SECONDS);

        // InstanceInfo replicator
        instanceInfoReplicator = new InstanceInfoReplicator(
                this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(),2); // burstSize

        statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
            @Override
            public String getId() {
                return "statusChangeListener";
            }

            @Override
            public void notify(StatusChangeEvent statusChangeEvent) {
                if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                       InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                    // log at warn level if DOWN was involved
                    logger.warn("Saw local status change event {}", statusChangeEvent);
                } else {
                    logger.info("Saw local status change event {}", statusChangeEvent);
                }
               instanceInfoReplicator.onDemandUpdate();
            }
        };

        if (clientConfig.shouldOnDemandUpdateStatusChange()) {
           applicationInfoManager.registerStatusChangeListener(statusChangeListener);
        }

       instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}
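Both timers above are wrapped in a TimedSupervisorTask that receives an interval and an expBackOffBound. A rough sketch of the backoff idea follows, assuming the delay doubles after a timed-out run, is capped at interval × bound, and returns to the base interval after a successful run. The class BackoffDelay is hypothetical and only models the arithmetic, not the scheduling itself.

```java
/** Hypothetical model of an exponentially backed-off delay with an upper bound. */
final class BackoffDelay {
    private final long baseMillis;
    private final long maxMillis;
    private long currentMillis;

    BackoffDelay(long baseMillis, int expBackOffBound) {
        this.baseMillis = baseMillis;
        this.maxMillis = baseMillis * expBackOffBound; // upper bound for the delay
        this.currentMillis = baseMillis;
    }

    /** A successful run resets the delay to the base interval. */
    long onSuccess() {
        currentMillis = baseMillis;
        return currentMillis;
    }

    /** A timed-out run doubles the delay, never exceeding the bound. */
    long onTimeout() {
        currentMillis = Math.min(maxMillis, currentMillis * 2);
        return currentMillis;
    }
}
```

With a 30-second interval and a bound of 10, repeated timeouts give 60 s, 120 s, 240 s, and then stay at 300 s until a run succeeds.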

InstanceInfoReplicator#start triggers the registration. The registration code is:

boolean register() throws Throwable {
    logger.info(PREFIX + appPathIdentifier + ": registering service...");
    EurekaHttpResponse<Void> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == 204;
}

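The captured registration payload (the instanceInfo) is shown below. It contains all the information needed to locate the service provider.

[Figure: captured instanceInfo registration payload]

 

b)    Lease renewal

The following two properties control the renewal interval and the lease expiration time:

eureka.instance.lease-renewal-interval-in-seconds=30

eureka.instance.lease-expiration-duration-in-seconds=90

 

DiscoveryClient#initScheduledTasks starts a scheduled task that keeps renewing the lease.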

private void initScheduledTasks() {
    // code omitted
    if (clientConfig.shouldRegisterWithEureka()) {
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        // Heartbeat timer
        scheduler.schedule(
                new TimedSupervisorTask(
                       "heartbeat",
                        scheduler,
                       heartbeatExecutor,
                       renewalIntervalInSecs,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new HeartbeatThread()
                ),
                renewalIntervalInSecs, TimeUnit.SECONDS);
        // code omitted
    }
}

The renewal task eventually calls DiscoveryClient#renew, which sends a heartbeat to the Eureka Server.

boolean renew() {
   EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
        if (httpResponse.getStatusCode() == 404) {
           REREGISTER_COUNTER.increment();
            logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
            return register();
        }
        return httpResponse.getStatusCode() == 200;
    } catch (Throwable e) {
        logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
        return false;
    }
}
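The control flow of renew() can be isolated from the transport layer in a small sketch: a heartbeat that returns 404 means the registry no longer knows the instance, so the client registers again. The RegistryClient interface below is a hypothetical stand-in for the HTTP client, not an Eureka type.

```java
final class HeartbeatSketch {
    /** Hypothetical transport abstraction; each method returns an HTTP status code. */
    interface RegistryClient {
        int sendHeartbeat(String appName, String instanceId);

        int register(String appName, String instanceId);
    }

    /** Returns true when the lease was renewed, or re-created after a 404. */
    static boolean renew(RegistryClient client, String appName, String instanceId) {
        try {
            int status = client.sendHeartbeat(appName, instanceId);
            if (status == 404) {
                // The registry has no lease for this instance, so register again.
                return client.register(appName, instanceId) == 204;
            }
            return status == 200;
        } catch (RuntimeException e) {
            // A failed heartbeat is reported, not propagated; the next tick retries.
            return false;
        }
    }
}
```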

 

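2  Service management in the registry

The registry accepts registration requests in ApplicationResource#addInstance, which completes the registration by calling PeerAwareInstanceRegistry#register. PeerAwareInstanceRegistryImpl#register first records the service locally and then replicates the service information to the other registry nodes, as shown below: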

public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    super.register(info, leaseDuration, isReplication);
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

Note: the entry point for lease renewal is InstanceResource#renewLease.

 

 

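1)   Recording the service

The recording logic is fairly involved. In short, it puts the received registration into a ConcurrentHashMap and then updates the instance's timestamps, status, and related fields, which the registry relies on to track whether services are alive. The source is: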

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        read.lock();
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
       REGISTER.increment(isReplication);
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        // Retain the last dirty timestamp without overwriting it, if there is already a lease
        if (existingLease != null && (existingLease.getHolder() != null)) {
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                        " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

                if ("true".equals(serverConfig.getExperimental("registry.registration.ignoreIfDirtyTimestampIsOlder"))) {
                   logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    registrant = existingLease.getHolder();
                } else {
                   registrant.setLastDirtyTimestamp(existingLastDirtyTimestamp);
                }
            }
        } else {
            // The lease does not exist and hence it is a new registration
            synchronized (lock) {
                if (this.expectedNumberOfRenewsPerMin > 0) {
                    // A new instance is registering, so raise the expected renewals
                    // (1 per 30 seconds, 2 per minute)
                   this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                   this.numberOfRenewsPerMinThreshold =
                            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                }
            }
            logger.debug("No previous lease information found; it is new registration");
        }
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
           lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        gMap.put(registrant.getId(), lease);
        synchronized (recentRegisteredQueue) {
            recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                   registrant.getAppName() + "(" + registrant.getId() + ")"));
        }
        // This is where the initial state transfer of overridden status happens
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                            + "overrides", registrant.getOverriddenStatus(), registrant.getId());
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                logger.info("Not found overridden id {} and hence adding it", registrant.getId());
               overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {
            logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
           registrant.setOverriddenStatus(overriddenStatusFromMap);
        }

        // Set the status based on the overridden status rules
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
       registrant.setStatusWithoutDirty(overriddenInstanceStatus);

        // If the lease is registered with UP status, set lease service up timestamp
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }
       registrant.setActionType(ActionType.ADDED);
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
       registrant.setLastUpdatedTimestamp();
       invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})",
                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {
        read.unlock();
    }
}
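The data structure behind this method is a two-level map: application name to (instance id to lease). The sketch below shows that shape together with the rule of keeping the newer lastDirtyTimestamp. LeaseTable and its simplified Lease are hypothetical; the real method also handles status overrides, caches, and the renewal threshold.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class LeaseTable {
    /** Hypothetical, heavily simplified lease. */
    static final class Lease {
        final String instanceId;
        final long lastDirtyTimestamp;

        Lease(String instanceId, long lastDirtyTimestamp) {
            this.instanceId = instanceId;
            this.lastDirtyTimestamp = lastDirtyTimestamp;
        }
    }

    // app name -> (instance id -> lease)
    private final Map<String, Map<String, Lease>> registry = new ConcurrentHashMap<>();

    /** Stores the lease; if one already exists, the larger dirty timestamp is retained. */
    Lease register(String appName, String instanceId, long lastDirtyTimestamp) {
        Map<String, Lease> leases =
                registry.computeIfAbsent(appName, name -> new ConcurrentHashMap<>());
        return leases.merge(instanceId, new Lease(instanceId, lastDirtyTimestamp),
                (existing, incoming) ->
                        existing.lastDirtyTimestamp > incoming.lastDirtyTimestamp
                                ? new Lease(instanceId, existing.lastDirtyTimestamp)
                                : incoming);
    }

    int instanceCount(String appName) {
        Map<String, Lease> leases = registry.get(appName);
        return leases == null ? 0 : leases.size();
    }
}
```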

 

 

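2)   Replicating service information to other registry nodes

Replicating service information to the other registry nodes provides high availability: a service registers with only one Eureka Server, yet its instances become visible across the whole Eureka Server cluster, so the cluster remains usable when some registry machines go down.

The core of the replication logic is: first decide whether to replicate at all, then fetch all Eureka Server nodes, and finally replicate the information to each of them in turn.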

private void replicateToPeers(Action action, String appName, String id,
          InstanceInfo info /* optional */,
          InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }

        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
           replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}

replicateInstanceActionsToPeers performs a different operation depending on the action. During registration, it calls node.register to register the instance information with the other registry nodes.

private void replicateInstanceActionsToPeers(Action action, String appName,
        String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel:
                node.cancel(appName, id);
                break;
            case Heartbeat:
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:
                node.register(info);
                break;
            case StatusUpdate:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
               node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
               node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}
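The role of the isReplication flag is easiest to see in a toy model: a node forwards a registration to its peers only when the request came from a client, so a replicated request is never forwarded again. PeerNode below is a hypothetical in-process stand-in for a registry node; the real replication goes over HTTP and is batched.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical registry node that replicates registrations to its peers. */
final class PeerNode {
    final String name;
    final Set<String> instances = new HashSet<>();
    final List<PeerNode> peers = new ArrayList<>();
    int registerCalls = 0;

    PeerNode(String name) {
        this.name = name;
    }

    void register(String instanceId, boolean isReplication) {
        registerCalls++;
        instances.add(instanceId);
        if (isReplication) {
            return; // already a replica: forwarding again would loop forever
        }
        for (PeerNode peer : peers) {
            if (peer == this) {
                continue; // never replicate to yourself
            }
            peer.register(instanceId, true);
        }
    }
}
```

In a fully connected three-node cluster, one client registration results in exactly one register call on every node.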

 

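3)   Provider shutdown

When a service provider shuts down, ApplicationResource#addInstance is still called. The difference is that the InstanceInfo parameter carries status=UP on registration and status=DOWN on shutdown. Because the status differs, the lease's timestamp information is not updated on shutdown.

 

4)   Evicting expired services

When the Eureka Server starts, it creates a timer that periodically checks for expired services and evicts those instances. AbstractInstanceRegistry#postInit schedules this eviction task, as shown below: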

protected void postInit() {
    renewsLastMin.start();
    if (evictionTaskRef.get() != null) {
        evictionTaskRef.get().cancel();
    }
    evictionTaskRef.set(new EvictionTask());
   evictionTimer.schedule(evictionTaskRef.get(),
           serverConfig.getEvictionIntervalTimerInMs(),
           serverConfig.getEvictionIntervalTimerInMs());
}

 

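AbstractInstanceRegistry#evict contains the core eviction logic:

 - First check whether eviction of expired leases is enabled.

 - If it is, take the leases from registry (a ConcurrentHashMap) one by one and check whether each has expired.

 - Put each expired lease into the expiredLeases list.

 - Compute the number to evict with Math.min(expiredLeases.size(), evictionLimit), then evict that many leases in random order.

The source is: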

public void evict(long additionalLeaseMs) {
    logger.debug("Running the evict task");

    if (!isLeaseExpirationEnabled()) {
        logger.debug("DS: lease expiration is currently disabled.");
        return;
    }

    // We collect first all expired items, to evict them in random order. For large eviction sets,
    // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
    // the impact should be evenly distributed across all applications.
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
    for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
        Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
        if (leaseMap != null) {
            for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                Lease<InstanceInfo> lease = leaseEntry.getValue();
                if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                   expiredLeases.add(lease);
                }
            }
        }
    }

    // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
    // triggering self-preservation. Without that we would wipe out full registry.
    int registrySize = (int) getLocalRegistrySize();
    int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
    int evictionLimit = registrySize - registrySizeThreshold;

    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
    if (toEvict > 0) {
        logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);

        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < toEvict; i++) {
            // Pick a random item (Knuth shuffle algorithm)
            int next = i + random.nextInt(expiredLeases.size() - i);
           Collections.swap(expiredLeases, i, next);
            Lease<InstanceInfo> lease = expiredLeases.get(i);

            String appName = lease.getHolder().getAppName();
            String id = lease.getHolder().getId();
            EXPIRED.increment();
            logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
            internalCancel(appName, id, false);
        }
    }
}
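The eviction limit and the random selection can be tried out in isolation. The sketch below reuses the arithmetic from evict() (registry size, renewal percent threshold, partial shuffle) on plain strings. EvictionSketch is a hypothetical helper, and the Random is passed in so callers control the seed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

final class EvictionSketch {
    /** At most (size - size * threshold) leases may be evicted in one pass. */
    static int evictionLimit(int registrySize, double renewalPercentThreshold) {
        int registrySizeThreshold = (int) (registrySize * renewalPercentThreshold);
        return registrySize - registrySizeThreshold;
    }

    /** Picks min(expired, limit) ids in random order using a partial shuffle. */
    static List<String> pickToEvict(List<String> expired, int registrySize,
                                    double renewalPercentThreshold, Random random) {
        List<String> pool = new ArrayList<>(expired); // leave the caller's list untouched
        int toEvict = Math.min(pool.size(), evictionLimit(registrySize, renewalPercentThreshold));
        for (int i = 0; i < toEvict; i++) {
            // Swap a random element from the not-yet-chosen tail into position i.
            int next = i + random.nextInt(pool.size() - i);
            Collections.swap(pool, i, next);
        }
        return new ArrayList<>(pool.subList(0, toEvict));
    }
}
```

With 10 registered instances and a threshold of 0.85, the limit is 10 - 8 = 2, so even if 5 leases have expired only 2 are evicted in one pass.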

 

PeerAwareInstanceRegistryImpl#isLeaseExpirationEnabled determines whether expired leases may be evicted. The source is:

public boolean isLeaseExpirationEnabled() {
    if (!isSelfPreservationModeEnabled()) {
        // The self preservation mode is disabled, hence allowing the instances to expire.
        return true;
    }
    return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
}

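The configuration property eureka.server.enable-self-preservation controls the registry's self-preservation mechanism and defaults to true; isSelfPreservationModeEnabled() reflects this property. When the number of heartbeats received in the last minute falls below the renewal threshold (85% of the expected number by default), self-preservation is triggered and the registry stops evicting service providers. If self-preservation is disabled, the registry evicts unavailable instances normally.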
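The threshold arithmetic can be worked through with concrete numbers. The sketch below assumes one heartbeat every 30 seconds (two per instance per minute) and a renewal percent threshold of 0.85; SelfPreservationSketch is a hypothetical helper that mirrors the comparison in isLeaseExpirationEnabled().

```java
final class SelfPreservationSketch {
    /** Expected heartbeats per minute, scaled by the renewal percent threshold. */
    static int renewsPerMinThreshold(int instanceCount, double renewalPercentThreshold) {
        int expectedRenewsPerMin = instanceCount * 2; // assumes one heartbeat per 30 s
        return (int) (expectedRenewsPerMin * renewalPercentThreshold);
    }

    /** Eviction is allowed when self-preservation is off or enough heartbeats arrived. */
    static boolean leaseExpirationEnabled(boolean selfPreservationEnabled,
                                          int renewsLastMin, int threshold) {
        if (!selfPreservationEnabled) {
            return true;
        }
        return threshold > 0 && renewsLastMin > threshold;
    }
}
```

For 10 instances the registry expects 20 heartbeats per minute, so the threshold is 17. Receiving 18 heartbeats keeps eviction enabled; receiving 12 switches the registry into self-preservation and eviction stops.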

 

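 Service Discovery

1  Fetching services on the consumer side

When a consumer starts, it sends a REST request to the registry to fetch the list of registered services. For performance, the Eureka Server maintains a read-only copy of the service list to return to consumers, and the consumer also caches the list, refreshing it every 30 seconds by default. Two properties are relevant:

eureka.client.fetch-registry=true // whether to fetch the registered services

eureka.client.registry-fetch-interval-seconds=30 // how often to fetch the registered services, in seconds

1)   Fetching at startup

When DiscoveryClient starts, the following code triggers the registry fetch. If the fetch fails, it falls back to fetchRegistryFromBackup.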

if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
    fetchRegistryFromBackup();
}

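fetchRegistry implements the fetch-and-cache logic: it obtains provider information through getAndStoreFullRegistry() or getAndUpdateDelta(), refreshes the cache, and finally updates the instance's remote status.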

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

    try {
        // If the delta is disabled or if it is the first time, get all
        // applications
        Applications applications = getApplications();

        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            getAndStoreFullRegistry();
        } else {
           getAndUpdateDelta(applications);
        }
       applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances();
    } catch (Throwable e) {
        logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }

    // Notify about cache refresh before updating the instance remote status
    onCacheRefreshed();

    // Update remote status based on refreshed data held in the cache
    updateInstanceRemoteStatus();

    // registry was fetched successfully, so return true
    return true;
}
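The difference between a full fetch and a delta fetch can be sketched with a local cache that is either replaced wholesale or patched by a list of changes. DeltaCache and Change are hypothetical types; the hash-code reconciliation that the real client performs after applying a delta is left out.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class DeltaCache {
    enum ActionType { ADDED, MODIFIED, DELETED }

    /** Hypothetical change record: what happened to which instance. */
    static final class Change {
        final ActionType action;
        final String instanceId;
        final String status;

        Change(ActionType action, String instanceId, String status) {
            this.action = action;
            this.instanceId = instanceId;
            this.status = status;
        }
    }

    // instance id -> status, as last seen from the registry
    private final Map<String, String> statusById = new LinkedHashMap<>();

    /** Full fetch: discard the cache and store everything the registry returned. */
    void storeFull(Map<String, String> full) {
        statusById.clear();
        statusById.putAll(full);
    }

    /** Delta fetch: patch the existing cache with the reported changes. */
    void applyDelta(List<Change> changes) {
        for (Change change : changes) {
            switch (change.action) {
                case ADDED:
                case MODIFIED:
                    statusById.put(change.instanceId, change.status);
                    break;
                case DELETED:
                    statusById.remove(change.instanceId);
                    break;
            }
        }
    }

    Map<String, String> snapshot() {
        return new LinkedHashMap<>(statusById);
    }
}
```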

 

2)   Periodic refresh

When DiscoveryClient starts, it also starts a scheduled task that periodically fetches service information.

private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh", scheduler, cacheRefreshExecutor,
                       registryFetchIntervalSeconds, TimeUnit.SECONDS,
                        expBackOffBound, new CacheRefreshThread()
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
    // code omitted
}

 

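2  Handling fetch requests in the registry

1)   Full fetch

When a consumer first starts, it fetches the full registry. On the server, the full-registry request (GET /apps) is handled by ApplicationsResource#getContainers; the method below, from ApplicationResource, is the sub-resource locator that resolves a single instance by id: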

public InstanceResource getInstanceInfo(@PathParam("id") String id) {
    return new InstanceResource(this, id, serverConfig, registry);
}

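As the following screenshot shows, when service information is requested, the registry returns the cached service list directly.

[Figure: screenshot of the registry returning the cached service list]

 

2)   Delta fetch

The delta-fetch logic is shown below. It handles the requests issued by the consumer's getAndUpdateDelta().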

@Path("delta")
@GET
public Response getContainerDifferential(
        @PathParam("version") String version,
        @HeaderParam(HEADER_ACCEPT) String acceptHeader,
       @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
       @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
        @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {

    boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();

    // If the delta flag is disabled in discovery or if the lease expiration
    // has been disabled, redirect clients to get all instances
    if ((serverConfig.shouldDisableDelta()) || (!registry.shouldAllowAccess(isRemoteRegionRequested))) {
        return Response.status(Status.FORBIDDEN).build();
    }

    String[] regions = null;
    if (!isRemoteRegionRequested) {
       EurekaMonitors.GET_ALL_DELTA.increment();
    } else {
        regions = regionsStr.toLowerCase().split(",");
        Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
       EurekaMonitors.GET_ALL_DELTA_WITH_REMOTE_REGIONS.increment();
    }

    CurrentRequestVersion.set(Version.toEnum(version));
    KeyType keyType = Key.KeyType.JSON;
    String returnMediaType = MediaType.APPLICATION_JSON;
    if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
        keyType = Key.KeyType.XML;
        returnMediaType = MediaType.APPLICATION_XML;
    }

    Key cacheKey = new Key(Key.EntityType.Application,
           ResponseCacheImpl.ALL_APPS_DELTA,
            keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
    );

    if (acceptEncoding != null
            && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
        return Response.ok(responseCache.getGZIP(cacheKey))
               .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                .header(HEADER_CONTENT_TYPE, returnMediaType)
                .build();
    } else {
        return Response.ok(responseCache.get(cacheKey))
                .build();
    }
}

 

 Service Consumption

1  Load balancing

When a RestTemplate is annotated with @LoadBalanced, LoadBalancerAutoConfiguration adds a LoadBalancerInterceptor to it when the RestTemplate is created. The source is:

@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
      final LoadBalancerInterceptor loadBalancerInterceptor) {
   return new RestTemplateCustomizer() {
      @Override
      public void customize(RestTemplate restTemplate) {
        List<ClientHttpRequestInterceptor> list = new ArrayList<>(
              restTemplate.getInterceptors());
        list.add(loadBalancerInterceptor);
        restTemplate.setInterceptors(list);
      }
   };
}

 

LoadBalancerInterceptor#intercept intercepts each request. Its core logic is to call the provider's service through the load balancer, as shown below:

@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
      final ClientHttpRequestExecution execution) throws IOException {
   final URI originalUri = request.getURI();
   String serviceName = originalUri.getHost();
   return this.loadBalancer.execute(serviceName,
         new LoadBalancerRequest<ClientHttpResponse>() {

            @Override
            public ClientHttpResponse apply(final ServiceInstance instance)
                  throws Exception {
               HttpRequest serviceRequest = new ServiceRequestWrapper(request,
                     instance);
               return execution.execute(serviceRequest, body);
            }
         });
}
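The interceptor treats the host part of the URI as a service name. Once an instance has been chosen, the request has to be sent to that instance's real host and port. A minimal sketch of this rewriting step, using only java.net.URI, is shown below; ServiceUriRewriter is a hypothetical helper, not the ServiceRequestWrapper used by Spring Cloud.

```java
import java.net.URI;
import java.net.URISyntaxException;

final class ServiceUriRewriter {
    /** Replaces the logical service name in the URI with a concrete host and port. */
    static URI rewrite(URI original, String host, int port) {
        try {
            return new URI(original.getScheme(), original.getUserInfo(), host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot rewrite " + original, e);
        }
    }
}
```

For example, rewriting http://user-service/api/users?id=1 for the instance 10.0.0.5:8080 yields http://10.0.0.5:8080/api/users?id=1.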

 

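A request made through RestTemplate passes through the interceptor and is eventually executed by LoadBalancerClient#execute. The main logic of RibbonLoadBalancerClient#execute is:

 - First select a LoadBalancer.

 - Then use the LoadBalancer's rule to choose a provider server. By default the LoadBalancer is a ZoneAwareLoadBalancer.

 - Wrap the request context, issue the HTTP request, and return the result.

The code is: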

@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
   ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
   Server server = getServer(loadBalancer);
   if (server == null) {
      throw new IllegalStateException("No instances available for " + serviceId);
   }
   RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
         serviceId), serverIntrospector(serviceId).getMetadata(server));

   RibbonLoadBalancerContext context = this.clientFactory
        .getLoadBalancerContext(serviceId);
   RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);

   try {
      T returnVal = request.apply(ribbonServer);
     statsRecorder.recordStats(returnVal);
      return returnVal;
   }
   // catch IOException and rethrow so RestTemplate behaves correctly
   catch (IOException ex) {
      statsRecorder.recordStats(ex);
      throw ex;
   }
   catch (Exception ex) {
      statsRecorder.recordStats(ex);
      ReflectionUtils.rethrowRuntimeException(ex);
   }
   return null;
}

 

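2  Routing rules

The load-balancing behavior is largely determined by the routing rule.

1)   RandomRule

Picks a service instance at random, using Random to choose among the server instances.

2)   RoundRobinRule

Cycles through the server instances in turn. This is Ribbon's default load-balancing rule. The implementation keeps a counter that is incremented on every selection and uses the counter modulo the total number of instances to decide which instance to pick.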
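The counter-and-modulo idea fits in a few lines. RoundRobinSketch below is a hypothetical, simplified version that works on a list of server names; Ribbon's own rule additionally checks whether the chosen server is alive and retries a limited number of times.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinSketch {
    private final AtomicInteger counter = new AtomicInteger(0);

    /** Returns the next server in turn, or null when the list is empty. */
    String choose(List<String> servers) {
        if (servers.isEmpty()) {
            return null;
        }
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(counter.getAndIncrement(), servers.size());
        return servers.get(index);
    }
}
```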

 

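3)   RetryRule

Round robin plus retry. It first tries to obtain an instance by round robin; if that instance is unavailable, it tries again. Retrying is bounded by a deadline (500 ms by default); if no instance has been obtained by then, it returns null.

 

4)   WeightedResponseTimeRule

Computes a weight for each instance from its runtime statistics and then picks instances according to those weights, which favors the better-performing instances.

WeightedResponseTimeRule has a scheduled task named DynamicServerWeightTask that by default recomputes the weight of every instance every 30 seconds. The rule is simple: the shorter an instance's average response time, the larger its weight, and the more likely it is to be chosen.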
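One way to turn average response times into selection probabilities is to give each instance the weight "total response time minus its own response time" and to store the weights cumulatively, so that a random number in [0, last cumulative weight] selects an instance. The sketch below assumes this scheme; ResponseTimeWeights is a hypothetical helper, and the random value is passed in so the behavior is deterministic.

```java
import java.util.ArrayList;
import java.util.List;

final class ResponseTimeWeights {
    /** Builds cumulative weights; a faster instance contributes a wider interval. */
    static List<Double> cumulativeWeights(double[] avgResponseTimes) {
        double total = 0.0;
        for (double t : avgResponseTimes) {
            total += t;
        }
        List<Double> cumulative = new ArrayList<>();
        double weightSoFar = 0.0;
        for (double t : avgResponseTimes) {
            weightSoFar += total - t; // shorter response time -> larger weight
            cumulative.add(weightSoFar);
        }
        return cumulative;
    }

    /** Returns the first index whose cumulative weight covers randomWeight, or -1 if none does. */
    static int pick(List<Double> cumulative, double randomWeight) {
        for (int i = 0; i < cumulative.size(); i++) {
            if (cumulative.get(i) >= randomWeight) {
                return i;
            }
        }
        return -1;
    }
}
```

For average response times of 10, 20, and 70 ms the individual weights are 90, 80, and 30, giving cumulative weights 90, 170, and 200. The fastest instance owns 90 of the 200 units, so it is chosen about 45% of the time.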

 

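5)   ClientConfigEnabledRoundRobinRule

Behaves the same as RoundRobinRule.

 

6)   BestAvailableRule

Uses the instance statistics stored in loadBalancerStats to filter out failed instances, then picks the instance with the fewest concurrent requests. If loadBalancerStats is null, it falls back to round robin.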
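The "skip failed instances, then take the least busy one" selection can be sketched as a single pass over per-server statistics. LeastConcurrentSketch and ServerStats are hypothetical types used only for illustration; they are not Ribbon's LoadBalancerStats API.

```java
import java.util.List;

final class LeastConcurrentSketch {
    /** Hypothetical per-server statistics. */
    static final class ServerStats {
        final String serverId;
        final int activeRequests;
        final boolean circuitTripped;

        ServerStats(String serverId, int activeRequests, boolean circuitTripped) {
            this.serverId = serverId;
            this.activeRequests = activeRequests;
            this.circuitTripped = circuitTripped;
        }
    }

    /** Returns the healthy server with the fewest active requests, or null if none is healthy. */
    static String choose(List<ServerStats> stats) {
        String chosen = null;
        int minActive = Integer.MAX_VALUE;
        for (ServerStats s : stats) {
            if (s.circuitTripped) {
                continue; // filter out servers considered failed
            }
            if (s.activeRequests < minActive) {
                minActive = s.activeRequests;
                chosen = s.serverId;
            }
        }
        return chosen;
    }
}
```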

 

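7)   PredicateBasedRule

Filters the instance list with an internal predicate and then applies round robin to the remaining instances.

 

8)   ZoneAvoidanceRule

ZoneAvoidanceRule filters with a CompositePredicate that combines ZoneAvoidancePredicate as the primary filter and AvailabilityPredicate as the secondary filter, and then applies round robin.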

 

 

 
