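1. Enabling load balancing with RestTemplate
Add a @Configuration class that creates a RestTemplate bean, and annotate the bean with @LoadBalanced to enable client-side load balancing.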
@Configuration
public class RestTemplateConfig {
@Bean
@LoadBalanced
protected RestTemplate restTemplate() {
return new RestTemplate();
}
}
The source of the @LoadBalanced annotation is as follows:
/**
* Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient.
* @author Spencer Gibb
*/
@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}
The only useful hint in its Javadoc is LoadBalancerClient, so let's look at what LoadBalancerClient does:
/**
* Represents a client-side load balancer.
*
* @author Spencer Gibb
*/
public interface LoadBalancerClient extends ServiceInstanceChooser {
/**
* Executes request using a ServiceInstance from the LoadBalancer for the specified service.
* @param serviceId The service ID to look up the LoadBalancer.
* @param request Allows implementations to execute pre and post actions, such as incrementing metrics.
* @param <T> type of the response
* @throws IOException in case of IO issues.
* @return The result of the LoadBalancerRequest callback on the selected ServiceInstance.
*/
<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
/**
* Executes request using a ServiceInstance from the LoadBalancer for the specified service.
* @param serviceId The service ID to look up the LoadBalancer.
* @param serviceInstance The service to execute the request to.
* @param request Allows implementations to execute pre and post actions, such as incrementing metrics.
* @param <T> type of the response
* @throws IOException in case of IO issues.
* @return The result of the LoadBalancerRequest callback on the selected ServiceInstance.
*/
<T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;
/**
* Creates a proper URI with a real host and port for systems to utilize.
* Some systems use a URI with the logical service name as the host, such as http://myservice/path/to/service.
* This will replace the service name with the host:port from the ServiceInstance.
* @param instance service instance to reconstruct the URI
* @param original A URI with the host as a logical service name.
* @return A reconstructed URI.
*/
URI reconstructURI(ServiceInstance instance, URI original);
}
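The LoadBalancerClient interface represents a client-side load balancer. It provides two methods, execute and reconstructURI, and extends the ServiceInstanceChooser interface, whose code is as follows: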
/**
* Implemented by classes which use a load balancer to choose a server to send a request to.
*
* @author Ryan Baxter
*/
public interface ServiceInstanceChooser {
/**
* Chooses a ServiceInstance from the LoadBalancer for the specified service.
* @param serviceId The service ID to look up the LoadBalancer.
* @return A ServiceInstance that matches the serviceId.
*/
ServiceInstance choose(String serviceId);
}
The ServiceInstanceChooser interface represents a service instance chooser. Its choose method selects the concrete server instance to call.
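To make reconstructURI more concrete, here is a minimal plain-Java sketch. It rebuilds a URI such as http://myservice/path/to/service so that it points at one chosen instance. The class UriReconstructionSketch is hypothetical and is not Spring Cloud's implementation. The host and port parameters stand in for the values a ServiceInstance would supply.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper (not Spring Cloud's code): illustrates the idea behind
// reconstructURI by swapping the logical service name for a real host:port.
final class UriReconstructionSketch {
    static URI reconstruct(URI original, String host, int port) {
        try {
            // Keep scheme, user info, path, query and fragment; replace only host and port.
            return new URI(original.getScheme(), original.getUserInfo(), host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot rebuild " + original, e);
        }
    }
}
```

The sketch passes the decoded getPath()/getQuery() values to the multi-argument URI constructor. That works for plain ASCII paths like this one. Paths containing percent-encoded characters would need more care.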
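From these two interfaces we can see that LoadBalancerClient can already choose a server instance, execute a request, and reconstruct a URI. Next, let's look at how Ribbon is initialized. Two important configuration classes are involved, RibbonAutoConfiguration and LoadBalancerAutoConfiguration, and we analyze the initialization of each below.
2. RibbonAutoConfiguration
Ribbon's initialization is bootstrapped by the RibbonAutoConfiguration class.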
@Configuration
@Conditional(RibbonAutoConfiguration.RibbonClassesConditions.class)
@RibbonClients
// EurekaClientAutoConfiguration --> Eureka client auto-configuration
@AutoConfigureAfter(name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration")
// LoadBalancerAutoConfiguration --> load-balancing auto-configuration
// AsyncLoadBalancerAutoConfiguration --> load-balancing auto-configuration for async requests
@AutoConfigureBefore({ LoadBalancerAutoConfiguration.class, AsyncLoadBalancerAutoConfiguration.class })
// RibbonEagerLoadProperties --> Ribbon eager-load properties
// ServerIntrospectorProperties --> server introspector properties
@EnableConfigurationProperties({ RibbonEagerLoadProperties.class, ServerIntrospectorProperties.class })
public class RibbonAutoConfiguration {
@Autowired(required = false)
private List<RibbonClientSpecification> configurations = new ArrayList<>();
// Ribbon eager-load properties
@Autowired
private RibbonEagerLoadProperties ribbonEagerLoadProperties;
// HasFeatures: registers Ribbon as a named feature
@Bean
public HasFeatures ribbonFeature() {
return HasFeatures.namedFeature("Ribbon", Ribbon.class);
}
// Factory that creates client, load balancer and client configuration instances
@Bean
public SpringClientFactory springClientFactory() {
SpringClientFactory factory = new SpringClientFactory();
factory.setConfigurations(this.configurations);
return factory;
}
// Load balancer client
@Bean
@ConditionalOnMissingBean(LoadBalancerClient.class)
public LoadBalancerClient loadBalancerClient() {
return new RibbonLoadBalancerClient(springClientFactory());
}
// Retry support (only when Spring Retry's RetryTemplate is on the classpath)
@Bean
@ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
@ConditionalOnMissingBean
public LoadBalancedRetryFactory loadBalancedRetryPolicyFactory(final SpringClientFactory clientFactory) {
return new RibbonLoadBalancedRetryFactory(clientFactory);
}
// Resolves the Ribbon component class names configured through properties
@Bean
@ConditionalOnMissingBean
public PropertiesFactory propertiesFactory() {
return new PropertiesFactory();
}
// Eager loading of Ribbon clients (enabled by ribbon.eager-load.enabled)
@Bean
@ConditionalOnProperty("ribbon.eager-load.enabled")
public RibbonApplicationContextInitializer ribbonApplicationContextInitializer() {
return new RibbonApplicationContextInitializer(springClientFactory(),
ribbonEagerLoadProperties.getClients());
}
@Configuration
@ConditionalOnClass(HttpRequest.class)
@ConditionalOnRibbonRestClient
protected static class RibbonClientHttpRequestFactoryConfiguration {
@Autowired
private SpringClientFactory springClientFactory;
@Bean
public RestTemplateCustomizer restTemplateCustomizer(
final RibbonClientHttpRequestFactory ribbonClientHttpRequestFactory) {
return restTemplate -> restTemplate.setRequestFactory(ribbonClientHttpRequestFactory);
}
@Bean
public RibbonClientHttpRequestFactory ribbonClientHttpRequestFactory() {
return new RibbonClientHttpRequestFactory(this.springClientFactory);
}
}
// TODO: support for autoconfiguring restemplate to use apache http client or okhttp
@Target({ ElementType.TYPE, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Conditional(OnRibbonRestClientCondition.class)
@interface ConditionalOnRibbonRestClient {
}
private static class OnRibbonRestClientCondition extends AnyNestedCondition {
OnRibbonRestClientCondition() {
super(ConfigurationPhase.REGISTER_BEAN);
}
@Deprecated // remove in Edgware"
@ConditionalOnProperty("ribbon.http.client.enabled")
static class ZuulProperty {
}
@ConditionalOnProperty("ribbon.restclient.enabled")
static class RibbonProperty {
}
}
/**
* {@link AllNestedConditions} that checks that either multiple classes are present.
*/
static class RibbonClassesConditions extends AllNestedConditions {
RibbonClassesConditions() {
super(ConfigurationPhase.PARSE_CONFIGURATION);
}
@ConditionalOnClass(IClient.class)
static class IClientPresent {
}
@ConditionalOnClass(RestTemplate.class)
static class RestTemplatePresent {
}
@ConditionalOnClass(AsyncRestTemplate.class)
static class AsyncRestTemplatePresent {
}
@ConditionalOnClass(Ribbon.class)
static class RibbonPresent {
}
}
}
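The key components initialized here are:
- SpringClientFactory: a factory that creates client, load balancer and client configuration instances. It creates a Spring ApplicationContext for each client name and retrieves the required beans from it.
- LoadBalancerClient: the load balancer client.
- LoadBalancedRetryFactory: a factory for customizing Spring Cloud's retry behavior.
When each of these components comes into play is explained later, by following a single load-balanced HTTP request.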
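SpringClientFactory keeps one ApplicationContext per client name. At its core, that behavior is a lazily populated cache keyed by client name. The sketch below models only this caching idea. PerClientRegistry is a hypothetical class, and the cached "context" is just whatever value a factory function produces.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical model of a per-client container cache (not SpringClientFactory's API).
final class PerClientRegistry<C> {
    private final Map<String, C> contexts = new ConcurrentHashMap<>();
    private final Function<String, C> factory;

    PerClientRegistry(Function<String, C> factory) {
        this.factory = factory;
    }

    // Creates the container for a client name on first use, then reuses it.
    C getContext(String clientName) {
        return contexts.computeIfAbsent(clientName, factory);
    }

    int size() {
        return contexts.size();
    }
}
```

In Spring Cloud the cached value is a full child ApplicationContext. Because it is created on first use, the first request to a client can be slower. Eager loading (ribbon.eager-load.enabled) exists to address this.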
3. LoadBalancerAutoConfiguration
LoadBalancerAutoConfiguration is the load-balancing auto-configuration class (it lives in Spring Cloud Commons). Its code is as follows:
@Configuration
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {
@LoadBalanced
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();
@Autowired(required = false)
private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();
@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
return () -> restTemplateCustomizers.ifAvailable(customizers -> {
for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
for (RestTemplateCustomizer customizer : customizers) {
customizer.customize(restTemplate);
}
}
});
}
// Factory for load-balanced requests
@Bean
@ConditionalOnMissingBean
public LoadBalancerRequestFactory loadBalancerRequestFactory(LoadBalancerClient loadBalancerClient) {
return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
}
// Auto configuration for retry mechanism.
@Configuration
@ConditionalOnClass(RetryTemplate.class)
public static class RetryAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public LoadBalancedRetryFactory loadBalancedRetryFactory() {
return new LoadBalancedRetryFactory() {
};
}
}
// Auto configuration for retry intercepting mechanism.
@Configuration
@ConditionalOnClass(RetryTemplate.class)
public static class RetryInterceptorAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public RetryLoadBalancerInterceptor ribbonInterceptor(
LoadBalancerClient loadBalancerClient,
LoadBalancerRetryProperties properties,
LoadBalancerRequestFactory requestFactory,
LoadBalancedRetryFactory loadBalancedRetryFactory) {
return new RetryLoadBalancerInterceptor(loadBalancerClient, properties,
requestFactory, loadBalancedRetryFactory);
}
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
final RetryLoadBalancerInterceptor loadBalancerInterceptor) {
return restTemplate -> {
List<ClientHttpRequestInterceptor> list = new ArrayList<>(
restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
}
// Not analyzed here: used only when Spring Retry is not on the classpath
@Configuration
@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
static class LoadBalancerInterceptorConfig {
@Bean
public LoadBalancerInterceptor ribbonInterceptor(
LoadBalancerClient loadBalancerClient,
LoadBalancerRequestFactory requestFactory) {
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
final LoadBalancerInterceptor loadBalancerInterceptor) {
return restTemplate -> {
List<ClientHttpRequestInterceptor> list = new ArrayList<>(
restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
}
}
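The key components initialized here are:
- LoadBalancerRequestFactory: creates the LoadBalancerRequest for LoadBalancerInterceptor and RetryLoadBalancerInterceptor, and applies any LoadBalancerRequestTransformer to the intercepted HttpRequest.
- LoadBalancedRetryFactory: if the container has no LoadBalancedRetryFactory instance yet, an empty default instance is created here.
- RestTemplateCustomizer: customizes each RestTemplate by adding the RetryLoadBalancerInterceptor. This interceptor intercepts HTTP requests and calls choose to select a server, which is how load balancing happens.
Below, we follow a single load-balanced HTTP request to see when each of these components is used.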
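The customizer does not replace a RestTemplate's interceptors. It copies the existing list, appends the load-balancing interceptor, and sets the list back. Each interceptor then hands the request on to the next one. The following is a simplified, hypothetical model of that chain. Execution, Interceptor and MiniRestTemplate are illustrative names, not Spring's API, and the "transport" function stands in for the real HTTP call.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified model of a RestTemplate interceptor chain (not Spring's API).
interface Execution {
    String execute(URI uri);
}

interface Interceptor {
    String intercept(URI uri, Execution next);
}

final class MiniRestTemplate {
    private List<Interceptor> interceptors = new ArrayList<>();
    private final Function<URI, String> transport; // stands in for the real HTTP call

    MiniRestTemplate(Function<URI, String> transport) {
        this.transport = transport;
    }

    List<Interceptor> getInterceptors() { return interceptors; }

    void setInterceptors(List<Interceptor> list) { this.interceptors = list; }

    String get(URI uri) {
        Iterator<Interceptor> it = interceptors.iterator();
        // Each call hands the request to the next interceptor or, at the end, to the transport.
        Execution chain = new Execution() {
            @Override
            public String execute(URI u) {
                return it.hasNext() ? it.next().intercept(u, this) : transport.apply(u);
            }
        };
        return chain.execute(uri);
    }

    // Mirrors the customizer: copy the existing interceptors, append one, set the list back.
    static void addInterceptor(MiniRestTemplate restTemplate, Interceptor extra) {
        List<Interceptor> list = new ArrayList<>(restTemplate.getInterceptors());
        list.add(extra);
        restTemplate.setInterceptors(list);
    }
}
```

Here a "load-balancing" interceptor could rewrite http://user-service/users/1 to a concrete host:port before calling next.execute.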
4. The call flow of a load-balanced HTTP request
Suppose we call one of UserController's GET methods (the original sequence diagram for this call is not available). We won't go into RestTemplate's own call path. It eventually reaches the intercept method of RetryLoadBalancerInterceptor, which is where our analysis starts:
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
// Get the URI and the service name (the host part)
final URI originalUri = request.getURI();
final String serviceName = originalUri.getHost();
Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
// Obtain the load-balanced retry policy
final LoadBalancedRetryPolicy retryPolicy = this.lbRetryFactory.createRetryPolicy(serviceName,this.loadBalancer);
// Create the RetryTemplate and set its retry policy:
// NeverRetryPolicy when retry is disabled,
// InterceptorRetryPolicy when retry is enabled
RetryTemplate template = createRetryTemplate(serviceName, request, retryPolicy);
// Execute the request
return template.execute(context -> {
ServiceInstance serviceInstance = null;
// Get the ServiceInstance already chosen by the retry policy (only when retry is enabled)
if (context instanceof LoadBalancedRetryContext) {
LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
serviceInstance = lbContext.getServiceInstance();
}
// If no ServiceInstance is available yet, let the loadBalancer choose one
if (serviceInstance == null) {
serviceInstance = this.loadBalancer.choose(serviceName);
}
// Execute the request against the ServiceInstance chosen from the service's LoadBalancer
ClientHttpResponse response = RetryLoadBalancerInterceptor.this.loadBalancer
.execute(serviceName, serviceInstance,this.requestFactory.createRequest(request, body, execution));
int statusCode = response.getRawStatusCode();
if (retryPolicy != null && retryPolicy.retryableStatusCode(statusCode)) {
byte[] bodyCopy = StreamUtils.copyToByteArray(response.getBody());
response.close();
throw new ClientHttpResponseStatusCodeException(serviceName, response,bodyCopy);
}
return response;
}, new LoadBalancedRecoveryCallback<ClientHttpResponse, ClientHttpResponse>() {
// This is a special case, where both parameters to
// LoadBalancedRecoveryCallback are
// the same. In most cases they would be different.
@Override
protected ClientHttpResponse createResponse(ClientHttpResponse response, URI uri) {
return response;
}
});
}
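The key steps performed here are:
- Obtain the load-balanced retry policy
- Create the RetryTemplate and set its retry policy
- Execute the request
4.1 Obtaining the load-balanced retry policy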
@Override
public LoadBalancedRetryPolicy createRetryPolicy(String service,ServiceInstanceChooser serviceInstanceChooser) {
RibbonLoadBalancerContext lbContext = this.clientFactory.getLoadBalancerContext(service);
return new RibbonLoadBalancedRetryPolicy(service, lbContext,
serviceInstanceChooser, clientFactory.getClientConfig(service));
}
// Constructor
public RibbonLoadBalancedRetryPolicy(String serviceId,RibbonLoadBalancerContext context,
ServiceInstanceChooser loadBalanceChooser,IClientConfig clientConfig) {
this.serviceId = serviceId;
this.lbContext = context;
this.loadBalanceChooser = loadBalanceChooser;
// Read and register the retryable HTTP status codes
String retryableStatusCodesProp = clientConfig.getPropertyAsString(RETRYABLE_STATUS_CODES, "");
String[] retryableStatusCodesArray = retryableStatusCodesProp.split(",");
for (String code : retryableStatusCodesArray) {
if (!StringUtils.isEmpty(code)) {
try {
retryableStatusCodes.add(Integer.valueOf(code.trim()));
}
catch (NumberFormatException e) {
log.warn("We cant add the status code because the code [ " + code
+ " ] could not be converted to an integer. ", e);
}
}
}
}
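The constructor above turns a comma-separated property into a set of integer status codes. Each response's status code is later checked against that set. Here is a small stand-alone sketch of the same parsing. StatusCodeParser is a hypothetical helper. Unlike the original, it silently skips whitespace-only entries instead of logging a warning for them.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper that parses a retryableStatusCodes-style value such as "404, 401,500".
final class StatusCodeParser {
    static Set<Integer> parse(String prop) {
        Set<Integer> codes = new LinkedHashSet<>();
        if (prop == null) {
            return codes;
        }
        for (String code : prop.split(",")) {
            String trimmed = code.trim();
            if (trimmed.isEmpty()) {
                continue; // skip blank entries
            }
            try {
                codes.add(Integer.valueOf(trimmed));
            } catch (NumberFormatException e) {
                // The constructor above logs a warning here; this sketch just skips the entry.
            }
        }
        return codes;
    }

    // The per-response check: is this status code one of the configured retryable codes?
    static boolean isRetryable(Set<Integer> codes, int statusCode) {
        return codes.contains(statusCode);
    }
}
```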
4.2 Creating the RetryTemplate and setting its retry policy
private RetryTemplate createRetryTemplate(String serviceName,HttpRequest request,LoadBalancedRetryPolicy retryPolicy){
RetryTemplate template = new RetryTemplate();
// Create and set the BackOffPolicy
BackOffPolicy backOffPolicy = this.lbRetryFactory.createBackOffPolicy(serviceName);
template.setBackOffPolicy(backOffPolicy == null ? new NoBackOffPolicy() : backOffPolicy);
template.setThrowLastExceptionOnExhausted(true);
// Set the RetryListeners
RetryListener[] retryListeners = this.lbRetryFactory.createRetryListeners(serviceName);
if (retryListeners != null && retryListeners.length != 0) {
template.setListeners(retryListeners);
}
// Key point:
// if spring.cloud.loadbalancer.retry.enabled=false or retryPolicy == null, use NeverRetryPolicy;
// otherwise use InterceptorRetryPolicy
template.setRetryPolicy(!this.lbProperties.isEnabled() || retryPolicy == null
? new NeverRetryPolicy():new InterceptorRetryPolicy(request,retryPolicy,this.loadBalancer,serviceName));
return template;
}
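In this method we only need to care about NeverRetryPolicy and InterceptorRetryPolicy. How they are invoked is analyzed below.
4.3 Executing the request
Having introduced NeverRetryPolicy and InterceptorRetryPolicy, let's look at the call flow under each policy. First, disable retry in the configuration file (the property is enabled by default):
# Disable retry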
spring.cloud.loadbalancer.retry.enabled=false
RetryTemplate.execute calls doExecute, which contains the following loop:
while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
// ... some code omitted
}
canRetry delegates to the retry policy in use. Since retry is not enabled, that policy is NeverRetryPolicy, and its canRetry method deserves attention:
public boolean canRetry(RetryContext context) {
return !((NeverRetryContext) context).isFinished();
}
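The NeverRetryContext is marked finished only after the first exception is registered. canRetry therefore returns true before the first attempt and false after the first failure. There is always exactly one attempt, and any further retry is prevented. So even with retry disabled, canRetry returns true here and the request is still executed once. If retry is enabled, InterceptorRetryPolicy is used instead. Here is its canRetry method: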
@Override
public boolean canRetry(RetryContext context) {
LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
if (lbContext.getRetryCount() == 0 && lbContext.getServiceInstance() == null) {
// We haven't even tried to make the request yet so return true so we do
// Set the ServiceInstance on the LoadBalancedRetryContext
lbContext.setServiceInstance(this.serviceInstanceChooser.choose(this.serviceName));
return true;
}
return this.policy.canRetryNextServer(lbContext);
}
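Note that a ServiceInstance has already been set on the LoadBalancedRetryContext at this point. The choose method itself is covered later.
Back in RetryLoadBalancerInterceptor's intercept method, let's see how the server to call is obtained with retry enabled and with retry disabled.
// Get the ServiceInstance; context is a LoadBalancedRetryContext only when retry is enabled (InterceptorRetryPolicy)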
if (context instanceof LoadBalancedRetryContext) {
LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
serviceInstance = lbContext.getServiceInstance();
}
// If no ServiceInstance is available yet, let the loadBalancer choose one
if (serviceInstance == null) {
serviceInstance = this.loadBalancer.choose(serviceName);
}
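With retry enabled, canRetry has already set the serviceInstance on the context, so it is simply read back. With retry disabled, the context is a NeverRetryContext rather than a LoadBalancedRetryContext. serviceInstance therefore stays null and has to be obtained through choose, which is analyzed in the section on load-balancing rules below. With this groundwork done, the request can finally be sent to the serviceInstance. The actual call is still made through a ClientHttpRequest. We won't go into it here, but interested readers can read that code themselves.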
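NeverRetryPolicy's "exactly one attempt" behavior can be checked with a stripped-down model of the retry loop. MiniRetryPolicy, NeverRetry and MiniRetryTemplate are hypothetical names for this sketch, not Spring Retry classes. The sketch models only two things: how canRetry and registerThrowable interact, and how the last exception is rethrown once retries are exhausted.

```java
import java.util.function.Supplier;

// Hypothetical, stripped-down model of the RetryTemplate loop (not Spring Retry's API).
interface MiniRetryPolicy {
    boolean canRetry();
    void registerThrowable(Throwable t);
}

// Mirrors NeverRetryPolicy: the first attempt is allowed, any failure finishes the context.
final class NeverRetry implements MiniRetryPolicy {
    private boolean finished = false;

    @Override
    public boolean canRetry() { return !finished; }

    @Override
    public void registerThrowable(Throwable t) { finished = true; }
}

final class MiniRetryTemplate {
    static <T> T execute(MiniRetryPolicy policy, Supplier<T> callback) {
        RuntimeException last = null;
        while (policy.canRetry()) {
            try {
                return callback.get();
            } catch (RuntimeException e) {
                last = e;
                policy.registerThrowable(e); // NeverRetry: the loop ends after this
            }
        }
        // Like setThrowLastExceptionOnExhausted(true): rethrow the last failure.
        if (last != null) {
            throw last;
        }
        throw new IllegalStateException("retry policy allowed no attempt");
    }
}
```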
5. Ribbon load-balancing rules
The choose method mentioned above is the entry point of load balancing.
// choose and getServer methods of RibbonLoadBalancerClient
@Override
public ServiceInstance choose(String serviceId) {
return choose(serviceId, null);
}
public ServiceInstance choose(String serviceId, Object hint) {
// Get the server to call
Server server = getServer(getLoadBalancer(serviceId), hint);
// Return null if there is no server; otherwise wrap it in a RibbonServer
if (server == null) {
return null;
}
return new RibbonServer(serviceId, server, isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));
}
protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
if (loadBalancer == null) {
return null;
}
// Use 'default' on a null hint, or just pass it on?
return loadBalancer.chooseServer(hint != null ? hint : "default");
}
// chooseServer method of BaseLoadBalancer
// Gets an available server for the given key
public Server chooseServer(Object key) {
if (counter == null) {
counter = createCounter();
}
counter.increment();
// No load-balancing rule: return null
if (rule == null) {
return null;
} else {
try {
// Choose a server according to the load-balancing rule
return rule.choose(key);
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
return null;
}
}
}
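As the call chain above shows, rule.choose(key) is what finally selects the server. rule is an implementation of IRule, the interface that defines the "rule" of a LoadBalancer, and a rule can be seen as a load-balancing strategy. Well-known strategies include round robin and response-time-based selection. The most common are round robin, random and weighted. Below we analyze these three, starting with Spring Cloud's default rule.
5.1 ZoneAvoidanceRule: filtered round robin
This class extends PredicateBasedRule. It is a round-robin rule and also Spring Cloud's default load-balancing rule. It first filters the full server list down to the eligible servers, then uses round robin to pick the server to call from them.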
@Override
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer();
// Get the server to call
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}
As the code shows, the core of this method is chooseRoundRobinAfterFiltering:
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
// Get the list of eligible servers
List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
if (eligible.size() == 0) {
return Optional.absent();
}
// Pick the server to call from the eligible list using round robin
return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}
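Choosing the server to call takes three steps:
- Get the list of all servers, both available and unavailable
- Filter it to obtain the list of eligible candidate servers
- Use round robin to pick one server from the candidate list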
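The three steps can be sketched in plain Java with servers as strings and the eligibility check as a Predicate. FilteredRoundRobin is hypothetical. It uses java.util.Optional where Ribbon uses Guava's Optional, and it reuses the compare-and-set loop from chooseRoundRobinAfterFiltering. In that loop the current index is returned and the counter is advanced.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical, simplified model of "filter, then round robin"; servers are plain strings.
final class FilteredRoundRobin {
    private final AtomicInteger nextIndex = new AtomicInteger();
    private final Predicate<String> eligible;

    FilteredRoundRobin(Predicate<String> eligible) {
        this.eligible = eligible;
    }

    Optional<String> choose(List<String> allServers) {
        // Steps 1 and 2: start from all servers and keep only the eligible ones.
        List<String> candidates = allServers.stream().filter(eligible).collect(Collectors.toList());
        if (candidates.isEmpty()) {
            return Optional.empty();
        }
        // Step 3: round robin over the candidates.
        return Optional.of(candidates.get(incrementAndGetModulo(candidates.size())));
    }

    // Returns the current index and advances it; "current < modulo" guards against a shrunk list.
    private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextIndex.get();
            int next = (current + 1) % modulo;
            if (nextIndex.compareAndSet(current, next) && current < modulo) {
                return current;
            }
        }
    }
}
```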
5.1.1 Getting the candidate server list
public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
if (loadBalancerKey == null) {
return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));
} else {
List<Server> results = Lists.newArrayList();
for (Server server: servers) {
if (this.apply(new PredicateKey(loadBalancerKey, server))) {
results.add(server);
}
}
return results;
}
}
The core of this method is apply, which ultimately calls ZoneAvoidancePredicate.apply:
public boolean apply(@Nullable PredicateKey input) {
// If niws.loadbalancer.zoneAvoidanceRule.enabled=false,
// zone avoidance is off: no filtering
if (!ENABLED.get()) {
return true;
}
// If the server's zone is null: no filtering
String serverZone = input.getServer().getZone();
if (serverZone == null) {
// there is no zone information from the server, we do not want to filter out this server
return true;
}
// If no LoadBalancerStats is available: no filtering
LoadBalancerStats lbStats = getLBStats();
if (lbStats == null) {
// no stats available, do not filter
return true;
}
// If at most one zone is available: no filtering
if (lbStats.getAvailableZones().size() <= 1) {
// only one zone is available, do not filter
return true;
}
// If the load balancer does not know the server's zone: no filtering
Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
if (!zoneSnapshot.keySet().contains(serverZone)) {
// The server zone is unknown to the load balancer, do not filter it out
return true;
}
// Compute the available zones
logger.debug("Zone snapshots: {}", zoneSnapshot);
Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot,
triggeringLoad.get(),
triggeringBlackoutPercentage.get());
logger.debug("Available zones: {}", availableZones);
// Keep the server only if its zone is among the available zones
if (availableZones != null) {
return availableZones.contains(input.getServer().getZone());
} else {
return false;
}
}
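The order of the "do not filter" shortcuts in apply can be condensed into a single function. ZoneFilterSketch is hypothetical. knownZones stands in for the zones known to LoadBalancerStats, with null meaning "no stats". availableZones stands in for the result of ZoneAvoidanceRule.getAvailableZones. How that result is computed, using the triggering load and blackout percentage, is not modeled.

```java
import java.util.Set;

// Hypothetical condensation of the checks in ZoneAvoidancePredicate.apply.
final class ZoneFilterSketch {
    static boolean keep(boolean enabled, String serverZone,
                        Set<String> knownZones, Set<String> availableZones) {
        if (!enabled) return true;                          // zone avoidance switched off
        if (serverZone == null) return true;                // server carries no zone information
        if (knownZones == null) return true;                // no statistics: do not filter
        if (knownZones.size() <= 1) return true;            // a single zone: nothing to avoid
        if (!knownZones.contains(serverZone)) return true;  // zone unknown to the load balancer
        return availableZones != null && availableZones.contains(serverZone);
    }
}
```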
5.1.2 Picking the server with round robin
private final AtomicInteger nextIndex = new AtomicInteger();
private int incrementAndGetModulo(int modulo) {
for (;;) {
int current = nextIndex.get();
int next = (current + 1) % modulo;
if (nextIndex.compareAndSet(current, next) && current < modulo)
return current;
}
}
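The round-robin algorithm itself is simple. The compareAndSet loop atomically advances nextIndex modulo the list size and returns the index it had before the increment. The extra current < modulo check guards against the list having shrunk since the last call.
5.2 Configuring a custom load-balancing rule
There are two ways to use a custom load-balancing rule:
1. Use a rule that already exists in Ribbon, such as round robin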
/**
 * RoundRobinRule: round-robin load-balancing rule
*/
@Configuration
public class RoundRobinRuleConfig {
@Bean
public IRule roundRobinRule() {
return new RoundRobinRule();
}
}
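2. Implement IRule yourself (here by extending AbstractLoadBalancerRule, which already implements setLoadBalancer/getLoadBalancer) and register it from a separate configuration class. The configuration class should not implement IRule itself. Otherwise it becomes a second IRule bean alongside myRule(), and injecting IRule becomes ambiguous.
/**
 * Custom load-balancing rule
 */
public class MyRule extends AbstractLoadBalancerRule {
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
// read rule-specific settings from clientConfig if needed
}
@Override
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer();
if (lb == null) {
return null;
}
List<Server> servers = lb.getReachableServers();
// do something: replace this placeholder (first reachable server) with your own selection logic
return servers.isEmpty() ? null : servers.get(0);
}
}
/**
 * Registers MyRule as the IRule bean
 */
@Configuration
public class MyRuleConfig {
@Bean
public IRule myRule() {
return new MyRule();
}
}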
5.3 RoundRobinRule: round robin
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
log.warn("no load balancer");
return null;
}
Server server = null;
int count = 0;
while (server == null && count++ < 10) {
// Reachable servers and all servers
List<Server> reachableServers = lb.getReachableServers();
List<Server> allServers = lb.getAllServers();
int upCount = reachableServers.size();
int serverCount = allServers.size();
// No available servers
if ((upCount == 0) || (serverCount == 0)) {
log.warn("No up servers available from load balancer: " + lb);
return null;
}
// Get the server for this request
int nextServerIndex = incrementAndGetModulo(serverCount);
server = allServers.get(nextServerIndex);
// Check the chosen server: if it is not usable, keep looping; if it is, return it
if (server == null) {
/* Transient. */
Thread.yield();
continue;
}
if (server.isAlive() && (server.isReadyToServe())) {
return (server);
}
// Next.
server = null;
}
// After 10 attempts without a usable server, log a warning and return
if (count >= 10) {
log.warn("No available alive servers after 10 tries from load balancer: " + lb);
}
return server;
}
private int incrementAndGetModulo(int modulo) {
for (;;) {
int current = nextServerCyclicCounter.get();
int next = (current + 1) % modulo;
if (nextServerCyclicCounter.compareAndSet(current, next))
return next;
}
}
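The retry-up-to-10-times loop and the "return next" counter can be exercised in isolation. RoundRobinSketch is hypothetical. Servers are strings, "alive and ready to serve" is collapsed into one predicate, and the counter logic is copied from incrementAndGetModulo above. Because that method returns the next index rather than the current one, the first pick is index 1. This differs from the predicate-based version in 5.1.2, which starts at index 0. Both rotate through the list evenly.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Simplified, hypothetical model of RoundRobinRule.choose.
final class RoundRobinSketch {
    private final AtomicInteger nextServerCyclicCounter = new AtomicInteger(0);

    String choose(List<String> allServers, Predicate<String> alive) {
        // Mirrors the upCount == 0 / serverCount == 0 early return.
        if (allServers.isEmpty() || allServers.stream().noneMatch(alive)) {
            return null;
        }
        for (int count = 0; count < 10; count++) { // give up after 10 tries, like the rule
            String server = allServers.get(incrementAndGetModulo(allServers.size()));
            if (alive.test(server)) {
                return server;
            }
        }
        return null;
    }

    // Same CAS loop as RoundRobinRule: returns the *next* index.
    private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextServerCyclicCounter.get();
            int next = (current + 1) % modulo;
            if (nextServerCyclicCounter.compareAndSet(current, next)) {
                return next;
            }
        }
    }
}
```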
5.4 RandomRule: random
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
return null;
}
Server server = null;
while (server == null) {
if (Thread.interrupted()) {
return null;
}
// Reachable servers
List<Server> upList = lb.getReachableServers();
// All servers
List<Server> allList = lb.getAllServers();
// Total number of servers
int serverCount = allList.size();
// No servers
if (serverCount == 0) {
/*
* No servers. End regardless of pass, because subsequent passes only get more restrictive.
*/
return null;
}
// Pick a random server
int index = chooseRandomInt(serverCount);
server = upList.get(index);
// Check the server's availability: return it or try again
if (server == null) {
/*
* The only time this should happen is if the server list were somehow trimmed.
* This is a transient condition. Retry after yielding.
*/
Thread.yield();
continue;
}
if (server.isAlive()) {
return (server);
}
// Shouldn't actually happen.. but must be transient or a bug.
server = null;
Thread.yield();
}
return server;
}
// Returns a random int in [0, serverCount); ThreadLocalRandom reduces contention under concurrency
protected int chooseRandomInt(int serverCount) {
return ThreadLocalRandom.current().nextInt(serverCount);
}
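One detail of the code above is worth noticing. index comes from chooseRandomInt(serverCount), where serverCount is allList.size(), but it is then used on upList. When fewer servers are reachable than registered, upList.get(index) can throw IndexOutOfBoundsException. The following hypothetical sketch bounds the index by the list it actually indexes. It is not Ribbon's code.

```java
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of a random pick among reachable servers. The random index is
// bounded by the size of the list it indexes, so it can never run past the end.
final class RandomPickSketch {
    static String choose(List<String> upList) {
        if (upList.isEmpty()) {
            return null; // nothing reachable
        }
        return upList.get(ThreadLocalRandom.current().nextInt(upList.size()));
    }
}
```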
5.5 WeightedResponseTimeRule: response-time weighting
This rule uses the servers' average response times to assign each server a dynamic "weight", which is then used in a "weighted round robin" fashion. As the choose method below shows, the selection itself is a weighted random draw. The rule is somewhat more complex than round robin or random.
5.5.1 Maintaining the weights
The rule first starts a timer task that maintains the weights, which are derived from each server's average response time.
void initialize(ILoadBalancer lb) {
if (serverWeightTimer != null) {
serverWeightTimer.cancel();
}
serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-" + name, true);
serverWeightTimer.schedule(new DynamicServerWeightTask(), 0, serverWeightTaskTimerInterval);
// do a initial run
ServerWeight sw = new ServerWeight();
sw.maintainWeights();
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
logger.info("Stopping NFLoadBalancer-serverWeightTimer-" + name);
serverWeightTimer.cancel();
}
}));
}
class DynamicServerWeightTask extends TimerTask {
public void run() {
ServerWeight serverWeight = new ServerWeight();
try {
serverWeight.maintainWeights();
} catch (Exception e) {
logger.error("Error running DynamicServerWeightTask for {}", name, e);
}
}
}
class ServerWeight {
// Weight maintenance
public void maintainWeights() {
ILoadBalancer lb = getLoadBalancer();
if (lb == null) {
return;
}
if (!serverWeightAssignmentInProgress.compareAndSet(false, true)) {
return;
}
try {
// Start adjusting the weights
logger.info("Weight adjusting job started ...");
// Get the load balancer statistics
AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
LoadBalancerStats stats = nlb.getLoadBalancerStats();
if (stats == null) {
// no statistics, nothing to do
return;
}
// Sum of the average response times of all servers
double totalResponseTime = 0;
// find maximal 95% response time
for (Server server : nlb.getAllServers()) {
// this will automatically load the stats if not in cache
ServerStats ss = stats.getSingleServerStat(server);
totalResponseTime += ss.getResponseTimeAvg();
}
// weight for each server is (sum of responseTime of all servers - responseTime)
// so that the longer the response time, the less the weight and the less likely to be chosen
// Running sum of the server weights
Double weightSoFar = 0.0;
// create new list and hot swap the reference
List<Double> finalWeights = new ArrayList<Double>();
for (Server server : nlb.getAllServers()) {
ServerStats ss = stats.getSingleServerStat(server);
// Weight of this server = sum of all average response times - its own average response time
double weight = totalResponseTime - ss.getResponseTimeAvg();
// Add it to the running sum
weightSoFar += weight;
// Append the running sum to the cumulative weight list
finalWeights.add(weightSoFar);
}
setWeights(finalWeights);
} catch (Exception e) {
logger.error("Error calculating server weights", e);
} finally {
serverWeightAssignmentInProgress.set(false);
}
}
}
5.5.2 Choosing a server
The choose method then picks a server according to these weights: the shorter a server's average response time, the more likely it is to be chosen.
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
return null;
}
Server server = null;
while (server == null) {
// get hold of the current reference in case it is changed from the other thread
List<Double> currentWeights = accumulatedWeights;
if (Thread.interrupted()) {
return null;
}
// All servers
List<Server> allList = lb.getAllServers();
int serverCount = allList.size();
if (serverCount == 0) {
return null;
}
int serverIndex = 0;
// last one in the list is the sum of all weights
// Maximum total weight: 0 if the cumulative list is empty, otherwise its last element
double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1);
// No server has been hit yet and total weight is not initialized,fallback to use round robin
// If the cumulative weights are not initialized yet, or their count differs from the number of servers, fall back to round robin
if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) {
server = super.choose(getLoadBalancer(), key);
if(server == null) {
return server;
}
} else {
// generate a random weight between 0 (inclusive) to maxTotalWeight (exclusive)
double randomWeight = random.nextDouble() * maxTotalWeight;
// pick the server index based on the randomIndex
// Find the first index whose cumulative weight is >= the random value and take the server at that index
int n = 0;
for (Double d : currentWeights) {
if (d >= randomWeight) {
serverIndex = n;
break;
} else {
n++;
}
}
server = allList.get(serverIndex);
}
// Check the server's availability: return it or try again
if (server == null) {
/* Transient. */
Thread.yield();
continue;
}
if (server.isAlive()) {
return (server);
}
// Next.
server = null;
}
return server;
}
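Both halves of the weighted rule can be sketched with plain numbers in place of ServerStats: first building the cumulative weights as in maintainWeights, then mapping a random value to an index as in choose. WeightedResponseTimeSketch is a hypothetical class. The arithmetic follows the code above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of WeightedResponseTimeRule's two halves, using plain numbers.
final class WeightedResponseTimeSketch {
    // Weight of server i = sum of all average response times - average response time of i;
    // the returned list holds the running (cumulative) sums of those weights.
    static List<Double> cumulativeWeights(List<Double> avgResponseTimes) {
        double totalResponseTime = 0;
        for (double avg : avgResponseTimes) {
            totalResponseTime += avg;
        }
        List<Double> accumulated = new ArrayList<>();
        double weightSoFar = 0.0;
        for (double avg : avgResponseTimes) {
            weightSoFar += totalResponseTime - avg;
            accumulated.add(weightSoFar);
        }
        return accumulated;
    }

    // Given a value in [0, maxTotalWeight), return the first index whose cumulative
    // weight is >= that value; like the original, fall back to index 0 if none matches.
    static int indexFor(List<Double> accumulated, double randomWeight) {
        for (int i = 0; i < accumulated.size(); i++) {
            if (accumulated.get(i) >= randomWeight) {
                return i;
            }
        }
        return 0;
    }
}
```

Take average response times of 10, 20 and 30 ms. The cumulative weights are [50, 90, 120]. With randomWeight drawn as random.nextDouble() * 120, the first server owns [0, 50], the second (50, 90] and the third (90, 120). They are therefore picked with probabilities of roughly 50/120, 40/120 and 30/120. With a single server the only weight is 0, so maxTotalWeight stays below 0.001 and the rule falls back to round robin.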
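6. Ribbon retry mechanism
We have seen how Ribbon enables load balancing through RestTemplate and walked through a complete HTTP call. If an error occurs during a call and we want the call to be retried, we can use Ribbon's retry mechanism. As analyzed earlier, the intercept method of RetryLoadBalancerInterceptor contains the following code: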
@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
// Get the URI and the service name (the host part)
final URI originalUri = request.getURI();
final String serviceName = originalUri.getHost();
Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
// Obtain the load-balanced retry policy
final LoadBalancedRetryPolicy retryPolicy = this.lbRetryFactory.createRetryPolicy(serviceName,this.loadBalancer);
// Create the RetryTemplate and set its retry policy:
// NeverRetryPolicy when retry is disabled,
// InterceptorRetryPolicy when retry is enabled
RetryTemplate template = createRetryTemplate(serviceName, request, retryPolicy);
// Execute the request
return template.execute(context -> {
ServiceInstance serviceInstance = null;
// Get the ServiceInstance; context is a LoadBalancedRetryContext only when retry is enabled (InterceptorRetryPolicy)
if (context instanceof LoadBalancedRetryContext) {
LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
serviceInstance = lbContext.getServiceInstance();
}
// If no ServiceInstance is available yet, let the loadBalancer choose one
if (serviceInstance == null) {
serviceInstance = this.loadBalancer.choose(serviceName);
}
// Execute the request against the ServiceInstance chosen from the service's LoadBalancer
ClientHttpResponse response = RetryLoadBalancerInterceptor.this.loadBalancer
.execute(serviceName, serviceInstance,this.requestFactory.createRequest(request, body, execution));
int statusCode = response.getRawStatusCode();
// The retry policy is not null and its retryable status codes include the status code of this response
if (retryPolicy != null && retryPolicy.retryableStatusCode(statusCode)) {
byte[] bodyCopy = StreamUtils.copyToByteArray(response.getBody());
response.close();
throw new ClientHttpResponseStatusCodeException(serviceName, response,bodyCopy);
}
return response;
}, new LoadBalancedRecoveryCallback<ClientHttpResponse, ClientHttpResponse>() {
// This is a special case, where both parameters to LoadBalancedRecoveryCallback are the same.
// In most cases they would be different.
@Override
protected ClientHttpResponse createResponse(ClientHttpResponse response, URI uri) {
return response;
}
});
}
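Suppose the retry policy is not null and its retryable HTTP status codes include the status code returned by the current request. In that case the callback throws ClientHttpResponseStatusCodeException, and Ribbon retries the HTTP request. The retry policy itself is obtained through this.lbRetryFactory.createRetryPolicy(serviceName, this.loadBalancer). Let's look at how it is created.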
/**
 * Creates the load-balanced retry policy
*/
@Override
public LoadBalancedRetryPolicy createRetryPolicy(String service,ServiceInstanceChooser serviceInstanceChooser) {
RibbonLoadBalancerContext lbContext = this.clientFactory.getLoadBalancerContext(service);
return new RibbonLoadBalancedRetryPolicy(service, lbContext,
serviceInstanceChooser, clientFactory.getClientConfig(service));
}
public RibbonLoadBalancedRetryPolicy(String serviceId,
RibbonLoadBalancerContext context,
ServiceInstanceChooser loadBalanceChooser,
IClientConfig clientConfig) {
this.serviceId = serviceId;
this.lbContext = context;
this.loadBalanceChooser = loadBalanceChooser;
// Read and register the retryable HTTP status codes,
// configured via clientName.ribbon.retryableStatusCodes
String retryableStatusCodesProp = clientConfig.getPropertyAsString(RETRYABLE_STATUS_CODES, "");
String[] retryableStatusCodesArray = retryableStatusCodesProp.split(",");
for (String code : retryableStatusCodesArray) {
if (!StringUtils.isEmpty(code)) {
try {
retryableStatusCodes.add(Integer.valueOf(code.trim()));
} catch (NumberFormatException e) {
log.warn("We cant add the status code because the code [ " + code
+ " ] could not be converted to an integer. ", e);
}
}
}
}
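Instantiating RibbonLoadBalancedRetryPolicy is straightforward: apart from assigning the required fields, the constructor parses the retryable HTTP status codes. These are specified via clientName.ribbon.retryableStatusCodes, and the source shows that multiple codes are separated by commas and must be integers (an entry that cannot be parsed is logged with a warning and skipped). At this point the retry policy has been obtained. Next, let's look at Ribbon's commonly used retry settings; the comments explain what each one does. In these examples, spring-cloud-provider is the client (service) name: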
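# Enable retries; defaults to true
spring.cloud.loadbalancer.retry.enabled=true
# Maximum number of retries on the same server (not counting the first attempt); defaults to 0
spring-cloud-provider.ribbon.MaxAutoRetries=5
# Maximum number of other servers to retry on; defaults to 1
spring-cloud-provider.ribbon.MaxAutoRetriesNextServer=5
# Whether to retry all operations (HTTP methods). By default only GET requests are retried; other methods are retried only when this is set to true
spring-cloud-provider.ribbon.OkToRetryOnAllOperations=false
# HTTP status codes that trigger a retry
spring-cloud-provider.ribbon.retryableStatusCodes=404,401,500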
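The parsing done in the RibbonLoadBalancedRetryPolicy constructor, and the status-code check in intercept, can be illustrated with a small plain-Java sketch. The class RetryableStatusCodes and its methods are hypothetical names chosen for illustration, not Spring Cloud API. Like the original, an empty entry is ignored and an entry that is not an integer is skipped with a warning (here printed to stderr instead of logged).

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper mirroring the parsing in the RibbonLoadBalancedRetryPolicy constructor.
final class RetryableStatusCodes {

    private final Set<Integer> codes;

    private RetryableStatusCodes(Set<Integer> codes) {
        this.codes = Collections.unmodifiableSet(codes);
    }

    // Parses a value such as "404,401,500" (clientName.ribbon.retryableStatusCodes).
    static RetryableStatusCodes parse(String property) {
        Set<Integer> result = new LinkedHashSet<>();
        for (String code : (property == null ? "" : property).split(",")) {
            if (code.isEmpty()) {
                continue; // empty entries are ignored
            }
            try {
                result.add(Integer.valueOf(code.trim()));
            } catch (NumberFormatException e) {
                // The original logs a warning and skips the entry; this sketch prints to stderr.
                System.err.println("Skipping status code [" + code + "]: not an integer");
            }
        }
        return new RetryableStatusCodes(result);
    }

    // Counterpart of retryPolicy.retryableStatusCode(statusCode) in intercept.
    boolean isRetryable(int statusCode) {
        return codes.contains(statusCode);
    }

    Set<Integer> codes() {
        return codes;
    }
}
```

With the configuration above, parse("404,401,500").isRetryable(500) is true, while isRetryable(503) is false, so a 503 response would be returned to the caller without a retry.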
6.1. How the Retry Policy Is Triggered
The intercept method of RetryLoadBalancerInterceptor ultimately delegates to RetryTemplate, which runs the following method:
protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,
RecoveryCallback<T> recoveryCallback, RetryState state) throws E, ExhaustedRetryException {
RetryPolicy retryPolicy = this.retryPolicy;
BackOffPolicy backOffPolicy = this.backOffPolicy;
// Allow the retry policy to initialise itself...
RetryContext context = open(retryPolicy, state);
if (this.logger.isTraceEnabled()) {
this.logger.trace("RetryContext retrieved: " + context);
}
// Make sure the context is available globally for clients who need it...
// Register the retry context
RetrySynchronizationManager.register(context);
Throwable lastException = null;
boolean exhausted = false;
try {
// Give clients a chance to enhance the context...
// If any RetryListener's open method returns false, the whole retry is terminated and an exception is thrown
boolean running = doOpenInterceptors(retryCallback, context);
if (!running) {
// An interceptor aborted the retry before the first attempt
throw new TerminatedRetryException("Retry terminated abnormally by interceptor before first attempt");
}
// Get or Start the backoff context...
BackOffContext backOffContext = null;
Object resource = context.getAttribute("backOffContext");
if (resource instanceof BackOffContext) {
backOffContext = (BackOffContext) resource;
}
if (backOffContext == null) {
backOffContext = backOffPolicy.start(context);
if (backOffContext != null) {
context.setAttribute("backOffContext", backOffContext);
}
}
/*
* We allow the whole loop to be skipped if the policy or context already forbid the first try.
* This is used in the case of external retry to allow a
* recovery in handleRetryExhausted without the callback processing (which
* would throw an exception).
*
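* canRetry --> decides whether the ongoing retry attempts should continue.
* It is called before the RetryCallback is executed, but after the backoff and open interceptors.
*
* context.isExhaustedOnly() --> the RetryContext has been flagged so that the current RetryCallback is not attempted or retried again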
*
*/
while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
try {
this.logger.info("...Retry: count=" + context.getRetryCount());
if (this.logger.isDebugEnabled()) {
this.logger.debug("Retry: count=" + context.getRetryCount());
}
// Reset the last exception,
// so if we are successful the close interceptors will not think we failed...
lastException = null;
return retryCallback.doWithRetry(context);
}
catch (Throwable e) {
lastException = e;
try {
// Register the exception and record the current retry count
registerThrowable(retryPolicy, state, context, e);
}
catch (Exception ex) {
throw new TerminatedRetryException("Could not register throwable",ex);
}
finally {
doOnErrorInterceptors(retryCallback, context, e);
}
if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
try {
backOffPolicy.backOff(backOffContext);
}
catch (BackOffInterruptedException ex) {
lastException = e;
// back off was prevented by another thread - fail the retry
if (this.logger.isDebugEnabled()) {
this.logger.debug("Abort retry because interrupted: count=" + context.getRetryCount());
}
throw ex;
}
}
if (this.logger.isDebugEnabled()) {
this.logger.debug("Checking for rethrow: count=" + context.getRetryCount());
}
if (shouldRethrow(retryPolicy, context, state)) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Rethrow in retry for policy: count="+ context.getRetryCount());
}
throw RetryTemplate.<E>wrapIfNecessary(e);
}
}
/*
* A stateful attempt that can retry may rethrow the exception before now,
* but if we get this far in a stateful retry there's a reason for it,
* like a circuit breaker or a rollback classifier.
*/
if (state != null && context.hasAttribute(GLOBAL_STATE)) {
break;
}
}
if (state == null && this.logger.isDebugEnabled()) {
this.logger.debug("Retry failed last attempt: count=" + context.getRetryCount());
}
exhausted = true;
return handleRetryExhausted(recoveryCallback, context, state);
}
catch (Throwable e) {
throw RetryTemplate.<E>wrapIfNecessary(e);
}
finally {
close(retryPolicy, context, state, lastException == null || exhausted);
doCloseInterceptors(retryCallback, context, lastException);
RetrySynchronizationManager.clear();
}
}
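This method is long, but two parts matter most.
First: while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) decides whether another attempt may be made. It combines two conditions.
The first condition, canRetry, depends on the concrete retry policy. Here is how RibbonLoadBalancedRetryPolicy decides whether a request can be retried: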
public boolean canRetry(LoadBalancedRetryContext context) {
HttpMethod method = context.getRequest().getMethod();
return HttpMethod.GET == method || lbContext.isOkToRetryOnAllOperations();
}
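If the request method is GET, or spring-cloud-provider.ribbon.OkToRetryOnAllOperations=true is set, the request satisfies canRetry.
The second condition, !context.isExhaustedOnly(), checks whether the RetryContext has been flagged so that the current RetryCallback is not attempted or retried again. For example, once the number of retries reaches the configured limit, Ribbon calls setExhaustedOnly to set this flag to true, and no further retries are made.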
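The two loop conditions can be modelled in a few lines. The sketch below is a deliberate simplification, not RetryTemplate itself: HttpMethod is a local enum, maxRetries is a hypothetical stand-in for whatever limit makes the policy call setExhaustedOnly, and it assumes (as Spring Cloud's InterceptorRetryPolicy does) that the initial request is always allowed, so canRetry only gates the retries.

```java
import java.util.function.BooleanSupplier;

// Simplified, hypothetical model of the while guard in RetryTemplate.doExecute.
final class RetryGuardSketch {

    enum HttpMethod { GET, POST, PUT, DELETE }

    private RetryGuardSketch() {
    }

    // First condition: mirrors RibbonLoadBalancedRetryPolicy.canRetry.
    static boolean canRetry(HttpMethod method, boolean okToRetryOnAllOperations) {
        return method == HttpMethod.GET || okToRetryOnAllOperations;
    }

    // Returns how many times the call was invoked.
    // maxRetries is a hypothetical limit standing in for the policy calling setExhaustedOnly.
    static int run(HttpMethod method, boolean okToRetryOnAllOperations, int maxRetries, BooleanSupplier call) {
        int calls = 0;
        boolean exhaustedOnly = false; // second condition: context.isExhaustedOnly()
        // The initial request is always made; canRetry only gates the retries.
        while ((calls == 0 || canRetry(method, okToRetryOnAllOperations)) && !exhaustedOnly) {
            calls++;
            if (call.getAsBoolean()) {
                return calls; // success: like returning from retryCallback.doWithRetry
            }
            // Failure: once the retry budget is spent, flag the context as exhausted.
            if (calls > maxRetries) {
                exhaustedOnly = true;
            }
        }
        return calls;
    }
}
```

For example, a failing POST is attempted only once when OkToRetryOnAllOperations=false, but up to maxRetries + 1 times when it is true.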
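Second: registerThrowable(retryPolicy, state, context, e) registers the exception. As described for the intercept method of RetryLoadBalancerInterceptor, when the called server returns a status code listed in spring-cloud-provider.ribbon.retryableStatusCodes, a ClientHttpResponseStatusCodeException is thrown; it is caught here and registered via registerThrowable. The corresponding handling in RibbonLoadBalancedRetryPolicy is: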
@Override
public void registerThrowable(LoadBalancedRetryContext context, Throwable throwable) {
// if this is a circuit tripping exception then notify the load balancer
if (lbContext.getRetryHandler().isCircuitTrippingException(throwable)) {
updateServerInstanceStats(context);
}
// Check if we need to ask the load balancer for a new server.
// Do this before we increment the counters because the first call to this method
// is not a retry it is just an initial failure.
if (!canRetrySameServer(context) && canRetryNextServer(context)) {
context.setServiceInstance(loadBalanceChooser.choose(serviceId));
}
// This method is called regardless of whether we are retrying or making the first request.
// Since we do not count the initial request in the retry count we don't reset the counter
// until we actually equal the same server count limit. This will allow us to make the initial
// request plus the right number of retries.
/**
* The instance variables sameServerCount and nextServerCount both default to 0.
*
* With retries enabled, MaxAutoRetries (default 0) and MaxAutoRetriesNextServer (default 1) determine the number of retries.
*
* If every call fails and the request is retryable, the total number of calls, including the initial one, is
* (MaxAutoRetries + 1) * (MaxAutoRetriesNextServer + 1)
*/
if (sameServerCount >= lbContext.getRetryHandler().getMaxRetriesOnSameServer() && canRetry(context)) {
// reset same server since we are moving to a new server
sameServerCount = 0;
nextServerCount++;
// If retrying on another server instance is not allowed, flag the context as exhausted
if (!canRetryNextServer(context)) {
context.setExhaustedOnly();
}
} else {
sameServerCount++;
}
}
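The formula in the comment above can be checked with a small simulation of the sameServerCount/nextServerCount bookkeeping. This is a hypothetical sketch, not Spring Cloud code. It assumes every call fails, that canRetry(context) is always true (a GET request, or OkToRetryOnAllOperations=true), and that after the first call the loop continues while nextServerCount <= MaxAutoRetriesNextServer and the context is not exhausted.

```java
// Hypothetical simulation of the counters in RibbonLoadBalancedRetryPolicy.registerThrowable.
// Assumes every call fails and canRetry(context) is always true.
final class RetryCountSimulation {

    private RetryCountSimulation() {
    }

    // Returns the total number of calls, including the initial one.
    static int totalCalls(int maxAutoRetries, int maxAutoRetriesNextServer) {
        int sameServerCount = 0; // both counters start at 0, as in the policy
        int nextServerCount = 0;
        boolean exhaustedOnly = false;
        int calls = 0;
        do {
            calls++; // the call fails, so registerThrowable runs
            if (sameServerCount >= maxAutoRetries) {
                // Same-server budget used up: move on to the next server.
                sameServerCount = 0;
                nextServerCount++;
                // canRetryNextServer requires nextServerCount <= MaxAutoRetriesNextServer.
                if (nextServerCount > maxAutoRetriesNextServer) {
                    exhaustedOnly = true;
                }
            } else {
                sameServerCount++;
            }
            // Loop guard: canRetryNextServer && !exhaustedOnly.
        } while (nextServerCount <= maxAutoRetriesNextServer && !exhaustedOnly);
        return calls;
    }
}
```

With the defaults (MaxAutoRetries=0, MaxAutoRetriesNextServer=1) the simulation yields 2 calls; with the values from the configuration example above (5 and 5) it yields 36, matching (5 + 1) * (5 + 1).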
The most important part of registerThrowable is the final if/else. Before analyzing it, let's look at the canRetryNextServer method:
@Override
public boolean canRetryNextServer(LoadBalancedRetryContext context) {
return nextServerCount <= lbContext.getRetryHandler().getMaxRetriesOnNextServer() && canRetry(context);
}
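The canRetryNextServer condition is simple: the request must satisfy canRetry (described above), and the nextServerCount counter must be less than or equal to spring-cloud-provider.ribbon.MaxAutoRetriesNextServer. With canRetryNextServer understood, let's return to the if/else block: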