开发SpringCloud微服务三年,我才知道@EnableEurekaServer注解到底做了什么

全是干货的 Java 技术仓库:

  • 当你在启动类上添加了启动 Eureka 服务注册中心注解时,到底发生了什么呢?开发SpringCloud微服务三年,我才知道@EnableEurekaServer注解到底做了什么
  • 激活eureka服务器相关配置EurekaServerAutoConfiguration的注解开发SpringCloud微服务三年,我才知道@EnableEurekaServer注解到底做了什么
  • EurekaServerMarkerConfiguration开发SpringCloud微服务三年,我才知道@EnableEurekaServer注解到底做了什么
    点击到这里,我们发现 spring.factories文件

开发SpringCloud微服务三年,我才知道@EnableEurekaServer注解到底做了什么
开发SpringCloud微服务三年,我才知道@EnableEurekaServer注解到底做了什么

  • 注意到如下注解开发SpringCloud微服务三年,我才知道@EnableEurekaServer注解到底做了什么

@ConditionalOnBean - 条件注入

也就是当有EurekaServerMarkerConfiguration.Marker.class时,才会注入

所以@EnableEurekaServer就是个开关,只要写了该注解,spring 就会帮你把EurekaServerAutoConfiguration类注入进来。

那么为什么注入他就行了?

EurekaServerAutoConfiguration#jerseyFilterRegistration

开发SpringCloud微服务三年,我才知道@EnableEurekaServer注解到底做了什么
注意如下类:

ApplicationResource#addInstance

相当于 MVC 中的 controller
开发SpringCloud微服务三年,我才知道@EnableEurekaServer注解到底做了什么

   /**
     * Registers information about a particular instance for an
     * {@link com.netflix.discovery.shared.Application}.
     *
     * @param info
     *            {@link InstanceInfo} information of the instance.
     * @param isReplication
     *            a header parameter containing information whether this is
     *            replicated from other nodes.
     */
    @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        // validate that the instanceinfo contains all the necessary required fields
        if (isBlank(info.getId())) {
            return Response.status(400).entity("Missing instanceId").build();
        } else if (isBlank(info.getHostName())) {
            return Response.status(400).entity("Missing hostname").build();
        } else if (isBlank(info.getIPAddr())) {
            return Response.status(400).entity("Missing ip address").build();
        } else if (isBlank(info.getAppName())) {
            return Response.status(400).entity("Missing appName").build();
        } else if (!appName.equals(info.getAppName())) {
            return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
        } else if (info.getDataCenterInfo() == null) {
            return Response.status(400).entity("Missing dataCenterInfo").build();
        } else if (info.getDataCenterInfo().getName() == null) {
            return Response.status(400).entity("Missing dataCenterInfo Name").build();
        }

        // handle cases where clients may be registering with bad DataCenterInfo with missing data
        DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
        if (dataCenterInfo instanceof UniqueIdentifier) {
            String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
            if (isBlank(dataCenterInfoId)) {
                boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                if (experimental) {
                    String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                    return Response.status(400).entity(entity).build();
                } else if (dataCenterInfo instanceof AmazonInfo) {
                    AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                    String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                    if (effectiveId == null) {
                        amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                    }
                } else {
                    logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                }
            }
        }

        registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();  // 204 to be backwards compatible
    }

注意如下继承体系图
开发SpringCloud微服务三年,我才知道@EnableEurekaServer注解到底做了什么

InstanceRegistry#register

开发SpringCloud微服务三年,我才知道@EnableEurekaServer注解到底做了什么

PeerAwareInstanceRegistryImpl#register

开发SpringCloud微服务三年,我才知道@EnableEurekaServer注解到底做了什么
这里取得微服务过期时间 90s,服务之间心跳请求 30s 一次,如果 90s 还没发生,就说明挂了
开发SpringCloud微服务三年,我才知道@EnableEurekaServer注解到底做了什么

这就是责任链模式!!!

AbstractInstanceRegistry#register 真正的注册

  • 注册表
    K:微服务集群 V:微服务集群内的各个节点

开发SpringCloud微服务三年,我才知道@EnableEurekaServer注解到底做了什么

当发生注册信息冲突时,咋办?

根据最后活跃时间,确定覆盖哪个

    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
            // 并发读锁
            read.lock();
            // 先获得微服务名,然后获得实例
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
            // 第一次注册时还不存在,所以 new 一个
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                // 没有时才 put,有就不更新! 因为registry还是可能被写的!毕竟他不在读锁范围内!
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            // 已存在的注册节点
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // Retain the last dirty timestamp without overwriting it, if there is already a lease
            if (existingLease != null && (existingLease.getHolder() != null)) {
                // 拿到已存在节点的注册时间
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                // 当前正在注册的节点的注册时间
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

                // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
                // InstanceInfo instead of the server local copy.
                // 注册ing 时,有更加新的节点注册了!
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    // 那么就用已存在的替换当前注册节点,因为考虑到尽量保证可用性,因为既然还在活跃说明老的还能用
                    registrant = existingLease.getHolder();
                }
            } else {
                // The lease does not exist and hence it is a new registration
                synchronized (lock) {
                    if (this.expectedNumberOfRenewsPerMin > 0) {
                        // Since the client wants to cancel it, reduce the threshold
                        // (1
                        // for 30 seconds, 2 for a minute)
                        this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                        this.numberOfRenewsPerMinThreshold =
                                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
            // 新的心跳续约对象,包括注册信息,最后操作时间,注册事件,过期时间,剔除时间
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            // 将新对象放入注册表
            gMap.put(registrant.getId(), lease);
            synchronized (recentRegisteredQueue) {
                // 加入注册队列
                recentRegisteredQueue.add(new Pair<Long, String>(
                        System.currentTimeMillis(),
                        registrant.getAppName() + "(" + registrant.getId() + ")"));
            }
            // This is where the initial state transfer of overridden status happens
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }

            // Set the status based on the overridden status rules
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);

            // If the lease is registered with UP status, set lease service up timestamp
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            registrant.setActionType(ActionType.ADDED);
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            registrant.setLastUpdatedTimestamp();
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            read.unlock();
        }
    }

服务续约

通知Eureka Server Service Provider还活着,避免服务被剔除。

和register实现思路基本一致

  • 更新自身状态
  • 再同步到其它Peer
   public boolean renew(String appName, String id, boolean isReplication) {
        RENEW.increment(isReplication);
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
            // 拿到具体的注册节点
            leaseToRenew = gMap.get(id);
        }
        if (leaseToRenew == null) {
            // 不存在,则重新注册续约
            RENEW_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
            return false;
        } else {
        //存在
            // 先得到节点信息
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            if (instanceInfo != null) {
                // touchASGCache(instanceInfo.getASGName());
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                // 看是否处于宕机状态
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                    logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                            + "; re-register required", instanceInfo.getId());
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                    logger.info(
                            "The instance status {} is different from overridden instance status {} for instance {}. "
                                    + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                                    instanceInfo.getOverriddenStatus().name(),
                                    instanceInfo.getId());
                    instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);

                }
            }
            renewsLastMin.increment();
            // 正常则续约
            leaseToRenew.renew();
            return true;
        }
    }
    public enum InstanceStatus {
        UP, // Ready to receive traffic
        DOWN, // Do not send traffic- healthcheck callback failed
        STARTING, // Just about starting- initializations to be done - do not
        // send traffic
        OUT_OF_SERVICE, // Intentionally shutdown for traffic
        UNKNOWN;

        public static InstanceStatus toEnum(String s) {
            if (s != null) {
                try {
                    return InstanceStatus.valueOf(s.toUpperCase());
                } catch (IllegalArgumentException e) {
                    // ignore and fall through to unknown
                    logger.debug("illegal argument supplied to InstanceStatus.valueOf: {}, defaulting to {}", s, UNKNOWN);
                }
            }
            return UNKNOWN;
        }
    }

续约,利用更新的持续时间,如果它是由相关联的指定T注册期间,否则默认持续时间是DEFAULT_DURATION_IN_SECS,即 90s
开发SpringCloud微服务三年,我才知道@EnableEurekaServer注解到底做了什么

服务失效剔除

AbstractInstanceRegistry#postInit

开发SpringCloud微服务三年,我才知道@EnableEurekaServer注解到底做了什么

  • 剔除定时任务
    开发SpringCloud微服务三年,我才知道@EnableEurekaServer注解到底做了什么
    public void evict(long additionalLeaseMs) {
        logger.debug("Running the evict task");

        // 判断是否打开自我保护机制
        if (!isLeaseExpirationEnabled()) {
            logger.debug("DS: lease expiration is currently disabled.");
            return;
        }

        // We collect first all expired items, to evict them in random order. For large eviction sets,
        // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
        // the impact should be evenly distributed across all applications.
        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
        for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
            Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
            if (leaseMap != null) {
                for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                    Lease<InstanceInfo> lease = leaseEntry.getValue();
                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                        // 集合保存所有剔除节点
                        expiredLeases.add(lease);
                    }
                }
            }
        }

        // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
        // triggering self-preservation. Without that we would wipe out full registry.
        // 注册表大小,即所有注册节点个数
        int registrySize = (int) getLocalRegistrySize();
        // 阈值,0.85
        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
        // 直接相减,得到剔除阈值
        int evictionLimit = registrySize - registrySizeThreshold;

        // 将需要剔除节点个数 和 剔除阈值取最小值,作为自我保护机制下要剔除的节点个数。删多了还是 85 个开启,少了就降低点,这很合理!
        int toEvict = Math.min(expiredLeases.size(), evictionLimit);
        if (toEvict > 0) {
            logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
            // 随机剔除这么多个
            Random random = new Random(System.currentTimeMillis());
            for (int i = 0; i < toEvict; i++) {
                // Pick a random item (Knuth shuffle algorithm)
                int next = i + random.nextInt(expiredLeases.size() - i);
                Collections.swap(expiredLeases, i, next);
                Lease<InstanceInfo> lease = expiredLeases.get(i);

                String appName = lease.getHolder().getAppName();
                String id = lease.getHolder().getId();
                EXPIRED.increment();
                logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                internalCancel(appName, id, false);
            }
        }
    }
  • 是否过期,该被剔除
    当前时间是否大于 最后操作时间+持续时间+服务集群之间同步的预留时间

开发SpringCloud微服务三年,我才知道@EnableEurekaServer注解到底做了什么

自我保护机制

eureka 短时间内大量微服务被删除,会打开自我保护机制,避免自己宕机时疯狂删除

检查是否尤里卡服务器的自我保护功能。
当启用时,服务器跟踪它应该从服务器接收更新的数量。 任何时候,续期次数低于按规定的阈值百分比getRenewalPercentThreshold() ,服务器关闭到期,以避免身处险境将有助于服务器维护的客户端和服务器之间的网络出现问题时注册表的信息。
这些变化是在运行时有效
15min 85%宕机则打开该机制
开发SpringCloud微服务三年,我才知道@EnableEurekaServer注解到底做了什么
开发SpringCloud微服务三年,我才知道@EnableEurekaServer注解到底做了什么

同步给其他peer

传入各种动作
开发SpringCloud微服务三年,我才知道@EnableEurekaServer注解到底做了什么

  • InstanceInfo的状态有以下几种
    com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl

开发SpringCloud微服务三年,我才知道@EnableEurekaServer注解到底做了什么

PeerAwareInstanceRegistryImpl#register

  • InstanceInfo写入到本地registry之后,然后同步给其他peer节点
    开发SpringCloud微服务三年,我才知道@EnableEurekaServer注解到底做了什么
    private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* 可选 */,
                                  InstanceStatus newStatus /* 可选 */, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            // 如果已经是副本,则不要再复制,因为这将创建有毒的复制!
            // 如果当前节点接收到的实例信息本就是另一个节点同步来的,则不会继续同步给其他节点,避免形成“广播效应”,造成死循环
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }

            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // If the url represents this host, do not replicate to yourself.
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }
    private void replicateInstanceActionsToPeers(Action action, String appName,
                                                 String id, InstanceInfo info, InstanceStatus newStatus,
                                                 PeerEurekaNode node) {
        try {
            InstanceInfo infoFromRegistry = null;
            CurrentRequestVersion.set(Version.V2);
            switch (action) {
                case Cancel:
                    node.cancel(appName, id);
                    break;
                case Heartbeat:
                    InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                    break;
                case Register:
                    node.register(info);
                    break;
                case StatusUpdate:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                    break;
                case DeleteStatusOverride:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.deleteStatusOverride(appName, id, infoFromRegistry);
                    break;
            }
        } catch (Throwable t) {
            logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
        }
    }

我们看注册动作对应方法
开发SpringCloud微服务三年,我才知道@EnableEurekaServer注解到底做了什么
这里最终还是调用如下:
只不过此时关键在于是否为副本节点
开发SpringCloud微服务三年,我才知道@EnableEurekaServer注解到底做了什么

上一篇:[LeetCode] Remove Duplicates from Sorted List II 移除有序链表中的重复项之二


下一篇:SAP Leonardo图片处理相关的机器学习服务在SAP智能服务场景中的应用