SpringCloud之Ribbon

1、通过RestTemplate开启负载均衡

新增Configuration配置类,创建RestTemplate实例并通过@LoadBalanced开启负载均衡。

@Configuration
public class RestTemplateConfig {
    @Bean
    @LoadBalanced
    protected RestTemplate restTemplate() {
        return new RestTemplate();
    }
}

@LoadBalanced注解的源码如下:

/**
 * Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient.
 * @author Spencer Gibb
 */
@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {

}

从其注释中看到唯一有用的信息就是LoadBalancerClient,下面我么看看LoadBalancerClient的作用:

/**
 * 负载均衡器客户端
 *
 * Represents a client-side load balancer.
 *
 * @author Spencer Gibb
 */
public interface LoadBalancerClient extends ServiceInstanceChooser {

	/**
	 * 使用指定服务的LoadBalancer(负载均衡器)中的ServiceInstance(服务实例)执行请求。
	 *
	 * Executes request using a ServiceInstance from the LoadBalancer for the specified service.
	 * @param serviceId The service ID to look up the LoadBalancer.
	 * @param request Allows implementations to execute pre and post actions, such as incrementing metrics.
	 * @param <T> type of the response
	 * @throws IOException in case of IO issues.
	 * @return The result of the LoadBalancerRequest callback on the selected ServiceInstance.
	 */
	<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;

	/**
	 * 使用指定服务的LoadBalancer(负载均衡器)中的ServiceInstance(服务实例)执行请求。
	 *
	 * Executes request using a ServiceInstance from the LoadBalancer for the specified service.
	 * @param serviceId The service ID to look up the LoadBalancer.
	 * @param serviceInstance The service to execute the request to.
	 * @param request Allows implementations to execute pre and post actions, such as incrementing metrics.
	 * @param <T> type of the response
	 * @throws IOException in case of IO issues.
	 * @return The result of the LoadBalancerRequest callback on the selected ServiceInstance.
	 */
	<T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;

	/**
	 * 重构URI
	 * 创建一个适当的URI,该URI具有供系统使用的真实主机和端口。
	 * 有些系统使用逻辑服务名称作为主机的URI,例如http://myservice/path/to/service。
	 * 重构之后会使用ServiceInstance(服务实例)中的host:port替换服务名。
	 *
	 * Creates a proper URI with a real host and port for systems to utilize.
	 * Some systems use a URI with the logical service name as the host, such as http://myservice/path/to/service.
	 * This will replace the service name with the host:port from the ServiceInstance.
	 * @param instance service instance to reconstruct the URI
	 * @param original A URI with the host as a logical service name.
	 * @return A reconstructed URI.
	 */
	URI reconstructURI(ServiceInstance instance, URI original);

}

LoadBalancerClient接口代表了负载均衡器客户端,并提供了execute、reconstructURI两个方法,其继承了ServiceInstanceChooser接口,该接口的代码如下:

/**
 * 服务实例选择器
 * Implemented by classes which use a load balancer to choose a server to send a request to.
 *
 * @author Ryan Baxter
 */
public interface ServiceInstanceChooser {

	/**
	 * 从LoadBalancer(负载均衡器)中为指定的服务选择一个ServiceInstance(服务实例)
	 * Chooses a ServiceInstance from the LoadBalancer for the specified service.
	 * @param serviceId The service ID to look up the LoadBalancer.
	 * @return A ServiceInstance that matches the serviceId.
	 */
	ServiceInstance choose(String serviceId);

}

ServiceInstanceChooser接口代表了服务器实例选择实例,并提供了一个choose方法来选择具体被调用的服务器实例。

通过这两个接口我们可以看到,LoadBalancerClient已经具有了选择服务器实例、执行、重构URI等功能;接下来我们就去看一下Ribbon的初始化过程。这里涉及到连个重要的配置类RibbonAutoConfiguration和LoadBalancerAutoConfiguration,下面我们针对这两个配置类的初始化做分析。

2、RibbonAutoConfiguration

Ribbon的初始化过程通过RibbonAutoConfiguration类引导完成。

@Configuration
@Conditional(RibbonAutoConfiguration.RibbonClassesConditions.class)
@RibbonClients
// EurekaClientAutoConfiguration --> EurekaClient自动配置类
@AutoConfigureAfter(name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration")
// LoadBalancerAutoConfiguration --> 负载均衡 自动配置类
// AsyncLoadBalancerAutoConfiguration --> 负载均衡 异步请求 自动配置类
@AutoConfigureBefore({ LoadBalancerAutoConfiguration.class,	AsyncLoadBalancerAutoConfiguration.class })
// RibbonEagerLoadProperties --> Ribbon热加载属性
// ServerIntrospectorProperties --> 服务器内省属性
@EnableConfigurationProperties({ RibbonEagerLoadProperties.class, ServerIntrospectorProperties.class })
public class RibbonAutoConfiguration {

	@Autowired(required = false)
	private List<RibbonClientSpecification> configurations = new ArrayList<>();

	// Ribbon热加载属性配置
	@Autowired
	private RibbonEagerLoadProperties ribbonEagerLoadProperties;

	// HasFeatures 特性配置
	@Bean
	public HasFeatures ribbonFeature() {
		return HasFeatures.namedFeature("Ribbon", Ribbon.class);
	}

	// 创建客户端、负载均衡器和客户端配置实例的工厂
	@Bean
	public SpringClientFactory springClientFactory() {
		SpringClientFactory factory = new SpringClientFactory();
		factory.setConfigurations(this.configurations);
		return factory;
	}

	// 负载均衡器客户端
	@Bean
	@ConditionalOnMissingBean(LoadBalancerClient.class)
	public LoadBalancerClient loadBalancerClient() {
		return new RibbonLoadBalancerClient(springClientFactory());
	}

    // 重试
	@Bean
	@ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
	@ConditionalOnMissingBean
	public LoadBalancedRetryFactory loadBalancedRetryPolicyFactory(final SpringClientFactory clientFactory) {
		return new RibbonLoadBalancedRetryFactory(clientFactory);
	}

    // 提供了Ribbon需要的组件类名
	@Bean
	@ConditionalOnMissingBean
	public PropertiesFactory propertiesFactory() {
		return new PropertiesFactory();
	}

	// 开启热加载
	@Bean
	@ConditionalOnProperty("ribbon.eager-load.enabled")
	public RibbonApplicationContextInitializer ribbonApplicationContextInitializer() {
		return new RibbonApplicationContextInitializer(springClientFactory(),
				ribbonEagerLoadProperties.getClients());
	}

	@Configuration
	@ConditionalOnClass(HttpRequest.class)
	@ConditionalOnRibbonRestClient
	protected static class RibbonClientHttpRequestFactoryConfiguration {

		@Autowired
		private SpringClientFactory springClientFactory;

		@Bean
		public RestTemplateCustomizer restTemplateCustomizer(
				final RibbonClientHttpRequestFactory ribbonClientHttpRequestFactory) {
			return restTemplate -> restTemplate.setRequestFactory(ribbonClientHttpRequestFactory);
		}

		@Bean
		public RibbonClientHttpRequestFactory ribbonClientHttpRequestFactory() {
			return new RibbonClientHttpRequestFactory(this.springClientFactory);
		}

	}

	// TODO: support for autoconfiguring restemplate to use apache http client or okhttp
	@Target({ ElementType.TYPE, ElementType.METHOD })
	@Retention(RetentionPolicy.RUNTIME)
	@Documented
	@Conditional(OnRibbonRestClientCondition.class)
	@interface ConditionalOnRibbonRestClient {

	}

	private static class OnRibbonRestClientCondition extends AnyNestedCondition {

		OnRibbonRestClientCondition() {
			super(ConfigurationPhase.REGISTER_BEAN);
		}

		@Deprecated // remove in Edgware"
		@ConditionalOnProperty("ribbon.http.client.enabled")
		static class ZuulProperty {

		}

		@ConditionalOnProperty("ribbon.restclient.enabled")
		static class RibbonProperty {

		}

	}

	/**
	 * {@link AllNestedConditions} that checks that either multiple classes are present.
	 */
	static class RibbonClassesConditions extends AllNestedConditions {

		RibbonClassesConditions() {
			super(ConfigurationPhase.PARSE_CONFIGURATION);
		}

		@ConditionalOnClass(IClient.class)
		static class IClientPresent {

		}

		@ConditionalOnClass(RestTemplate.class)
		static class RestTemplatePresent {

		}

		@ConditionalOnClass(AsyncRestTemplate.class)
		static class AsyncRestTemplatePresent {

		}

		@ConditionalOnClass(Ribbon.class)
		static class RibbonPresent {

		}

	}

}

这里初始化的比较重要的组件有:

  • SpringClientFactory:创建客户端、负载均衡器和客户端配置实例的工厂,它为每个客户机名创建一个Spring ApplicationContext,并从中提取所需的bean。
  • LoadBalancerClient:负载均衡器客户端
  • LoadBalancedRetryFactory:SpringCloud重试功能定制工厂类

关于这几个组件具体使用的时间节点,会在后面通过一次负载均衡的Http请求来讲解。

3、LoadBalancerAutoConfiguration

LoadBalancerAutoConfiguration是Ribbon负载均衡自动配置类,其代码如下。

@Configuration
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {

	@LoadBalanced
	@Autowired(required = false)
	private List<RestTemplate> restTemplates = Collections.emptyList();

	@Autowired(required = false)
	private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();

	@Bean
	public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
			final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
		return () -> restTemplateCustomizers.ifAvailable(customizers -> {
			for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
				for (RestTemplateCustomizer customizer : customizers) {
					customizer.customize(restTemplate);
				}
			}
		});
	}

	// 负载均衡请求工厂类
	@Bean
	@ConditionalOnMissingBean
	public LoadBalancerRequestFactory loadBalancerRequestFactory(LoadBalancerClient loadBalancerClient) {
		return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
	}

	// Auto configuration for retry mechanism.
	// 重试机制配置类
	@Configuration
	@ConditionalOnClass(RetryTemplate.class)
	public static class RetryAutoConfiguration {
		@Bean
		@ConditionalOnMissingBean
		public LoadBalancedRetryFactory loadBalancedRetryFactory() {
			return new LoadBalancedRetryFactory() {
			};
		}

	}

	// 重试拦截机制配置类
	// Auto configuration for retry intercepting mechanism.
	@Configuration
	@ConditionalOnClass(RetryTemplate.class)
	public static class RetryInterceptorAutoConfiguration {

		@Bean
		@ConditionalOnMissingBean
		public RetryLoadBalancerInterceptor ribbonInterceptor(
				LoadBalancerClient loadBalancerClient,
				LoadBalancerRetryProperties properties,
				LoadBalancerRequestFactory requestFactory,
				LoadBalancedRetryFactory loadBalancedRetryFactory) {
			return new RetryLoadBalancerInterceptor(loadBalancerClient, properties,
					requestFactory, loadBalancedRetryFactory);
		}

		@Bean
		@ConditionalOnMissingBean
		public RestTemplateCustomizer restTemplateCustomizer(
				final RetryLoadBalancerInterceptor loadBalancerInterceptor) {
			return restTemplate -> {
				List<ClientHttpRequestInterceptor> list = new ArrayList<>(
						restTemplate.getInterceptors());
				list.add(loadBalancerInterceptor);
				restTemplate.setInterceptors(list);
			};
		}

	}

	// 忽略
	@Configuration
	@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
	static class LoadBalancerInterceptorConfig {

		@Bean
		public LoadBalancerInterceptor ribbonInterceptor(
				LoadBalancerClient loadBalancerClient,
				LoadBalancerRequestFactory requestFactory) {
			return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
		}

		@Bean
		@ConditionalOnMissingBean
		public RestTemplateCustomizer restTemplateCustomizer(
				final LoadBalancerInterceptor loadBalancerInterceptor) {
			return restTemplate -> {
				List<ClientHttpRequestInterceptor> list = new ArrayList<>(
						restTemplate.getInterceptors());
				list.add(loadBalancerInterceptor);
				restTemplate.setInterceptors(list);
			};
		}

	}

}

这里初始化的比较重要的组件有:

  • LoadBalancerRequestFactory:为LoadBalancerInterceptor和RetryLoadBalancerInterceptor创建LoadBalancerRequest,并将LoadBalancerRequestTransformer应用于拦截的HttpRequest
  • LoadBalancedRetryFactory:如果容器中没有LoadBalancedRetryFactory实例的话,这里会创建一个空的LoadBalancedRetryFactory实例
  • RestTemplateCustomizer:定制RestTemplate,加入RetryLoadBalancerInterceptor拦截器,通过RetryLoadBalancerInterceptor拦截Http请求,并调用choose方法选择服务器,实现负载均衡

关于这几个组件具体使用的时间节点,下面我们通过一次负载均衡的Http请求来讲解。

4、负载均衡的Http请求调用过程

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-w8Mrm0kB-1631330467682)(media/15604189016881/15638776502118.jpg)]

访问UserController的一个get方法,其调用时序图如上。其中关于RestTemplate的调用过程我们不多赘述,其最终会调用到RetryLoadBalancerInterceptor的intercept方法。接下来我们从这里开始分析:

public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
        final ClientHttpRequestExecution execution) throws IOException {
    // 获取URL和serviceName
    final URI originalUri = request.getURI();
    final String serviceName = originalUri.getHost();
    Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
    // 获取负载均衡重试策略并创建重试模板
    final LoadBalancedRetryPolicy retryPolicy = this.lbRetryFactory.createRetryPolicy(serviceName,this.loadBalancer);
    // 创建重试策略模板,并为模板设置重试策略;
    // 未开启重试机制使用NeverRetryPolicy策略;
    // 若开启重试机制使用InterceptorRetryPolicy策略
    RetryTemplate template = createRetryTemplate(serviceName, request, retryPolicy);
    // 执行请求
    return template.execute(context -> {
        ServiceInstance serviceInstance = null;
        // 获取serviceInstance
        if (context instanceof LoadBalancedRetryContext) {
            LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
            serviceInstance = lbContext.getServiceInstance();
        }
        // 若未能获取到serviceInstance,则通过loadBalancer选择一个服务器
        if (serviceInstance == null) {
            serviceInstance = this.loadBalancer.choose(serviceName);
        }
        // 使用指定服务的LoadBalancer(负载均衡器)中的ServiceInstance(服务实例)执行请求。
        ClientHttpResponse response = RetryLoadBalancerInterceptor.this.loadBalancer
                .execute(serviceName, serviceInstance,this.requestFactory.createRequest(request, body, execution));
        int statusCode = response.getRawStatusCode();
        if (retryPolicy != null && retryPolicy.retryableStatusCode(statusCode)) {
            byte[] bodyCopy = StreamUtils.copyToByteArray(response.getBody());
            response.close();
            throw new ClientHttpResponseStatusCodeException(serviceName, response,bodyCopy);
        }
        return response;
    }, new LoadBalancedRecoveryCallback<ClientHttpResponse, ClientHttpResponse>() {
        // This is a special case, where both parameters to
        // LoadBalancedRecoveryCallback are
        // the same. In most cases they would be different.
        @Override
        protected ClientHttpResponse createResponse(ClientHttpResponse response, URI uri) {
            return response;
        }
    });
}

这里执行的较为重要的步骤有:

  • 获取负载均衡重试策略并创建重试模板
  • 创建重试策略模板,并为模板设置重试策略
  • 执行请求
4.1 获取负载均衡重试策略并创建重试模板
@Override
public LoadBalancedRetryPolicy createRetryPolicy(String service,ServiceInstanceChooser serviceInstanceChooser) {
	RibbonLoadBalancerContext lbContext = this.clientFactory.getLoadBalancerContext(service);
	return new RibbonLoadBalancedRetryPolicy(service, lbContext,
			serviceInstanceChooser, clientFactory.getClientConfig(service));
}

// 构造函数
public RibbonLoadBalancedRetryPolicy(String serviceId,RibbonLoadBalancerContext context,
		ServiceInstanceChooser loadBalanceChooser,IClientConfig clientConfig) {
	this.serviceId = serviceId;
	this.lbContext = context;
	this.loadBalanceChooser = loadBalanceChooser;
	// 获取并设置重试的http状态码
	String retryableStatusCodesProp = clientConfig.getPropertyAsString(RETRYABLE_STATUS_CODES, "");
	String[] retryableStatusCodesArray = retryableStatusCodesProp.split(",");
	for (String code : retryableStatusCodesArray) {
		if (!StringUtils.isEmpty(code)) {
			try {
				retryableStatusCodes.add(Integer.valueOf(code.trim()));
			}
			catch (NumberFormatException e) {
				log.warn("We cant add the status code because the code [ " + code
						+ " ] could not be converted to an integer. ", e);
			}
		}
	}
}
4.2 创建重试策略模板,并为模板设置重试策略
private RetryTemplate createRetryTemplate(String serviceName,HttpRequest request,LoadBalancedRetryPolicy retryPolicy){
	RetryTemplate template = new RetryTemplate();
	// 创建并设置BackOffPolicy
	BackOffPolicy backOffPolicy = this.lbRetryFactory.createBackOffPolicy(serviceName);
	template.setBackOffPolicy(backOffPolicy == null ? new NoBackOffPolicy() : backOffPolicy);
	template.setThrowLastExceptionOnExhausted(true);
	// 设置RetryListener
	RetryListener[] retryListeners = this.lbRetryFactory.createRetryListeners(serviceName);
	if (retryListeners != null && retryListeners.length != 0) {
		template.setListeners(retryListeners);
	}
	// 重点:
    // 如果配置了spring.cloud.loadbalancer.retry=false;或者retryPolicy==null,则使用NeverRetryPolicy
    // 否则使用InterceptorRetryPolicy
	template.setRetryPolicy(!this.lbProperties.isEnabled() || retryPolicy == null
			? new NeverRetryPolicy():new InterceptorRetryPolicy(request,retryPolicy,this.loadBalancer,serviceName));
	return template;
}

该方法中只要关心NeverRetryPolicy和InterceptorRetryPolicy即可,其调用方式我们在后面继续分析。

4.3 执行请求

前面提到了NeverRetryPolicy和InterceptorRetryPolicy,那么下面我们针对两种不同的策略,来看其具体的调用过程:首先在配置文件中禁用重试(此属性默认开启)

#允许重试
spring.cloud.loadbalancer.retry.enabled=false

调用RetryTemplate的execute方法,在execute方法中调用doExecute,在doExecute方法中有:

while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
    // 省略部分代码
}

这里canRetry会调用当前使用的重试策略中的方法,因为我们未开启请求重试,则使用的是NeverRetryPolicy,这里要注意:

public boolean canRetry(RetryContext context) {
	return !((NeverRetryContext) context).isFinished();
}

该方法会在在第一个异常之后返回false。其默认总是有一个尝试,然后防止重试。所以这里即使配置了禁用重试,这里默认也会返回true。那么如果我们开启了重试策略,则使用InterceptorRetryPolicy,我们来看一下其canRetry方法:

@Override
public boolean canRetry(RetryContext context) {
	LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
	if (lbContext.getRetryCount() == 0 && lbContext.getServiceInstance() == null) {
		// We haven't even tried to make the request yet so return true so we do
        // 为LoadBalancedRetryContext设置ServiceInstance
		lbContext.setServiceInstance(this.serviceInstanceChooser.choose(this.serviceName));
		return true;
	}
	return this.policy.canRetryNextServer(lbContext);
}

有一点需要注意,这里已经为LoadBalancedRetryContext设置了ServiceInstance,关于其choose方法,我们会在后面讲解。

回到RetryLoadBalancerInterceptor的execute方法,我们看一下开启重试、关闭重试这两种策略是如何获取调用服务器的。

// 获取serviceInstance,如果为禁用重试策略,则这里是true
if (context instanceof LoadBalancedRetryContext) {
    LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
    serviceInstance = lbContext.getServiceInstance();
}
// 若未能获取到serviceInstance,则通过loadBalancer选择一个服务器
if (serviceInstance == null) {
    serviceInstance = this.loadBalancer.choose(serviceName);
}

对于开启重试策略,因为已经在canRetry中设置了serviceInstance,所以直接获取即可;对于关闭重试,则需要通过choose方法来获取serviceInstance,关于choose过程,会在后面的负载均衡策略方式中分析。那有了这些准备工作,接下来就可以正式对serviceInstance发起请求了。其具体的调用依然是使用ClientHttpRequest来进行,不多赘述,感兴趣的同学可以自己查看其代码。

5、Ribbon负载均衡策略

前文分析到了一个choose方法,该方法即负载均衡的入口。

// RibbonLoadBalancerClient类的choose、getServer方法
@Override
public ServiceInstance choose(String serviceId) {
	return choose(serviceId, null);
}

public ServiceInstance choose(String serviceId, Object hint) {
    // 获取被调用的服务器
	Server server = getServer(getLoadBalancer(serviceId), hint);
	// 服务器为空返回null;否则返回RibbonServer
	if (server == null) {
		return null;
	}
	return new RibbonServer(serviceId, server, isSecure(server, serviceId),
			serverIntrospector(serviceId).getMetadata(server));
}

protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
	if (loadBalancer == null) {
		return null;
	}
	// Use 'default' on a null hint, or just pass it on?
	return loadBalancer.chooseServer(hint != null ? hint : "default");
}


// BaseLoadBalancer类的chooseServer方法
// 根据指定的key获取一个可用的服务器
public Server chooseServer(Object key) {
    if (counter == null) {
        counter = createCounter();
    }
    counter.increment();
    // 无可用负载均衡策略,返回null
    if (rule == null) {
        return null;
    } else {
        try {
            // 根据负载均衡策略选择服务器
            return rule.choose(key);
        } catch (Exception e) {
            logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
            return null;
        }
    }
}

从上面的方法调用过程中可以看到,rule.choose(key)是最终选择服务器的方法。rule是IRule接口的实现类。IRule接口为LoadBalancer定义“规则”的接口。规则可以看作是负载平衡的策略。众所周知的负载平衡策略包括轮询、基于响应时间的策略等。最常见的负载均衡策略有轮询、随机、权重等,接下来分析一下这三种最常见的策略。

5.1 ZoneAvoidanceRule 过滤轮询策略

该类继承了PredicateBasedRule,其属于轮询策略,也是SpringCloud的默认负载均衡策略,该方法会先筛选出所有可用的服务器列表,然后使用轮询算法得到被调用的服务器。

@Override
public Server choose(Object key) {
    ILoadBalancer lb = getLoadBalancer();
    // 获取被调用的服务器
    Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
    if (server.isPresent()) {
        return server.get();
    } else {
        return null;
    }       
}

从代码中看,该方法的核心是chooseRoundRobinAfterFiltering,其代码如下:

public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
    // 获取所有可用的服务器列表
    List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
    if (eligible.size() == 0) {
        return Optional.absent();
    }
    // 使用轮询策略从可用服务器列表中选出被调用服务器
    return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}

获取被调用服务器的过程分为三步:

  • 获取所有的服务器列表,包括可用和不可用
  • 过滤并得到可用的候选服务器列表,即从中筛选出可用的服务器列表
  • 使用轮询策略从候选服务器列表中选择一个被调用的服务器
5.1.1 获取候选服务器列表
public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
    if (loadBalancerKey == null) {
        return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));
    } else {
        List<Server> results = Lists.newArrayList();
        for (Server server: servers) {
            if (this.apply(new PredicateKey(loadBalancerKey, server))) {
                results.add(server);
            }
        }
        return results;            
    }
}

该方法的核心是apply方法,最终会调用ZoneAvoidancePredicate类下的apply方法:

public boolean apply(@Nullable PredicateKey input) {
    // 如niws.loadbalancer.zoneAvoidanceRule.enabled=false,
    // 则不开启server zone,无需过滤
    if (!ENABLED.get()) {
        return true;
    }
    // 如server zone名称为null,无需过滤
    String serverZone = input.getServer().getZone();
    if (serverZone == null) {
        // there is no zone information from the server, we do not want to filter out this server
        return true;
    }
    // 如无可用的LoadBalancerStats,无需过滤
    LoadBalancerStats lbStats = getLBStats();
    if (lbStats == null) {
        // no stats available, do not filter
        return true;
    }
    // 如可用的server zone数量<=1,无需过滤
    if (lbStats.getAvailableZones().size() <= 1) {
        // only one zone is available, do not filter
        return true;
    }
    // 如负载均衡中未包含当前获取到的分片zone,无需过滤
    Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
    if (!zoneSnapshot.keySet().contains(serverZone)) {
        // The server zone is unknown to the load balancer, do not filter it out
        return true;
    }
    // 获取可用的server zone
    logger.debug("Zone snapshots: {}", zoneSnapshot);
    Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot,
            triggeringLoad.get(),
            triggeringBlackoutPercentage.get());
    logger.debug("Available zones: {}", availableZones);
    // 再次判断获取到的可用server zone中是否包含要过滤的server zone
    if (availableZones != null) {
        return availableZones.contains(input.getServer().getZone());
    } else {
        return false;
    }
}
5.1.2 使用轮询策略选出被调用服务器
private final AtomicInteger nextIndex = new AtomicInteger();
private int incrementAndGetModulo(int modulo) {
    for (;;) {
        int current = nextIndex.get();
        int next = (current + 1) % modulo;
        if (nextIndex.compareAndSet(current, next) && current < modulo)
            return current;
    }
}

轮询的算法还是很简单的,不多赘述。

5.2 开启自定义负载均衡策略

如果想要开启自定义负载均衡策略,可以有两种方式:
1、使用Ribbon中已存在的负载均衡策略,如轮询

/**
 * RoundRobinRule 轮询负载均衡策略
 */
@Configuration
public class RoundRobinRuleConfig {
    @Bean
    public IRule roundRobinRule() {
        return new RoundRobinRule();
    }
}

2、实现IRule接口并将其作为配置类

/**
 * 自定义负载均衡策略
 */
@Configuration
public class MyRule implements IRule {
    @Override
    public Server choose(Object key) {
        // do something
        return null;
    }

    @Override
    public void setLoadBalancer(ILoadBalancer lb) {
        // do something
    }

    @Override
    public ILoadBalancer getLoadBalancer() {
        // do something
        return null;
    }

    @Bean
    public IRule myRule() {
        return new MyRule();
    }
}
5.3 RoundRobinRule 轮询策略
public Server choose(ILoadBalancer lb, Object key) {
    if (lb == null) {
        log.warn("no load balancer");
        return null;
    }

    Server server = null;
    int count = 0;
    while (server == null && count++ < 10) {
        // 可用服务器和所有服务器信息
        List<Server> reachableServers = lb.getReachableServers();
        List<Server> allServers = lb.getAllServers();
        int upCount = reachableServers.size();
        int serverCount = allServers.size();

        // 无可用服务器
        if ((upCount == 0) || (serverCount == 0)) {
            log.warn("No up servers available from load balancer: " + lb);
            return null;
        }

        // 获取下一次请求调用的服务器
        int nextServerIndex = incrementAndGetModulo(serverCount);
        server = allServers.get(nextServerIndex);

        // 判断获取到的服务器状态,若不可用,继续循环;如可用,则返回
        if (server == null) {
            /* Transient. */
            Thread.yield();
            continue;
        }

        if (server.isAlive() && (server.isReadyToServe())) {
            return (server);
        }

        // Next.
        server = null;
    }

    // 若尝试获取服务器次数大于等于10,则直接返回;并记录警告日志
    if (count >= 10) {
        log.warn("No available alive servers after 10 tries from load balancer: " + lb);
    }
    return server;
}

private int incrementAndGetModulo(int modulo) {
    for (;;) {
        int current = nextServerCyclicCounter.get();
        int next = (current + 1) % modulo;
        if (nextServerCyclicCounter.compareAndSet(current, next))
            return next;
    }
}
5.4 RandomRule 随机策略
public Server choose(ILoadBalancer lb, Object key) {
    if (lb == null) {
        return null;
    }
    Server server = null;

    while (server == null) {
        if (Thread.interrupted()) {
            return null;
        }

        // 可用服务
        List<Server> upList = lb.getReachableServers();
        // 所有服务
        List<Server> allList = lb.getAllServers();
        // 所有服务数
        int serverCount = allList.size();
        // 无服务
        if (serverCount == 0) {
            /*
             * No servers. End regardless of pass, because subsequent passes only get more restrictive.
             */
            return null;
        }

        // 随机获取一个server
        int index = chooseRandomInt(serverCount);
        server = upList.get(index);

        // 判断server可用性,返回server或继续重试
        if (server == null) {
            /*
             * The only time this should happen is if the server list were somehow trimmed.
             * This is a transient condition. Retry after yielding.
             */
            Thread.yield();
            continue;
        }

        if (server.isAlive()) {
            return (server);
        }

        // Shouldn't actually happen.. but must be transient or a bug.
        server = null;
        Thread.yield();
    }

    return server;

}

// 获取一个[0~serverCount)随机数,使用ThreadLocalRandom减少竞争,提高并发效率
protected int chooseRandomInt(int serverCount) {
    return ThreadLocalRandom.current().nextInt(serverCount);
}
5.4 WeightedResponseTimeRule 权重策略

使用平均响应时间为每个服务器分配动态“权重”的规则,然后以“加权循环”方式使用该规则。相对于轮询、随机,该策略稍微复杂一些。

5.4.1 维护权重

该方法首先会开启定时任务来维护权重,即服务器的平均响应时间。

void initialize(ILoadBalancer lb) {
    if (serverWeightTimer != null) {
        serverWeightTimer.cancel();
    }
    serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-" + name, true);
    serverWeightTimer.schedule(new DynamicServerWeightTask(), 0, serverWeightTaskTimerInterval);
    // do a initial run
    ServerWeight sw = new ServerWeight();
    sw.maintainWeights();
    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
        public void run() {
            logger.info("Stopping NFLoadBalancer-serverWeightTimer-" + name);
            serverWeightTimer.cancel();
        }
    }));
}

class DynamicServerWeightTask extends TimerTask {
    public void run() {
        ServerWeight serverWeight = new ServerWeight();
        try {
            serverWeight.maintainWeights();
        } catch (Exception e) {
            logger.error("Error running DynamicServerWeightTask for {}", name, e);
        }
    }
}

class ServerWeight {

    // 权重维护
    public void maintainWeights() {
        ILoadBalancer lb = getLoadBalancer();
        if (lb == null) {
            return;
        }

        if (!serverWeightAssignmentInProgress.compareAndSet(false,  true))  {
            return;
        }

        try {
            // 开始调整权重
            logger.info("Weight adjusting job started ...");

            // 获取负载均衡数据信息统计
            AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
            LoadBalancerStats stats = nlb.getLoadBalancerStats();
            if (stats == null) {
                // no statistics, nothing to do
                return;
            }

            // 计算所有服务器平均响应时间之和
            double totalResponseTime = 0;
            // find maximal 95% response time
            for (Server server : nlb.getAllServers()) {
                // this will automatically load the stats if not in cache
                ServerStats ss = stats.getSingleServerStat(server);
                totalResponseTime += ss.getResponseTimeAvg();
            }

            // weight for each server is (sum of responseTime of all servers - responseTime)
            // so that the longer the response time, the less the weight and the less likely to be chosen
            // 每个服务器的权重为(所有服务器的响应时间之和 - 平均响应时间)
            // 因此,响应时间越长,权重越小,被选择的可能性就越小
            // 服务器权重之和
            Double weightSoFar = 0.0;

            // create new list and hot swap the reference
            List<Double> finalWeights = new ArrayList<Double>();
            for (Server server : nlb.getAllServers()) {
                ServerStats ss = stats.getSingleServerStat(server);
                // 单独服务器权重 = 所有服务器的响应时间之和 - 平均响应时间
                double weight = totalResponseTime - ss.getResponseTimeAvg();
                // 累加服务器权重之和
                weightSoFar += weight;
                // 添加累加服务器权重之和到累计权重列表之中
                finalWeights.add(weightSoFar);
            }
            setWeights(finalWeights);
        } catch (Exception e) {
            logger.error("Error calculating server weights", e);
        } finally {
            serverWeightAssignmentInProgress.set(false);
        }

    }
}
5.4.2 选择服务器

然后通过choose方法根据权重来挑选服务器,即平均响应时间越短越有可能被选中

public Server choose(ILoadBalancer lb, Object key) {
    if (lb == null) {
        return null;
    }
    Server server = null;

    while (server == null) {
        // get hold of the current reference in case it is changed from the other thread
        // 获取当前引用,以防它被其他线程更改
        List<Double> currentWeights = accumulatedWeights;
        if (Thread.interrupted()) {
            return null;
        }

        // 所有服务器信息
        List<Server> allList = lb.getAllServers();
        int serverCount = allList.size();
        if (serverCount == 0) {
            return null;
        }

        int serverIndex = 0;
        // last one in the list is the sum of all weights
        // 最大权重;若权重累积集合为空,则取0;否则取权重累积集合中最后一个元素
        double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1);

        // No server has been hit yet and total weight is not initialized,fallback to use round robin
        // 若若权重累积尚未被初始化,或者统计的服务数量与总服务器数量不等,则使用轮询负载均衡策略
        if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) {
            server = super.choose(getLoadBalancer(), key);
            if(server == null) {
                return server;
            }
        } else {
            // generate a random weight between 0 (inclusive) to maxTotalWeight (exclusive)
            // 随机生成一个在0(包含)到 最大权重(不包含)之间的随机数
            double randomWeight = random.nextDouble() * maxTotalWeight;
            // pick the server index based on the randomIndex
            // 根据生成的随机数在累计权重列表中获取下标,并根据下标获取对应的服务器
            int n = 0;
            for (Double d : currentWeights) {
                if (d >= randomWeight) {
                    serverIndex = n;
                    break;
                } else {
                    n++;
                }
            }
            server = allList.get(serverIndex);
        }

        // 判断server可用性,返回server或继续重试
        if (server == null) {
            /* Transient. */
            Thread.yield();
            continue;
        }

        if (server.isAlive()) {
            return (server);
        }

        // Next.
        server = null;
    }
    return server;
}

6、Ribbon 重试机制

前面已分析了Ribbon通过RestTemplate开启负载均衡,并分析了完整的一次Http调用过程,那如果在调用过程中发生错误,我们想重试调用的话,可以使用Ribbon的重试机制。前文分析到在RetryLoadBalancerInterceptor类的intercept方法中有如下代码:

@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
        final ClientHttpRequestExecution execution) throws IOException {
    // 获取URL和serviceName
    final URI originalUri = request.getURI();
    final String serviceName = originalUri.getHost();
    Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
    // 获取负载均衡重试策略并创建重试模板
    final LoadBalancedRetryPolicy retryPolicy = this.lbRetryFactory.createRetryPolicy(serviceName,this.loadBalancer);
    // 创建重试策略模板,并为模板设置重试策略;
    // 未开启重试机制使用NeverRetryPolicy策略;
    // 若开启重试机制使用InterceptorRetryPolicy策略
    RetryTemplate template = createRetryTemplate(serviceName, request, retryPolicy);
    // 执行请求
    return template.execute(context -> {
        ServiceInstance serviceInstance = null;
        // 获取serviceInstance,如果为禁用重试策略,则这里是true
        if (context instanceof LoadBalancedRetryContext) {
            LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
            serviceInstance = lbContext.getServiceInstance();
        }
        // 若未能获取到serviceInstance,则通过loadBalancer选择一个服务器
        if (serviceInstance == null) {
            serviceInstance = this.loadBalancer.choose(serviceName);
        }
        // 使用指定服务的LoadBalancer(负载均衡器)中的ServiceInstance(服务实例)执行请求。
        ClientHttpResponse response = RetryLoadBalancerInterceptor.this.loadBalancer
                .execute(serviceName, serviceInstance,this.requestFactory.createRequest(request, body, execution));
        int statusCode = response.getRawStatusCode();
        // 重试策略不为null 且 重试策略指定的重试http状态码包含当前请求返回的状态码
        if (retryPolicy != null && retryPolicy.retryableStatusCode(statusCode)) {
            byte[] bodyCopy = StreamUtils.copyToByteArray(response.getBody());
            response.close();
            throw new ClientHttpResponseStatusCodeException(serviceName, response,bodyCopy);
        }
        return response;
    }, new LoadBalancedRecoveryCallback<ClientHttpResponse, ClientHttpResponse>() {
        // This is a special case, where both parameters to LoadBalancedRecoveryCallback are the same.
        // In most cases they would be different.
        @Override
        protected ClientHttpResponse createResponse(ClientHttpResponse response, URI uri) {
            return response;
        }
    });
}

如果重试策略不为null,并且重试策略指定的重试http状态码包含当前请求返回的状态码,那么Ribbon会重试该Http请求。其中获取重试策略通过this.lbRetryFactory.createRetryPolicy(serviceName,this.loadBalancer)来完成,我们来看其具体的获取过程。

/**
 * 创建负载均衡的重试策略模板
 */
@Override
public LoadBalancedRetryPolicy createRetryPolicy(String service,ServiceInstanceChooser serviceInstanceChooser) {
	RibbonLoadBalancerContext lbContext = this.clientFactory.getLoadBalancerContext(service);
	return new RibbonLoadBalancedRetryPolicy(service, lbContext,
			serviceInstanceChooser, clientFactory.getClientConfig(service));
}

public RibbonLoadBalancedRetryPolicy(String serviceId,
                                     RibbonLoadBalancerContext context,
                                     ServiceInstanceChooser loadBalanceChooser,
                                     IClientConfig clientConfig) {
    this.serviceId = serviceId;
    this.lbContext = context;
    this.loadBalanceChooser = loadBalanceChooser;
    // 获取并设置重试的http状态码
    // 通过clientName.ribbon.retryableStatusCodes设置
    String retryableStatusCodesProp = clientConfig.getPropertyAsString(RETRYABLE_STATUS_CODES, "");
    String[] retryableStatusCodesArray = retryableStatusCodesProp.split(",");
    for (String code : retryableStatusCodesArray) {
        if (!StringUtils.isEmpty(code)) {
            try {
                retryableStatusCodes.add(Integer.valueOf(code.trim()));
            } catch (NumberFormatException e) {
                log.warn("We cant add the status code because the code [ " + code
                        + " ] could not be converted to an integer. ", e);
            }
        }
    }
}

RibbonLoadBalancedRetryPolicy策略实例化的过程很简单,除了将必要的变量信息赋值以外,还解析了重试http状态码。http状态码通过clientName.ribbon.retryableStatusCodes指定,而且从源码中我们已经可以看到多个状态码之间用英文逗号分隔,而且必须是整形数字。那么到这里重试策略已经获取完成了。接下来再看看Ribbon的常用重试配置,其具体的作用见注释:

# 允许重试,该属性默认为true
spring.cloud.loadbalancer.retry.enabled=true
# 同一个Server最大的重试次数,该属性值默认为0
spring-cloud-provider.ribbon.MaxAutoRetries=5
# 最大重试Server的个数,该属性默认值为1
spring-cloud-provider.ribbon.MaxAutoRetriesNextServer=5
# 是否开启任何异常都重试(默认在get请求下会重试,其他情况不会重试,除非设置为true)
spring-cloud-provider.ribbon.OkToRetryOnAllOperations=false
# 指定重试的http状态码
spring-cloud-provider.ribbon.retryableStatusCodes=404,401,500
6.1、重试策略触发及流程

RetryLoadBalancerInterceptor的intercept方法,最终会通过RetryTemplate进行调用,其调用方法如下:

protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,
			RecoveryCallback<T> recoveryCallback, RetryState state) throws E, ExhaustedRetryException {

    RetryPolicy retryPolicy = this.retryPolicy;
    BackOffPolicy backOffPolicy = this.backOffPolicy;

    // Allow the retry policy to initialise itself...
    // 允许重试策略自我初始化
    RetryContext context = open(retryPolicy, state);
    if (this.logger.isTraceEnabled()) {
        this.logger.trace("RetryContext retrieved: " + context);
    }

    // Make sure the context is available globally for clients who need it...
    // 注册重试上下文
    RetrySynchronizationManager.register(context);

    Throwable lastException = null;

    boolean exhausted = false;
    try {

        // Give clients a chance to enhance the context...
        // 给客户一个增强上下文的机会
        // 若RetryListener的open方法返回false,整个重试将被终止,并抛出异常
        boolean running = doOpenInterceptors(retryCallback, context);

        if (!running) {
            // 在第一次尝试前拦截器异常终止重试
            throw new TerminatedRetryException("Retry terminated abnormally by interceptor before first attempt");
        }

        // Get or Start the backoff context...
        BackOffContext backOffContext = null;
        Object resource = context.getAttribute("backOffContext");

        if (resource instanceof BackOffContext) {
            backOffContext = (BackOffContext) resource;
        }

        if (backOffContext == null) {
            backOffContext = backOffPolicy.start(context);
            if (backOffContext != null) {
                context.setAttribute("backOffContext", backOffContext);
            }
        }

        /*
         * We allow the whole loop to be skipped if the policy or context already forbid the first try.
         * This is used in the case of external retry to allow a
         * recovery in handleRetryExhausted without the callback processing (which
         * would throw an exception).
         *
         * canRetry -->  决定是否继续进行正在进行的重试尝试。
         *               此方法在执行RetryCallback之前调用,但在执行backoff和open拦截器之后调用。
         *
         * context.isExhaustedOnly() --> RetryContext已经设置了不再尝试或重试当前的RetryCallback
         *
         */
        while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
            try {
                this.logger.info("...Retry: count=" + context.getRetryCount());
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Retry: count=" + context.getRetryCount());
                }
                // Reset the last exception,
                // so if we are successful the close interceptors will not think we failed...
                // 重置(清空)最后的一次异常
                lastException = null;
                return retryCallback.doWithRetry(context);
            }
            catch (Throwable e) {
                lastException = e;
                try {
                    // 注册异常信息,并记录当前重试次数
                    registerThrowable(retryPolicy, state, context, e);
                }
                catch (Exception ex) {
                    throw new TerminatedRetryException("Could not register throwable",ex);
                }
                finally {
                    doOnErrorInterceptors(retryCallback, context, e);
                }

                if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
                    try {
                        backOffPolicy.backOff(backOffContext);
                    }
                    catch (BackOffInterruptedException ex) {
                        lastException = e;
                        // back off was prevented by another thread - fail the retry
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("Abort retry because interrupted: count="	+ context.getRetryCount());
                        }
                        throw ex;
                    }
                }

                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Checking for rethrow: count=" + context.getRetryCount());
                }

                if (shouldRethrow(retryPolicy, context, state)) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Rethrow in retry for policy: count="+ context.getRetryCount());
                    }
                    throw RetryTemplate.<E>wrapIfNecessary(e);
                }
            }
            /*
             * A stateful attempt that can retry may rethrow the exception before now,
             * but if we get this far in a stateful retry there's a reason for it,
             * like a circuit breaker or a rollback classifier.
             *
             * 可以重试的有状态尝试可能会在现在之前重新抛出异常,
             * 但是如果我们在有状态重试中走到这一步,那么这是有原因的,
             * 比如断路器或回滚分类器。
             */
            if (state != null && context.hasAttribute(GLOBAL_STATE)) {
                break;
            }
        }

        if (state == null && this.logger.isDebugEnabled()) {
            this.logger.debug("Retry failed last attempt: count=" + context.getRetryCount());
        }
        exhausted = true;
        return handleRetryExhausted(recoveryCallback, context, state);
    }
    catch (Throwable e) {
        throw RetryTemplate.<E>wrapIfNecessary(e);
    }
    finally {
        close(retryPolicy, context, state, lastException == null || exhausted);
        doCloseInterceptors(retryCallback, context, lastException);
        RetrySynchronizationManager.clear();
    }
}

该段代码较长,其中重点有两部分:
第一:while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) 判断是否可继续重试,这里包含了两个条件。
第一个条件,canRetry 是否可重试,与具体的重试策略有关,我们来看下RibbonLoadBalancedRetryPolicy策略是如何判断可重试的:

public boolean canRetry(LoadBalancedRetryContext context) {
    HttpMethod method = context.getRequest().getMethod();
    return HttpMethod.GET == method || lbContext.isOkToRetryOnAllOperations();
}

如果当前请求类型是GET或者设置了spring-cloud-provider.ribbon.OkToRetryOnAllOperations=true,那么该请求满足canRetry条件。
第二个条件,RetryContext上下文已经设置了不再尝试或重试当前的RetryCallback,例如重试次数已经达到我们设置的重试次数之后,Ribbon会通过setExhaustedOnly设置该属性为true,不再进行请求重试。

第二:registerThrowable(retryPolicy, state, context, e) 注册异常信息,在RetryLoadBalancerInterceptor类的intercept方法中已经介绍过,当被调用server返回的状态码中包含我们通过spring-cloud-provider.ribbon.retryableStatusCodes配置的状态码时会抛出ClientHttpResponseStatusCodeException,该异常会被捕获到并通过registerThrowable方法注册异常信息。

@Override
public void registerThrowable(LoadBalancedRetryContext context, Throwable throwable) {
    // if this is a circuit tripping exception then notify the load balancer
    // 如果该异常是断路异常,则通知负载均衡器
    if (lbContext.getRetryHandler().isCircuitTrippingException(throwable)) {
        updateServerInstanceStats(context);
    }
    // Check if we need to ask the load balancer for a new server.
    // Do this before we increment the counters because the first call to this method
    // is not a retry it is just an initial failure.
    // 检查是否需要向负载平衡器请求新服务器。
    // 在增加计数器之前执行此操作,因为对该方法的第一次调用不是重试,而是初始失败。
    if (!canRetrySameServer(context) && canRetryNextServer(context)) {
        context.setServiceInstance(loadBalanceChooser.choose(serviceId));
    }
    // This method is called regardless of whether we are retrying or making the first request.
    // Since we do not count the initial request in the retry count we don't reset the counter
    // until we actually equal the same server count limit. This will allow us to make the initial
    // request plus the right number of retries.
    // 无论我们是重试还是发出第一个请求,都会调用此方法。
    // 由于在重试计数中不计算初始请求,所以在实际等于相同的服务器计数限制之前不会重置计数器。
    // 这将允许我们发出初始请求和正确的重试次数。
    /**
     * 实例变量 sameServerCount 和 nextServerCount 默认都是0
     *
     * 这里假如我们开启了重试机制,那么MaxAutoRetries和MaxAutoRetriesNextServer(默认值为1)配置将决定重试次数
     *
     * 若重试包含首次调用的话,那么重试次数 =(MaxAutoRetries+1)*(MaxAutoRetriesNextServer+1)
     */
    if (sameServerCount >= lbContext.getRetryHandler().getMaxRetriesOnSameServer() && canRetry(context)) {
        // reset same server since we are moving to a new server
        sameServerCount = 0;
        nextServerCount++;
        // 若不满足在其他服务器实例重试的条件,则将终止重试标识设置为true
        if (!canRetryNextServer(context)) {
            context.setExhaustedOnly();
        }
    } else {
        sameServerCount++;
    }
}

该段代码最重要的就是最后的ifelse判断,在分析这段代码之前,先来看一下canRetryNextServer方法:

@Override
public boolean canRetryNextServer(LoadBalancedRetryContext context) {
    return nextServerCount <= lbContext.getRetryHandler().getMaxRetriesOnNextServer() && canRetry(context);
}

canRetryNextServer判断条件很简单,满足canRetry条件,前文已有介绍,nextServerCount计数器小于等于spring-cloud-provider.ribbon.MaxAutoRetriesNextServer。了解了canRetryNextServer之后,再来看ifelse判断,

上一篇:创建ribbon(模仿就能跑,超详细)


下一篇:Ribbon是什么?