InstanceRegistry#register
PeerAwareInstanceRegistryImpl#register
这里取得微服务过期时间 90s,服务之间心跳请求 30s 一次,如果 90s 还没发生,就说明挂了
这就是责任链模式!!!
AbstractInstanceRegistry#register 真正的注册
- 注册表
K:微服务集群 V:微服务集群内的各个节点
当发生注册信息冲突时,咋办?
根据最后活跃时间,确定覆盖哪个
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { try { // 并发读锁 read.lock(); // 先获得微服务名,然后获得实例 Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName()); REGISTER.increment(isReplication); // 第一次注册时还不存在,所以 new 一个 if (gMap == null) { final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>(); // 没有时才 put,有就不更新! 因为registry还是可能被写的!毕竟他不在读锁范围内! gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); if (gMap == null) { gMap = gNewMap; } } // 已存在的注册节点 Lease<InstanceInfo> existingLease = gMap.get(registrant.getId()); // Retain the last dirty timestamp without overwriting it, if there is already a lease if (existingLease != null && (existingLease.getHolder() != null)) { // 拿到已存在节点的注册时间 Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); // 当前正在注册的节点的注册时间 Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted // InstanceInfo instead of the server local copy. // 注册ing 时,有更加新的节点注册了! if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) { logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" + " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant"); // 那么就用已存在的替换当前注册节点,因为考虑到尽量保证可用性,因为既然还在活跃说明老的还能用 registrant = existingLease.getHolder(); } } else { // The lease does not exist and hence it is a new registration synchronized (lock) { if (this.expectedNumberOfRenewsPerMin > 0) { // Since the client wants to cancel it, reduce the threshold // (1 // for 30 seconds, 2 for a minute) this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2; this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold()); } } logger.debug("No previous lease information found; it is new registration"); } // 新的心跳续约对象,包括注册信息,最后操作时间,注册事件,过期时间,剔除时间 Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration); if (existingLease != null) { lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); } // 将新对象放入注册表 gMap.put(registrant.getId(), lease); synchronized (recentRegisteredQueue) { // 加入注册队列 recentRegisteredQueue.add(new Pair<Long, String>( System.currentTimeMillis(), registrant.getAppName() + "(" + registrant.getId() + ")")); } // This is where the initial state transfer of overridden status happens if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) { logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the " + "overrides", registrant.getOverriddenStatus(), registrant.getId()); if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) { logger.info("Not found overridden id {} and hence adding it", registrant.getId()); overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus()); } } InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId()); if (overriddenStatusFromMap != null) { logger.info("Storing overridden status {} from map", overriddenStatusFromMap); registrant.setOverriddenStatus(overriddenStatusFromMap); } // Set the status based on the overridden status rules InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication); registrant.setStatusWithoutDirty(overriddenInstanceStatus); // If the lease is registered with UP status, set lease service up timestamp if (InstanceStatus.UP.equals(registrant.getStatus())) { lease.serviceUp(); } registrant.setActionType(ActionType.ADDED); recentlyChangedQueue.add(new RecentlyChangedItem(lease)); registrant.setLastUpdatedTimestamp(); invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); logger.info("Registered instance {}/{} with status {} (replication={})", registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication); } finally { read.unlock(); } }
服务续约
通知Eureka Server Service Provider还活着,避免服务被剔除。
和register实现思路基本一致
- 更新自身状态
- 再同步到其它Peer
public boolean renew(String appName, String id, boolean isReplication) { RENEW.increment(isReplication); Map<String, Lease<InstanceInfo>> gMap = registry.get(appName); Lease<InstanceInfo> leaseToRenew = null; if (gMap != null) { // 拿到具体的注册节点 leaseToRenew = gMap.get(id); } if (leaseToRenew == null) { // 不存在,则重新注册续约 RENEW_NOT_FOUND.increment(isReplication); logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id); return false; } else { //存在 // 先得到节点信息 InstanceInfo instanceInfo = leaseToRenew.getHolder(); if (instanceInfo != null) { // touchASGCache(instanceInfo.getASGName()); InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus( instanceInfo, leaseToRenew, isReplication); // 看是否处于宕机状态 if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) { logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}" + "; re-register required", instanceInfo.getId()); RENEW_NOT_FOUND.increment(isReplication); return false; } if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) { logger.info( "The instance status {} is different from overridden instance status {} for instance {}. " + "Hence setting the status to overridden status", instanceInfo.getStatus().name(), instanceInfo.getOverriddenStatus().name(), instanceInfo.getId()); instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus); } } renewsLastMin.increment(); // 正常则续约 leaseToRenew.renew(); return true; } }
public enum InstanceStatus { UP, // Ready to receive traffic DOWN, // Do not send traffic- healthcheck callback failed STARTING, // Just about starting- initializations to be done - do not // send traffic OUT_OF_SERVICE, // Intentionally shutdown for traffic UNKNOWN; public static InstanceStatus toEnum(String s) { if (s != null) { try { return InstanceStatus.valueOf(s.toUpperCase()); } catch (IllegalArgumentException e) { // ignore and fall through to unknown logger.debug("illegal argument supplied to InstanceStatus.valueOf: {}, defaulting to {}", s, UNKNOWN); } } return UNKNOWN; } }
续约,利用更新的持续时间,如果它是由相关联的指定T注册期间,否则默认持续时间是DEFAULT_DURATION_IN_SECS,即 90s