Ribbon原理分析

一、引言

  Ribbon是实现客户端负载均衡的组件,用于spring cloud微服务中,服务间调用负载均衡。默认是轮询算法,可以配置其他算法,还可以自定义负载均衡算法。

客户端负载均衡:一个请求在客户端的时候已经声明了要调用哪个服务,然后通过具体的负载均衡算法去调用多个节点服务中的一个。

服务端负载均衡:一个请求先经过代理服务器,例如:nginx,然后由代理服务器通过负载均衡算法反向代理服务端,完成服务的调用。

二、自动装配

想要使用ribbon的负载均衡,一般直接通过注解的方式@LoadBalanced,然后使用restTemplate调用的时候,就会自动触发负载均衡算法。当然如果我们使用了feign之类的组件已经自动集成了ribbon,不需要我们这样做了。

@LoadBalanced
@Bean
public RestTemplate restTemplate(){
    return new RestTemplate();
}

@LoadBalanced注解

@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {

}

一个空注解,单看没有什么实际意义。

Spring的SPI机制,配置文件spring.factories,可以找到自动装配类LoadBalancerAutoConfiguration

@Configuration(proxyBeanMethods = false)
// 当前环境需要有RestTemplate.class
@ConditionalOnClass(RestTemplate.class)
// 需要当前环境有LoadBalancerClient接口的实现类
@ConditionalOnBean(LoadBalancerClient.class)
// 初始化配置文件LoadBalancerRetryProperties
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {

    @LoadBalanced
    @Autowired(required = false)
    // 1、@AutoWired也会自动装载集合类list,会将合适的RestTemplate添加到restTemplates中
    // 而至于加载哪些RestTemplate,就是标注了@LoadBalanced的RestTemplate
    // 上面我们看到@LoadBalanced有一个@Qualifier就是特殊标注的含义,所以普通的没有添加@LoadBalanced
    // 则不会被添加到restTemplates中的
    private List<RestTemplate> restTemplates = Collections.emptyList();

    @Autowired(required = false)
    private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();

    @Bean
    // 2、SmartInitializingSingleton接口的实现类会在项目初始化之后被调用其afterSingletonsInstantiated方法
    public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
            final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
        return () -> restTemplateCustomizers.ifAvailable(customizers -> {
            for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
                for (RestTemplateCustomizer customizer : customizers) {
                    customizer.customize(restTemplate);
                }
            }
        });
    }

    @Bean
    //相同类型的bean只能注册一个
    @ConditionalOnMissingBean
    // 3、LoadBalancerRequestFactory被创建
    public LoadBalancerRequestFactory loadBalancerRequestFactory(
            LoadBalancerClient loadBalancerClient) {
        return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
    }

    @Configuration(proxyBeanMethods = false)
    @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
    static class LoadBalancerInterceptorConfig {

        // 4、将LoadBalancerClient接口的实现类和方法3中创建的LoadBalancerRequestFactory
        // 注入到该方法中,同时成为LoadBalancerInterceptor拦截器的参数
        @Bean
        public LoadBalancerInterceptor ribbonInterceptor(
                LoadBalancerClient loadBalancerClient,
                LoadBalancerRequestFactory requestFactory) {
            return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
        }

        @Bean
        @ConditionalOnMissingBean
         // 5、方法4中创建的LoadBalancerInterceptor会被作为方法参数注入进来
        public RestTemplateCustomizer restTemplateCustomizer(
                final LoadBalancerInterceptor loadBalancerInterceptor) {
            // customize方法会被方法2中的afterSingletonsInstantiated()遍历调用
            return restTemplate -> {
                List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                        restTemplate.getInterceptors());
                list.add(loadBalancerInterceptor);
                restTemplate.setInterceptors(list);
            };
        }

    }

    /**
     * 配置重试机制
     */
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnClass(RetryTemplate.class)
    public static class RetryAutoConfiguration {

        @Bean
        @ConditionalOnMissingBean
        public LoadBalancedRetryFactory loadBalancedRetryFactory() {
            return new LoadBalancedRetryFactory() {
            };
        }

    }

    /**
     * Auto configuration for retry intercepting mechanism.
     */
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnClass(RetryTemplate.class)
    public static class RetryInterceptorAutoConfiguration {

        @Bean
        @ConditionalOnMissingBean
        public RetryLoadBalancerInterceptor ribbonInterceptor(
                LoadBalancerClient loadBalancerClient,
                LoadBalancerRetryProperties properties,
                LoadBalancerRequestFactory requestFactory,
                LoadBalancedRetryFactory loadBalancedRetryFactory) {
            return new RetryLoadBalancerInterceptor(loadBalancerClient, properties,
                    requestFactory, loadBalancedRetryFactory);
        }

        @Bean
        @ConditionalOnMissingBean
        public RestTemplateCustomizer restTemplateCustomizer(
                final RetryLoadBalancerInterceptor loadBalancerInterceptor) {
            return restTemplate -> {
                List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                        restTemplate.getInterceptors());
                list.add(loadBalancerInterceptor);
                restTemplate.setInterceptors(list);
            };
        }

    }

}

三、请求处理

  在项目启动后,创建RestTemplate时会添加一个拦截器,当使用restTemplate发送请求时,就会经过拦截器触发负载均衡策略,以getForObject方法调用为例。

public <T> T getForObject(String url, Class<T> responseType, Object... uriVariables) throws RestClientException {
    RequestCallback requestCallback = acceptHeaderRequestCallback(responseType);
    HttpMessageConverterExtractor<T> responseExtractor =
        new HttpMessageConverterExtractor<T>(responseType, getMessageConverters(), logger);
    return execute(url, HttpMethod.GET, requestCallback, responseExtractor, uriVariables);
}
 
// execute
public <T> T execute(String url, HttpMethod method, RequestCallback requestCallback,
                     ResponseExtractor<T> responseExtractor, Object... uriVariables) throws RestClientException {
 
    URI expanded = getUriTemplateHandler().expand(url, uriVariables);
    return doExecute(expanded, method, requestCallback, responseExtractor);
}
 
//doExecute
protected <T> T doExecute(URI url, HttpMethod method, RequestCallback requestCallback,
                          ResponseExtractor<T> responseExtractor) throws RestClientException {
 
    ...
    ClientHttpResponse response = null;
    try {
        // 1.创一个request请求
        ClientHttpRequest request = createRequest(url, method);
        if (requestCallback != null) {
            requestCallback.doWithRequest(request);
        }
        // 2.执行该请求
        response = request.execute();
        // 3.对响应结果进行封装
        handleResponse(url, method, response);
        if (responseExtractor != null) {
            return responseExtractor.extractData(response);
        }
        else {
            return null;
        }
    }
    ...
}

1、createRequest(url, method)创建一个request请求

protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
    // 重点在这里
    ClientHttpRequest request = getRequestFactory().createRequest(url, method);
    if (logger.isDebugEnabled()) {
        logger.debug("Created " + method.name() + " request for \"" + url + "\"");
    }
    return request;
}
 
//getRequestFactory
public ClientHttpRequestFactory getRequestFactory() {
    ClientHttpRequestFactory delegate = super.getRequestFactory();
    // RestTemplate拦截器不为空
    if (!CollectionUtils.isEmpty(getInterceptors())) {
        return new InterceptingClientHttpRequestFactory(delegate, getInterceptors());
    }
    else {
        return delegate;
    }
}
 
// InterceptingClientHttpRequestFactory.createRequest(url, method)
protected ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod, ClientHttpRequestFactory requestFactory) {
    // 最终返回的就是这个request
    return new InterceptingClientHttpRequest(requestFactory, this.interceptors, uri, httpMethod);
}

2、执行该请求request.execute()

//当前的request为InterceptingClientHttpRequest
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
    // 1.iterator即拦截器集合
    // private final Iterator<ClientHttpRequestInterceptor> iterator;
    if (this.iterator.hasNext()) {
        ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
        // 2.逐个拦截器来执行,我们就看最重要的LoadBalancerInterceptor
        return nextInterceptor.intercept(request, body, this);
    }
    else {
        ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), request.getMethod());
        for (Map.Entry<String, List<String>> entry : request.getHeaders().entrySet()) {
            List<String> values = entry.getValue();
            for (String value : values) {
                delegate.getHeaders().add(entry.getKey(), value);
            }
        }
        if (body.length > 0) {
            StreamUtils.copy(body, delegate.getBody());
        }
        return delegate.execute();
    }
}
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
                                    final ClientHttpRequestExecution execution) throws IOException {
    final URI originalUri = request.getURI();
    String serviceName = originalUri.getHost();
    Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
    // 真正执行的方法
    // private LoadBalancerClient loadBalancer; 
    // LoadBalancerClient默认实现类为RibbonLoadBalancerClient
    return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
}
 
// RibbonLoadBalancerClient.execute()
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
    // 1.根据用户请求的serviceId来获取具体的LoadBalanced
    ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
 
    // 2.获取具体的server(也就是定位到哪台服务器的哪个端口号的具体服务信息)
    Server server = getServer(loadBalancer);
    if (server == null) {
        throw new IllegalStateException("No instances available for " + serviceId);
    }
    RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,serviceId),
                        serverIntrospector(serviceId).getMetadata(server));
    // 3.执行HTTP请求
    return execute(serviceId, ribbonServer, request);
}

通过这个方法的分析可以看到,里面通过一系列的算法根据用户输入的serviceId(也就是服务名)来获取到具体的服务所在host、port,然后重新封装HTTP请求,最后执行该HTTP请求即可。

2.1、getLoadBalancer()获取负载均衡器

protected ILoadBalancer getLoadBalancer(String serviceId) {
    return this.clientFactory.getLoadBalancer(serviceId);
}
 
// SpringClientFactory.getLoadBalancer()
public ILoadBalancer getLoadBalancer(String name) {
    return getInstance(name, ILoadBalancer.class);
}
 
// SpringClientFactory.getInstance()
public <C> C getInstance(String name, Class<C> type) {
    C instance = super.getInstance(name, type);
    if (instance != null) {
        return instance;
    }
    IClientConfig config = getInstance(name, IClientConfig.class);
    return instantiateWithConfig(getContext(name), type, config);
}
 
// NamedContextFactory.getInstance()
public <T> T getInstance(String name, Class<T> type) {
    AnnotationConfigApplicationContext context = getContext(name);
    if (BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context,
                                                            type).length > 0) {
        // 主要就是这句,从容器中获取ILoadBalancer的实现,目前默认实现为
        // ZoneAwareLoadBalancer
        return context.getBean(type);
    }
    return null;
}

2.2、getServer(loadBalancer)

protected Server getServer(ILoadBalancer loadBalancer) {
        if (loadBalancer == null) {
            return null;
        }
        // 具体执行为ZoneAwareLoadBalancer.chooseServer()
        return loadBalancer.chooseServer("default"); // TODO: better handling of key
    }
 
    // ZoneAwareLoadBalancer.chooseServer()
    public Server chooseServer(Object key) {
        // 1.由于笔者测试的server,可用的zone为1个,所以会直接走super.chooseServer()
        if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
            logger.debug("Zone aware logic disabled or there is only one zone");
            return super.chooseServer(key);
        }
        //如果是多region,则会走下面的方法,暂时注释掉
        ...
    }
        
    //BaseLoadBalancer.chooseServer()
   public Server chooseServer(Object key) {
        if (counter == null) {
            counter = createCounter();
        }
        counter.increment();
        if (rule == null) {
            return null;
        } else {
            try {
                // rule为ZoneAvoidanceRule
                return rule.choose(key);
            } catch (Exception e) {
                logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
                return null;
            }
        }
    }
        
    //PredicateBasedRule.choose(key)
    public Server choose(Object key) {
        ILoadBalancer lb = getLoadBalancer();
        // 重点在这里,从所有的server中根据对应的rule来获取一个具体的server
        Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
        if (server.isPresent()) {
            return server.get();
        } else {
            return null;
        }       
    }

getServer()的主要功能就是根据具体的rule来选择特定的Server,重要的实现实际都在这个方法里

2.3、RibbonLoadBalancerClient.execute(serviceId, ribbonServer, request)执行HTTP请求

public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
    ...
    RibbonLoadBalancerContext context = this.clientFactory
        .getLoadBalancerContext(serviceId);
    RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
 
    try {
        // 核心方法在这里,是一个回调方法,
        // 具体就是回调LoadBalancerRequestFactory.createRequest()中的apply()方法
        T returnVal = request.apply(serviceInstance);
        statsRecorder.recordStats(returnVal);
        return returnVal;
    }
    ...
}
 
    // 回调方法
    public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request,
                                                                 final byte[] body, final ClientHttpRequestExecution execution) {
        return new LoadBalancerRequest<ClientHttpResponse>() {
 
            @Override
            // 回调方法在这里
            public ClientHttpResponse apply(final ServiceInstance instance)
            throws Exception {
                HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, loadBalancer);
                if (transformers != null) {
                    for (LoadBalancerRequestTransformer transformer : transformers) {
                        serviceRequest = transformer.transformRequest(serviceRequest, instance);
                    }
                }
                // 真正要执行的方法
                return execution.execute(serviceRequest, body);
            }
 
        };
    }
 
    //InterceptingRequestExecution.execute()
    public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
        if (this.iterator.hasNext()) {
            ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
            return nextInterceptor.intercept(request, body, this);
        }
        // 注意:此时已经没有iterator,直接执行request请求
        else {
            // 1.根据URI获得请求,并封装头部
            ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), request.getMethod());
            for (Map.Entry<String, List<String>> entry : request.getHeaders().entrySet()) {
                List<String> values = entry.getValue();
                for (String value : values) {
                    delegate.getHeaders().add(entry.getKey(), value);
                }
            }
            if (body.length > 0) {
                StreamUtils.copy(body, delegate.getBody());
            }
            // 2.本质就是对HttpURLConnection的执行
            return delegate.execute();
        }
    }
}

3、封装响应

    protected void handleResponse(URI url, HttpMethod method, ClientHttpResponse response) throws IOException {
        //获取处理错误的处理器
        ResponseErrorHandler errorHandler = getErrorHandler();
        boolean hasError = errorHandler.hasError(response);
        if (logger.isDebugEnabled()) {
            try {
                int code = response.getRawStatusCode();
                //解析响应状态
                HttpStatus status = HttpStatus.resolve(code);
                logger.debug("Response " + (status != null ? status : code));
            }
            catch (IOException ex) {
            }
        }
        if (hasError) {
            //处理错误的相应
            errorHandler.handleError(url, method, response);
        }
    }

总结:

1、创建RestTemplate
2、添加了ribbon依赖后,会在项目启动的时候自动往RestTemplate中添加LoadBalancerInterceptor拦截器
3、用户根据RestTemplate发起请求时,会将请求转发到LoadBalancerInterceptor去执行,该拦截器会根据指定的负载均衡方式获取该次请求对应的应用服务端IP、port
4、根据获取到的IP、port重新封装请求,发送HTTP请求,返回具体响应

四、自定义负载均衡器

  实现接口IRule,然后可以看一下ribbon自己实现的一些默认规则,比如:RandomRule,参照他的写法去实现自己想要的负载均衡策略,最后注入到Spring容器即可。

参考:https://blog.csdn.net/qq_26323323/article/details/81327669

上一篇:从Cloud-Hosted到Cloud-Native,AnalyticDB PostgreSQL数据仓库的云原生实践


下一篇:基于SpringBoot整合SpringCloud微服务框架--Eureka注册中心及Feign远程调用/Ribbon负载均衡及Hystrix熔断器及zuul网关